gateway/modules/routes/routeDataWorkflows.py
2026-01-22 00:23:33 +01:00

814 lines
No EOL
30 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Workflow routes for the backend API.
Implements the endpoints for workflow management according to the state machine.
"""
import logging
import json
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request
# Import auth modules
from modules.auth import limiter, getCurrentUser
# Import interfaces
import modules.interfaces.interfaceDbChat as interfaceDbChat
from modules.interfaces.interfaceDbChat import getInterface
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
# Import models
from modules.datamodels.datamodelChat import (
ChatWorkflow,
ChatMessage,
ChatLog,
ChatStat,
ChatDocument
)
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# Attribute definitions derived from the ChatWorkflow model
workflowAttributes = getModelAttributeDefinitions(ChatWorkflow)
# Router for the workflow endpoints: routes registered on it share the
# "/api/workflows" prefix, the "Workflow" tag and a documented 404 response
router = APIRouter(
prefix="/api/workflows",
tags=["Workflow"],
responses={404: {"description": "Not found"}}
)
def getServiceChat(currentUser: User):
    """Return the chat database interface created for ``currentUser``."""
    chatInterface = interfaceDbChat.getInterface(currentUser)
    return chatInterface
# Consolidated endpoint for getting all workflows
@router.get("/", response_model=PaginatedResponse[ChatWorkflow])
@limiter.limit("120/minute")
async def get_workflows(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    currentUser: User = Depends(getCurrentUser)
) -> PaginatedResponse[ChatWorkflow]:
    """
    Get workflows with optional pagination, sorting, and filtering.

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination

    Examples:
    - GET /api/workflows/ (no pagination - returns all workflows)
    - GET /api/workflows/?pagination={"page":1,"pageSize":10,"sort":[]}

    Errors:
    - 400 if the pagination parameter is not valid JSON, is not a JSON object,
      or does not validate as PaginationParams
    - 500 for any other failure
    """
    try:
        # Parse pagination parameter
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                # Falsy payloads ({} / null / 0 / []) mean "no pagination"
                if paginationDict:
                    # A truthy non-object (e.g. 5 or [1]) would otherwise raise
                    # TypeError on ** unpacking and surface as a 500
                    if not isinstance(paginationDict, dict):
                        raise ValueError("pagination must be a JSON object")
                    # Normalize pagination dict (handles top-level "search" field)
                    paginationDict = normalize_pagination_dict(paginationDict)
                    paginationParams = PaginationParams(**paginationDict)
            except (json.JSONDecodeError, ValueError) as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid pagination parameter: {str(e)}"
                )

        appInterface = getInterface(currentUser)
        result = appInterface.getWorkflows(pagination=paginationParams)

        # With pagination the interface result exposes items/totalItems/totalPages;
        # without it the result itself is the list of workflows
        if paginationParams:
            return PaginatedResponse(
                items=result.items,
                pagination=PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=result.totalItems,
                    totalPages=result.totalPages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters
                )
            )
        return PaginatedResponse(
            items=result,
            pagination=None
        )
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above)
        raise
    except Exception as e:
        logger.error(f"Error getting workflows: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get workflows: {str(e)}"
        )
@router.get("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def get_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Get workflow by ID.

    Errors:
    - 404 if the interface returns no workflow for the given ID
    - 500 for any other failure
    """
    try:
        # Get workflow interface with current user context
        workflowInterface = getInterface(currentUser)

        # Get workflow
        workflow = workflowInterface.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Workflow not found"
            )
        return workflow
    except HTTPException:
        # Without this clause the 404 above would be caught by the generic
        # handler below and reported to the client as a 500
        raise
    except Exception as e:
        logger.error(f"Error getting workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get workflow: {str(e)}"
        )
@router.put("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def update_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to update"),
    workflowData: Dict[str, Any] = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Update workflow by ID.

    The request body is passed unchanged to the interface's updateWorkflow.

    Errors:
    - 404 if no workflow record with the given ID exists
    - 403 if the RBAC check denies "update" on the workflow
    - 500 if the update returns nothing or any other failure occurs
    """
    try:
        # Get workflow interface with current user context
        workflowInterface = getInterface(currentUser)

        # Existence check against the raw recordset, so a missing workflow is
        # reported as 404 before the permission check runs
        workflows = workflowInterface.db.getRecordset(ChatWorkflow, recordFilter={"id": workflowId})
        if not workflows:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Workflow not found"
            )

        # Check if user has permission to update using RBAC
        if not workflowInterface.checkRbacPermission(ChatWorkflow, "update", workflowId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to update this workflow"
            )

        # Update workflow
        updatedWorkflow = workflowInterface.updateWorkflow(workflowId, workflowData)
        if not updatedWorkflow:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to update workflow"
            )
        return updatedWorkflow
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update workflow: {str(e)}"
        )
# API Endpoint for workflow status
@router.get("/{workflowId}/status", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def get_workflow_status(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Return the workflow record for the given ID as its current status.

    Responds with 404 when no workflow is found and 500 on unexpected errors.
    """
    try:
        # Chat interface bound to the calling user
        chatService = getServiceChat(currentUser)

        foundWorkflow = chatService.getWorkflow(workflowId)
        if foundWorkflow:
            return foundWorkflow

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Workflow with ID {workflowId} not found"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting workflow status: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow status: {str(e)}"
        )
# API Endpoint for workflow logs with selective data transfer
@router.get("/{workflowId}/logs", response_model=PaginatedResponse[ChatLog])
@limiter.limit("120/minute")
async def get_workflow_logs(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs (legacy selective data transfer)"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    currentUser: User = Depends(getCurrentUser)
) -> PaginatedResponse[ChatLog]:
    """
    Get logs for a workflow with optional pagination, sorting, and filtering.
    Also supports legacy selective data transfer via logId parameter.

    Query Parameters:
    - logId: Optional log ID for selective data transfer (returns only logs after this ID)
    - pagination: JSON-encoded PaginationParams object, or None for no pagination

    If logId is given but not present in the fetched logs, the regular
    (paginated or full) response is returned.

    Errors:
    - 400 if the pagination parameter is not valid JSON, is not a JSON object,
      or does not validate as PaginationParams
    - 404 if the workflow does not exist
    - 500 for any other failure
    """
    try:
        # Parse pagination parameter
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                # Falsy payloads ({} / null / 0 / []) mean "no pagination"
                if paginationDict:
                    # A truthy non-object (e.g. 5 or [1]) would otherwise raise
                    # TypeError on ** unpacking and surface as a 500
                    if not isinstance(paginationDict, dict):
                        raise ValueError("pagination must be a JSON object")
                    # Normalize pagination dict (handles top-level "search" field)
                    paginationDict = normalize_pagination_dict(paginationDict)
                    paginationParams = PaginationParams(**paginationDict)
            except (json.JSONDecodeError, ValueError) as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid pagination parameter: {str(e)}"
                )

        # Get service center
        chatService = getServiceChat(currentUser)

        # Verify workflow exists
        workflow = chatService.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Get logs with optional pagination
        result = chatService.getLogs(workflowId, pagination=paginationParams)

        # Handle legacy selective data transfer if logId is provided
        if logId:
            # NOTE(review): when pagination is also supplied, the lookup only
            # sees the fetched page (result.items), not the full log list
            allLogs = result.items if paginationParams else result
            # Find the index of the log with the given ID (-1 if absent)
            logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1)
            if logIndex >= 0:
                # Return only logs after the specified log, without pagination metadata
                return PaginatedResponse(
                    items=allLogs[logIndex + 1:],
                    pagination=None
                )

        # With pagination the result exposes items/totalItems/totalPages;
        # without it the result itself is the list of logs
        if paginationParams:
            return PaginatedResponse(
                items=result.items,
                pagination=PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=result.totalItems,
                    totalPages=result.totalPages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters
                )
            )
        return PaginatedResponse(
            items=result,
            pagination=None
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow logs: {str(e)}"
        )
# API Endpoint for workflow messages with selective data transfer
@router.get("/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage])
@limiter.limit("120/minute")
async def get_workflow_messages(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages (legacy selective data transfer)"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    currentUser: User = Depends(getCurrentUser)
) -> PaginatedResponse[ChatMessage]:
    """
    Get messages for a workflow with optional pagination, sorting, and filtering.
    Also supports legacy selective data transfer via messageId parameter.

    Query Parameters:
    - messageId: Optional message ID for selective data transfer (returns only messages after this ID)
    - pagination: JSON-encoded PaginationParams object, or None for no pagination

    If messageId is given but not present in the fetched messages, the regular
    (paginated or full) response is returned.

    Errors:
    - 400 if the pagination parameter is not valid JSON, is not a JSON object,
      or does not validate as PaginationParams
    - 404 if the workflow does not exist
    - 500 for any other failure
    """
    try:
        # Parse pagination parameter
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                # Falsy payloads ({} / null / 0 / []) mean "no pagination"
                if paginationDict:
                    # A truthy non-object (e.g. 5 or [1]) would otherwise raise
                    # TypeError on ** unpacking and surface as a 500
                    if not isinstance(paginationDict, dict):
                        raise ValueError("pagination must be a JSON object")
                    # Normalize pagination dict (handles top-level "search" field)
                    paginationDict = normalize_pagination_dict(paginationDict)
                    paginationParams = PaginationParams(**paginationDict)
            except (json.JSONDecodeError, ValueError) as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid pagination parameter: {str(e)}"
                )

        # Get service center
        chatService = getServiceChat(currentUser)

        # Verify workflow exists
        workflow = chatService.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Get messages with optional pagination
        result = chatService.getMessages(workflowId, pagination=paginationParams)

        # Handle legacy selective data transfer if messageId is provided
        if messageId:
            # NOTE(review): when pagination is also supplied, the lookup only
            # sees the fetched page (result.items), not the full message list
            allMessages = result.items if paginationParams else result
            # Find the index of the message with the given ID (-1 if absent)
            messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
            if messageIndex >= 0:
                # Return only messages after the specified message, without pagination metadata
                return PaginatedResponse(
                    items=allMessages[messageIndex + 1:],
                    pagination=None
                )

        # With pagination the result exposes items/totalItems/totalPages;
        # without it the result itself is the list of messages
        if paginationParams:
            return PaginatedResponse(
                items=result.items,
                pagination=PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=result.totalItems,
                    totalPages=result.totalPages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters
                )
            )
        return PaginatedResponse(
            items=result,
            pagination=None
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow messages: {str(e)}"
        )
# State 11: Workflow Reset/Deletion endpoint
@router.delete("/{workflowId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Deletes a workflow and its associated data.

    Returns a dictionary with the deleted workflow's "id" and a confirmation "message".

    Errors:
    - 404 if the RBAC-filtered lookup finds no workflow with the given ID
    - 403 if the RBAC check denies "delete" on the workflow
    - 500 if the deletion reports failure or any other error occurs
    """
    try:
        # Get service center
        chatService = getServiceChat(currentUser)

        # Existence/visibility check: the lookup is RBAC-filtered, so an empty
        # result is reported as 404
        workflows = getRecordsetWithRBAC(
            chatService.db,
            ChatWorkflow,
            currentUser,
            recordFilter={"id": workflowId}
        )
        if not workflows:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Check if user has permission to delete using RBAC
        if not chatService.checkRbacPermission(ChatWorkflow, "delete", workflowId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to delete this workflow"
            )

        # Delete workflow
        success = chatService.deleteWorkflow(workflowId)
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to delete workflow"
            )

        return {
            "id": workflowId,
            "message": "Workflow and associated data deleted successfully"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting workflow: {str(e)}"
        )
# Document Management Endpoints
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_workflow_message(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: str = Path(..., description="ID of the message to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Delete a message from a workflow.

    After the message is deleted, the workflow's messageIds list is written
    back without the deleted ID.

    Returns a dictionary with "workflowId", "messageId" and a confirmation "message".

    Errors:
    - 404 if the workflow does not exist or the message deletion reports failure
    - 500 for any other failure
    """
    try:
        # Get service center
        chatService = getServiceChat(currentUser)

        # Verify workflow exists
        workflow = chatService.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Delete the message
        success = chatService.deleteMessage(workflowId, messageId)
        if not success:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Message with ID {messageId} not found in workflow {workflowId}"
            )

        # Update workflow's messageIds.
        # NOTE(review): other routes return getWorkflow's result as a
        # ChatWorkflow, so it may be a model object rather than a dict; read
        # the field either way so a successful deletion cannot end in a 500.
        if isinstance(workflow, dict):
            storedIds = workflow.get("messageIds", [])
        else:
            storedIds = getattr(workflow, "messageIds", None)
        # Work on a copy so the fetched workflow object is not mutated in place
        messageIds = list(storedIds or [])
        if messageId in messageIds:
            messageIds.remove(messageId)
        chatService.updateWorkflow(workflowId, {"messageIds": messageIds})

        return {
            "workflowId": workflowId,
            "messageId": messageId,
            "message": "Message deleted successfully"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting message: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting message: {str(e)}"
        )
@router.delete("/{workflowId}/messages/{messageId}/files/{fileId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_file_from_message(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: str = Path(..., description="ID of the message"),
    fileId: str = Path(..., description="ID of the file to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Remove a file reference from a message that belongs to a workflow.

    Responds with 404 when the workflow is missing or the file reference could
    not be removed, and with 500 on unexpected errors.
    """
    try:
        chatService = getServiceChat(currentUser)

        # The workflow must exist before its messages are touched
        if not chatService.getWorkflow(workflowId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # A falsy result from the interface is reported as "not found"
        removed = chatService.deleteFileFromMessage(workflowId, messageId, fileId)
        if not removed:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"File with ID {fileId} not found in message {messageId}"
            )

        confirmation = {
            "workflowId": workflowId,
            "messageId": messageId,
            "fileId": fileId,
            "message": "File reference deleted successfully"
        }
        return confirmation
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting file reference: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting file reference: {str(e)}"
        )
# Action Discovery Endpoints
# NOTE(review): this route is registered after GET "/{workflowId}" in this file.
# Routes are matched in registration order, so GET /api/workflows/actions may be
# captured by that route with workflowId="actions" - confirm and reorder if so.
@router.get("/actions", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def get_all_actions(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    List every workflow action discovered for the current user.

    Returns:
    - Dictionary with a flat "actions" list; each entry names its module

    Example response:
    {
        "actions": [
            {
                "module": "outlook",
                "actionId": "outlook.readEmails",
                "name": "readEmails",
                "description": "Read emails and metadata from a mailbox folder",
                "parameters": {...}
            },
            ...
        ]
    }
    """
    # NOTE(review): no RBAC filtering is visible in this function; presumably it
    # happens inside discoverMethods/services - confirm.
    try:
        from modules.services import getInterface as getServices
        from modules.workflows.processing.shared.methodDiscovery import discoverMethods

        # Run discovery with the user's services before reading the catalog
        discoverMethods(getServices(currentUser, None))

        # Import methods catalog
        from modules.workflows.processing.shared.methodDiscovery import methods

        collected = []
        for registeredName, entry in methods.items():
            # Entries whose key starts with 'Method' duplicate a short-name entry
            if not registeredName.startswith('Method'):
                instance = entry['instance']
                collected.extend(
                    {
                        "module": instance.name,
                        "actionId": f"{instance.name}.{actionName}",
                        "name": actionName,
                        "description": actionInfo.get('description', ''),
                        "parameters": actionInfo.get('parameters', {})
                    }
                    for actionName, actionInfo in instance.actions.items()
                )

        return {"actions": collected}
    except Exception as e:
        logger.error(f"Error getting all actions: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get actions: {str(e)}"
        )
@router.get("/actions/{method}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def get_method_actions(
    request: Request,
    method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    List the actions offered by one discovered method.

    Path Parameters:
    - method: Method name (e.g., 'outlook', 'sharepoint', 'ai')

    Returns:
    - Dictionary with the method's name, description and actions

    Responds with 404 when no discovered method has the given name.

    Example response:
    {
        "module": "outlook",
        "description": "...",
        "actions": [
            {
                "actionId": "outlook.readEmails",
                "name": "readEmails",
                "description": "Read emails and metadata from a mailbox folder",
                "parameters": {...}
            },
            ...
        ]
    }
    """
    try:
        from modules.services import getInterface as getServices
        from modules.workflows.processing.shared.methodDiscovery import discoverMethods

        # Run discovery with the user's services before reading the catalog
        discoverMethods(getServices(currentUser, None))

        # Import methods catalog
        from modules.workflows.processing.shared.methodDiscovery import methods

        # First catalog entry whose instance carries the requested name
        methodInstance = next(
            (entry['instance'] for entry in methods.values() if entry['instance'].name == method),
            None
        )
        if not methodInstance:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Method '{method}' not found"
            )

        actions = [
            {
                "actionId": f"{methodInstance.name}.{actionName}",
                "name": actionName,
                "description": actionInfo.get('description', ''),
                "parameters": actionInfo.get('parameters', {})
            }
            for actionName, actionInfo in methodInstance.actions.items()
        ]

        return {
            "module": methodInstance.name,
            "description": methodInstance.description,
            "actions": actions
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get actions for method {method}: {str(e)}"
        )
@router.get("/actions/{method}/{action}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def get_action_schema(
    request: Request,
    method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"),
    action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Return the schema of a single action, including its parameter definitions.

    Path Parameters:
    - method: Method name (e.g., 'outlook', 'sharepoint', 'ai')
    - action: Action name (e.g., 'readEmails', 'uploadDocument')

    Returns:
    - Action schema with description and parameter definitions

    Responds with 404 when the method or the action is unknown.

    Example response:
    {
        "method": "outlook",
        "action": "readEmails",
        "actionId": "outlook.readEmails",
        "description": "Read emails and metadata from a mailbox folder",
        "parameters": {
            "connectionReference": {
                "name": "connectionReference",
                "type": "str",
                "frontendType": "userConnection",
                "frontendOptions": "user.connection",
                "required": true,
                "description": "Microsoft connection label"
            },
            ...
        }
    }
    """
    try:
        from modules.services import getInterface as getServices
        from modules.workflows.processing.shared.methodDiscovery import discoverMethods

        # Run discovery with the user's services before reading the catalog
        discoverMethods(getServices(currentUser, None))

        # Import methods catalog
        from modules.workflows.processing.shared.methodDiscovery import methods

        # First catalog entry whose instance carries the requested name
        methodInstance = next(
            (entry['instance'] for entry in methods.values() if entry['instance'].name == method),
            None
        )
        if not methodInstance:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Method '{method}' not found"
            )

        availableActions = methodInstance.actions
        if action not in availableActions:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Action '{action}' not found in method '{method}'"
            )

        schema = availableActions[action]
        return {
            "method": methodInstance.name,
            "action": action,
            "actionId": f"{methodInstance.name}.{action}",
            "description": schema.get('description', ''),
            "parameters": schema.get('parameters', {})
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get action schema: {str(e)}"
        )