gateway/modules/routes/routeWorkflows.py
2025-08-30 09:25:24 +02:00

508 lines
19 KiB
Python

"""
Workflow routes for the backend API.
Implements the endpoints for workflow management according to the state machine.
"""
import os
import json
import logging
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from datetime import datetime, timedelta
# Import auth modules
from modules.security.auth import limiter, getCurrentUser
# Import interfaces
import modules.interfaces.interfaceChatObjects as interfaceChatObjects
from modules.interfaces.interfaceChatObjects import getInterface
# Import models
from modules.interfaces.interfaceChatModel import (
ChatWorkflow,
ChatMessage,
ChatLog,
ChatStat,
ChatDocument,
UserInputRequest
)
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse
from modules.interfaces.interfaceAppModel import User
from modules.shared.timezoneUtils import get_utc_timestamp
# Module-level logger, named after this module so log records can be filtered per route file
logger = logging.getLogger(__name__)
# Attribute definitions derived from the ChatWorkflow model.
# NOTE(review): not referenced by any handler visible in this part of the file — confirm it is used elsewhere.
workflowAttributes = getModelAttributeDefinitions(ChatWorkflow)
# Router for all workflow endpoints: every route below is registered under the
# "/api/workflows" prefix, tagged "Workflow", and documents a generic 404 response.
router = APIRouter(
prefix="/api/workflows",
tags=["Workflow"],
responses={404: {"description": "Not found"}}
)
def getServiceChat(currentUser: User):
    """Return the chat interface obtained for ``currentUser``.

    Thin wrapper around ``interfaceChatObjects.getInterface`` so the route
    handlers share a single access point.
    """
    chatInterface = interfaceChatObjects.getInterface(currentUser)
    return chatInterface
# Consolidated endpoint for getting all workflows
@router.get("/", response_model=List[ChatWorkflow])
@limiter.limit("120/minute")
async def get_workflows(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> List[ChatWorkflow]:
    """List the workflows returned by the interface for the current user.

    Each raw record from ``getAllWorkflows()`` is converted into a
    ``ChatWorkflow``. Records that fail conversion are logged as warnings and
    left out, so one bad record does not fail the whole request.

    Args:
        request: Incoming request (not read by the handler body).
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        The successfully converted ``ChatWorkflow`` objects.

    Raises:
        HTTPException: 500 when fetching the workflows fails.
    """
    try:
        chatInterface = getInterface(currentUser)
        records = chatInterface.getAllWorkflows()

        converted = []
        for record in records:
            try:
                # Built in the same order as the model's keyword arguments so
                # lookups and defaults are evaluated exactly once, in sequence.
                fields = {
                    "id": record["id"],
                    "status": record.get("status", "running"),
                    "name": record.get("name"),
                    # Counters default to 0 for new workflows
                    "currentRound": record.get("currentRound", 0),
                    "currentTask": record.get("currentTask", 0),
                    "currentAction": record.get("currentAction", 0),
                    "totalTasks": record.get("totalTasks", 0),
                    "totalActions": record.get("totalActions", 0),
                    "lastActivity": record.get("lastActivity", get_utc_timestamp()),
                    "startedAt": record.get("startedAt", get_utc_timestamp()),
                    "logs": [ChatLog(**entry) for entry in record.get("logs", [])],
                    "messages": [ChatMessage(**entry) for entry in record.get("messages", [])],
                    # Missing or empty stats fall back to an all-zero ChatStat
                    "stats": (
                        ChatStat(**record.get("dataStats", {}))
                        if record.get("dataStats")
                        else ChatStat(
                            bytesSent=0,
                            bytesReceived=0,
                            tokenCount=0,
                            processingTime=0
                        )
                    ),
                    "mandateId": record.get("mandateId", currentUser.mandateId or ""),
                }
                item = ChatWorkflow(**fields)
            except Exception as conversionError:
                # Skip invalid workflows instead of failing the entire request
                logger.warning(f"Error converting workflow data to ChatWorkflow object: {str(conversionError)}")
            else:
                converted.append(item)

        return converted
    except Exception as e:
        logger.error(f"Error getting workflows: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get workflows: {str(e)}"
        )
@router.get("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def get_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """Get workflow by ID.

    Args:
        request: Incoming request (not read by the handler body).
        workflowId: ID of the workflow to fetch.
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        The workflow returned by the interface's ``getWorkflow``.

    Raises:
        HTTPException: 404 when the lookup returns nothing; 500 on any other
            failure.
    """
    try:
        # Get workflow interface with current user context
        workflowInterface = getInterface(currentUser)

        # Get workflow
        workflow = workflowInterface.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Workflow not found"
            )

        return workflow
    except HTTPException:
        # Pass deliberate HTTP errors (the 404 above) through unchanged;
        # without this the generic handler below would turn them into a 500.
        raise
    except Exception as e:
        logger.error(f"Error getting workflow: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get workflow: {str(e)}"
        )
@router.put("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def update_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to update"),
    workflowData: Dict[str, Any] = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """Update workflow by ID.

    Args:
        request: Incoming request (not read by the handler body).
        workflowId: ID of the workflow to update.
        workflowData: Fields to update, passed unchanged to ``updateWorkflow``.
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        The value returned by the interface's ``updateWorkflow``.

    Raises:
        HTTPException: 404 when no record with this ID exists, 403 when the
            interface's permission check denies modification, 500 when the
            update returns nothing or an unexpected error occurs.
    """
    try:
        # Get workflow interface with current user context
        workflowInterface = getInterface(currentUser)

        # Existence check against the raw recordset; only emptiness matters
        # here, the record itself is not needed.
        workflows = workflowInterface.db.getRecordset("workflows", recordFilter={"id": workflowId})
        if not workflows:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Workflow not found"
            )

        # Check if user has permission to update using the interface's permission system
        if not workflowInterface._canModify("workflows", workflowId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to update this workflow"
            )

        # Update workflow
        updatedWorkflow = workflowInterface.updateWorkflow(workflowId, workflowData)
        if not updatedWorkflow:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to update workflow"
            )

        return updatedWorkflow
    except HTTPException:
        # Keep the specific status codes raised above
        raise
    except Exception as e:
        logger.error(f"Error updating workflow: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update workflow: {str(e)}"
        )
# API Endpoint for workflow status
@router.get("/{workflowId}/status", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def get_workflow_status(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """Return the workflow identified by ``workflowId`` as its current status.

    Raises:
        HTTPException: 404 when the workflow lookup returns nothing; 500 on
            any unexpected failure.
    """
    try:
        chatService = getServiceChat(currentUser)
        found = chatService.getWorkflow(workflowId)

        # Happy path first; anything falsy falls through to the 404 below
        if found:
            return found

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Workflow with ID {workflowId} not found"
        )
    except HTTPException:
        raise
    except Exception as e:
        errorText = str(e)
        logger.error(f"Error getting workflow status: {errorText}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow status: {errorText}"
        )
# API Endpoint for workflow logs with selective data transfer
@router.get("/{workflowId}/logs", response_model=List[ChatLog])
@limiter.limit("120/minute")
async def get_workflow_logs(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs"),
    currentUser: User = Depends(getCurrentUser)
) -> List[ChatLog]:
    """Return a workflow's logs, optionally only those after ``logId``.

    When ``logId`` is given and matches an entry, only the entries that come
    after the first match are returned. When it is omitted or matches
    nothing, the full list is returned.

    Raises:
        HTTPException: 404 when the workflow lookup returns nothing; 500 on
            any unexpected failure.
    """
    try:
        chatService = getServiceChat(currentUser)

        # The workflow must exist before its logs are read
        if not chatService.getWorkflow(workflowId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        entries = chatService.getWorkflowLogs(workflowId)

        # Selective transfer: cut the list right after the first matching entry
        if logId:
            for position, entry in enumerate(entries):
                if entry.id == logId:
                    return entries[position + 1:]

        return entries
    except HTTPException:
        raise
    except Exception as e:
        errorText = str(e)
        logger.error(f"Error getting workflow logs: {errorText}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow logs: {errorText}"
        )
# API Endpoint for workflow messages with selective data transfer
@router.get("/{workflowId}/messages", response_model=List[ChatMessage])
@limiter.limit("120/minute")
async def get_workflow_messages(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages"),
    currentUser: User = Depends(getCurrentUser)
) -> List[ChatMessage]:
    """Get messages for a workflow with support for selective data transfer.

    Args:
        request: Incoming request (not read by the handler body).
        workflowId: ID of the workflow whose messages are requested.
        messageId: When given and found, only messages after the first match
            are returned; otherwise all messages are returned.
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        The full message list, or the tail after ``messageId``.

    Raises:
        HTTPException: 404 when the workflow lookup returns nothing; 500 on
            any unexpected failure.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Verify workflow exists
        workflow = interfaceChat.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Get all messages
        allMessages = interfaceChat.getWorkflowMessages(workflowId)

        # Debug logging: dump attributes for each message. Guarded so the
        # per-message strings are only built when DEBUG output is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"Retrieved {len(allMessages)} messages for workflow {workflowId}")
            for i, message in enumerate(allMessages):
                # Content may be None or a non-string; slicing it directly
                # would raise and turn a diagnostic line into a 500.
                content = getattr(message, 'content', 'N/A')
                preview = 'N/A' if content is None else str(content)[:100]
                logger.debug(f"Message {i+1} (ID: {message.id}): {message}")
                logger.debug(f" - Type: {getattr(message, 'type', 'N/A')}")
                logger.debug(f" - Content: {preview}...")
                logger.debug(f" - PublishedAt: {getattr(message, 'publishedAt', 'N/A')}")
                logger.debug(f" - All attributes: {getattr(message, '__dict__', {})}")

        # Apply selective data transfer if messageId is provided
        if messageId:
            # Find the index of the message with the given ID
            messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
            if messageIndex >= 0:
                # Return only messages after the specified message
                return allMessages[messageIndex + 1:]

        return allMessages
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow messages: {str(e)}"
        )
# State 1: Workflow Initialization endpoint
@router.post("/start", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def start_workflow(
    request: Request,
    workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
    userInput: UserInputRequest = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Starts a new workflow or continues an existing one.
    Corresponds to State 1 in the state machine documentation.

    Args:
        request: Incoming request (not read by the handler body).
        workflowId: Optional ID of an existing workflow to continue.
        userInput: The user's input payload, forwarded to ``workflowStart``.
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        The workflow returned by the interface's ``workflowStart``.

    Raises:
        HTTPException: Re-raised unchanged when raised by the underlying
            call; otherwise 500 with the error text as detail.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Start or continue workflow using ChatObjects
        workflow = await interfaceChat.workflowStart(currentUser, userInput, workflowId)
        return workflow
    except HTTPException:
        # Keep any specific status code (e.g. 404/403) raised further down
        # instead of flattening it into a 500, as the sibling handlers do.
        raise
    except Exception as e:
        logger.error(f"Error in start_workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
# State 8: Workflow Stopped endpoint
@router.post("/{workflowId}/stop", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def stop_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to stop"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """Stops a running workflow.

    Args:
        request: Incoming request (not read by the handler body).
        workflowId: ID of the workflow to stop.
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        The workflow returned by the interface's ``workflowStop``.

    Raises:
        HTTPException: Re-raised unchanged when raised by the underlying
            call; otherwise 500 with the error text as detail.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Stop workflow using ChatObjects
        workflow = await interfaceChat.workflowStop(workflowId)
        return workflow
    except HTTPException:
        # Keep any specific status code raised further down instead of
        # flattening it into a 500, as the sibling handlers do.
        raise
    except Exception as e:
        logger.error(f"Error in stop_workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
# State 11: Workflow Reset/Deletion endpoint
@router.delete("/{workflowId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """Deletes a workflow and its associated data.

    Args:
        request: Incoming request (not read by the handler body).
        workflowId: ID of the workflow to delete.
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        A dict with the deleted workflow's ``id`` and a confirmation
        ``message``.

    Raises:
        HTTPException: 404 when no record with this ID exists, 403 when the
            interface's permission check denies modification, 500 when the
            deletion reports failure or an unexpected error occurs.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Existence check against the raw recordset; only emptiness matters
        # here, the record itself is not needed.
        workflows = interfaceChat.db.getRecordset("workflows", recordFilter={"id": workflowId})
        if not workflows:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Check if user has permission to delete using the interface's permission system
        if not interfaceChat._canModify("workflows", workflowId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to delete this workflow"
            )

        # Delete workflow
        success = interfaceChat.deleteWorkflow(workflowId)
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to delete workflow"
            )

        return {
            "id": workflowId,
            "message": "Workflow and associated data deleted successfully"
        }
    except HTTPException:
        # Keep the specific status codes raised above
        raise
    except Exception as e:
        logger.error(f"Error deleting workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting workflow: {str(e)}"
        )
# Document Management Endpoints
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_workflow_message(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: str = Path(..., description="ID of the message to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """Delete a message from a workflow.

    After the message is deleted, its ID is also removed from the workflow's
    ``messageIds`` list (when present) and the workflow is updated.

    Args:
        request: Incoming request (not read by the handler body).
        workflowId: ID of the workflow that owns the message.
        messageId: ID of the message to delete.
        currentUser: User resolved by the ``getCurrentUser`` dependency.

    Returns:
        A dict with ``workflowId``, ``messageId`` and a confirmation
        ``message``.

    Raises:
        HTTPException: 404 when the workflow or the message is not found;
            500 on any unexpected failure.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Verify workflow exists
        workflow = interfaceChat.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Delete the message
        success = interfaceChat.deleteWorkflowMessage(workflowId, messageId)
        if not success:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Message with ID {messageId} not found in workflow {workflowId}"
            )

        # Update workflow's messageIds.
        # NOTE(review): getWorkflow's return type is not visible here; other
        # handlers return it as a ChatWorkflow, which would have no dict-style
        # .get(). Support both shapes so a successful delete cannot end in an
        # AttributeError and a 500.
        if isinstance(workflow, dict):
            storedIds = workflow.get("messageIds", [])
        else:
            storedIds = getattr(workflow, "messageIds", None)
        # Copy (and tolerate None) so the fetched workflow is not mutated
        messageIds = list(storedIds or [])
        if messageId in messageIds:
            messageIds.remove(messageId)
            interfaceChat.updateWorkflow(workflowId, {"messageIds": messageIds})

        return {
            "workflowId": workflowId,
            "messageId": messageId,
            "message": "Message deleted successfully"
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting message: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting message: {str(e)}"
        )
@router.delete("/{workflowId}/messages/{messageId}/files/{fileId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_file_from_message(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: str = Path(..., description="ID of the message"),
    fileId: str = Path(..., description="ID of the file to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """Remove a file reference from one message of a workflow.

    Returns:
        A dict echoing ``workflowId``, ``messageId`` and ``fileId`` together
        with a confirmation ``message``.

    Raises:
        HTTPException: 404 when the workflow lookup returns nothing or the
            interface reports the file reference was not removed; 500 on any
            unexpected failure.
    """
    try:
        chatService = getServiceChat(currentUser)

        # The workflow must exist before touching its messages
        if not chatService.getWorkflow(workflowId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        removed = chatService.deleteFileFromMessage(workflowId, messageId, fileId)
        if removed:
            return {
                "workflowId": workflowId,
                "messageId": messageId,
                "fileId": fileId,
                "message": "File reference deleted successfully"
            }

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"File with ID {fileId} not found in message {messageId}"
        )
    except HTTPException:
        raise
    except Exception as e:
        errorText = str(e)
        logger.error(f"Error deleting file reference: {errorText}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting file reference: {errorText}"
        )