494 lines
18 KiB
Python
494 lines
18 KiB
Python
"""
|
|
Workflow routes for the backend API.
|
|
Implements the endpoints for workflow management according to the state machine.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from datetime import datetime, timedelta
|
|
|
|
# Import auth modules
|
|
from modules.security.auth import limiter, getCurrentUser
|
|
|
|
# Import interfaces
|
|
import modules.interfaces.interfaceChatObjects as interfaceChatObjects
|
|
from modules.interfaces.interfaceChatObjects import getInterface
|
|
|
|
# Import models
|
|
from modules.interfaces.interfaceChatModel import (
|
|
ChatWorkflow,
|
|
ChatMessage,
|
|
ChatLog,
|
|
ChatStat,
|
|
ChatDocument,
|
|
UserInputRequest
|
|
)
|
|
from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse
|
|
from modules.interfaces.interfaceAppModel import User
|
|
|
|
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Attribute definitions derived from the ChatWorkflow model
workflowAttributes = getModelAttributeDefinitions(ChatWorkflow)

# Router under which every workflow endpoint in this module is registered
router = APIRouter(
    prefix="/api/workflows",
    tags=["Workflow"],
    responses={404: {"description": "Not found"}},
)
|
|
|
|
def getServiceChat(currentUser: User):
    """Return the chat interface that interfaceChatObjects provides for currentUser."""
    chatInterface = interfaceChatObjects.getInterface(currentUser)
    return chatInterface
|
|
|
|
# Consolidated endpoint for getting all workflows
|
|
@router.get("/", response_model=List[ChatWorkflow])
@limiter.limit("120/minute")
async def get_workflows(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> List[ChatWorkflow]:
    """
    List the workflows the chat interface returns for the current user.

    Each raw record is converted into a ChatWorkflow. A record that fails
    conversion is logged as a warning and left out of the result, so one bad
    record does not fail the whole request. Any other failure is reported
    as HTTP 500.
    """
    try:
        chatInterface = getInterface(currentUser)
        rawWorkflows = chatInterface.getAllWorkflows()

        converted = []
        for record in rawWorkflows:
            try:
                # Values are gathered in the same order the model receives them;
                # a missing "id" raises KeyError and the record is skipped below.
                fields = {
                    "id": record["id"],
                    "status": record.get("status", "running"),
                    "name": record.get("name"),
                    "currentRound": record.get("currentRound", 1),
                    "lastActivity": record.get("lastActivity", chatInterface._getCurrentTimestamp()),
                    "startedAt": record.get("startedAt", chatInterface._getCurrentTimestamp()),
                    "logs": [ChatLog(**logEntry) for logEntry in record.get("logs", [])],
                    "messages": [ChatMessage(**messageEntry) for messageEntry in record.get("messages", [])],
                }

                # Use the stored statistics when present, otherwise zeroed counters
                rawStats = record.get("dataStats")
                if rawStats:
                    fields["stats"] = ChatStat(**rawStats)
                else:
                    fields["stats"] = ChatStat(
                        bytesSent=0,
                        bytesReceived=0,
                        tokenCount=0,
                        processingTime=0
                    )

                fields["mandateId"] = record.get("mandateId", currentUser.mandateId or "")
                converted.append(ChatWorkflow(**fields))
            except Exception as conversionError:
                # Skip invalid workflows instead of failing the entire request
                logger.warning(f"Error converting workflow data to ChatWorkflow object: {str(conversionError)}")

        return converted
    except Exception as err:
        logger.error(f"Error getting workflows: {str(err)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get workflows: {str(err)}"
        )
|
|
|
|
@router.get("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def get_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Get workflow by ID.

    Args:
        request: Incoming request; not read in the body (presumably required
            by the rate-limiter decorator - confirm).
        workflowId: ID of the workflow to fetch.
        currentUser: Authenticated user whose context is used for the lookup.

    Returns:
        The workflow returned by the interface's getWorkflow().

    Raises:
        HTTPException: 404 when the interface returns nothing for the ID,
            500 when any other error occurs.
    """
    try:
        # Get workflow interface with current user context
        workflowInterface = getInterface(currentUser)

        # Get workflow
        workflow = workflowInterface.getWorkflow(workflowId)
        if not workflow:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Workflow not found"
            )

        return workflow

    except HTTPException:
        # Pass the 404 through unchanged; without this clause the generic
        # handler below would rewrite it into a 500.
        raise
    except Exception as e:
        logger.error(f"Error getting workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get workflow: {str(e)}"
        )
|
|
|
|
@router.put("/{workflowId}", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def update_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to update"),
    workflowData: Dict[str, Any] = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Update workflow by ID.

    Args:
        request: Incoming request; not read in the body (presumably required
            by the rate-limiter decorator - confirm).
        workflowId: ID of the workflow to update.
        workflowData: Request body passed unchanged to updateWorkflow().
        currentUser: Authenticated user whose context is used for the update.

    Returns:
        The value returned by the interface's updateWorkflow().

    Raises:
        HTTPException: 404 when no workflow record matches the ID, 403 when
            the interface denies modification, 500 when the update returns
            nothing or any other error occurs.
    """
    try:
        # Get workflow interface with current user context
        workflowInterface = getInterface(currentUser)

        # Query the raw record only to confirm the workflow exists
        existing = workflowInterface.db.getRecordset("workflows", recordFilter={"id": workflowId})
        if not existing:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Workflow not found"
            )

        # Check if user has permission to update using the interface's permission system
        if not workflowInterface._canModify("workflows", workflowId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to update this workflow"
            )

        # Update workflow
        updatedWorkflow = workflowInterface.updateWorkflow(workflowId, workflowData)
        if not updatedWorkflow:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to update workflow"
            )

        return updatedWorkflow

    except HTTPException:
        # Keep the specific status codes raised above
        raise
    except Exception as e:
        logger.error(f"Error updating workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update workflow: {str(e)}"
        )
|
|
|
|
# API Endpoint for workflow status
|
|
@router.get("/{workflowId}/status", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def get_workflow_status(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Return the workflow identified by workflowId.

    Responds with 404 when the chat interface returns nothing for the ID and
    with 500 when any other error occurs.
    """
    try:
        chatService = getServiceChat(currentUser)
        found = chatService.getWorkflow(workflowId)

        if found:
            return found

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Workflow with ID {workflowId} not found"
        )
    except HTTPException:
        # Keep the 404 raised above unchanged
        raise
    except Exception as err:
        logger.error(f"Error getting workflow status: {str(err)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow status: {str(err)}"
        )
|
|
|
|
# API Endpoint for workflow logs with selective data transfer
|
|
@router.get("/{workflowId}/logs", response_model=List[ChatLog])
@limiter.limit("120/minute")
async def get_workflow_logs(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs"),
    currentUser: User = Depends(getCurrentUser)
) -> List[ChatLog]:
    """
    Return the logs of a workflow, optionally only those after a given log.

    When logId is given and matches an entry, only the entries that follow the
    first match are returned. When logId is empty or matches nothing, the
    full list is returned. Responds with 404 for an unknown workflow and 500
    for any other error.
    """
    try:
        chatService = getServiceChat(currentUser)

        # The workflow must exist before its logs are read
        if not chatService.getWorkflow(workflowId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        entries = chatService.getWorkflowLogs(workflowId)

        if logId:
            # Stop at the first entry carrying the requested ID and return what follows it
            for position, entry in enumerate(entries):
                if entry.id == logId:
                    return entries[position + 1:]

        return entries
    except HTTPException:
        # Keep the 404 raised above unchanged
        raise
    except Exception as err:
        logger.error(f"Error getting workflow logs: {str(err)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow logs: {str(err)}"
        )
|
|
|
|
# API Endpoint for workflow messages with selective data transfer
|
|
@router.get("/{workflowId}/messages", response_model=List[ChatMessage])
@limiter.limit("120/minute")
async def get_workflow_messages(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages"),
    currentUser: User = Depends(getCurrentUser)
) -> List[ChatMessage]:
    """
    Return the messages of a workflow, optionally only those after a given message.

    When messageId is given and matches an entry, only the entries that follow
    the first match are returned. When messageId is empty or matches nothing,
    the full list is returned. Responds with 404 for an unknown workflow and
    500 for any other error.
    """
    try:
        chatService = getServiceChat(currentUser)

        # The workflow must exist before its messages are read
        if not chatService.getWorkflow(workflowId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        entries = chatService.getWorkflowMessages(workflowId)

        if messageId:
            # Stop at the first entry carrying the requested ID and return what follows it
            for position, entry in enumerate(entries):
                if entry.id == messageId:
                    return entries[position + 1:]

        return entries
    except HTTPException:
        # Keep the 404 raised above unchanged
        raise
    except Exception as err:
        logger.error(f"Error getting workflow messages: {str(err)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error getting workflow messages: {str(err)}"
        )
|
|
|
|
# State 1: Workflow Initialization endpoint
|
|
@router.post("/start", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def start_workflow(
    request: Request,
    workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
    userInput: UserInputRequest = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Starts a new workflow or continues an existing one.
    Corresponds to State 1 in the state machine documentation.

    Args:
        request: Incoming request; not read in the body (presumably required
            by the rate-limiter decorator - confirm).
        workflowId: Optional ID of a workflow to continue; passed through to
            workflowStart().
        userInput: Request body passed through to workflowStart().
        currentUser: Authenticated user, used for the interface and passed to
            workflowStart().

    Returns:
        The value returned by the interface's workflowStart().

    Raises:
        HTTPException: Any HTTPException raised while starting the workflow
            is propagated unchanged; every other error becomes a 500.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Start or continue workflow using ChatObjects
        workflow = await interfaceChat.workflowStart(currentUser, userInput, workflowId)

        return workflow

    except HTTPException:
        # Same handling as the sibling endpoints: keep a deliberate status
        # code from lower layers instead of flattening it to 500.
        raise
    except Exception as e:
        logger.error(f"Error in start_workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
|
|
|
|
# State 8: Workflow Stopped endpoint
|
|
@router.post("/{workflowId}/stop", response_model=ChatWorkflow)
@limiter.limit("120/minute")
async def stop_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to stop"),
    currentUser: User = Depends(getCurrentUser)
) -> ChatWorkflow:
    """
    Stops a running workflow.

    Args:
        request: Incoming request; not read in the body (presumably required
            by the rate-limiter decorator - confirm).
        workflowId: ID of the workflow to stop.
        currentUser: Authenticated user whose context is used for the call.

    Returns:
        The value returned by the interface's workflowStop().

    Raises:
        HTTPException: Any HTTPException raised while stopping the workflow
            is propagated unchanged; every other error becomes a 500.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Stop workflow using ChatObjects
        workflow = await interfaceChat.workflowStop(workflowId)

        return workflow

    except HTTPException:
        # Same handling as the sibling endpoints: keep a deliberate status
        # code from lower layers instead of flattening it to 500.
        raise
    except Exception as e:
        logger.error(f"Error in stop_workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        )
|
|
|
|
# State 11: Workflow Reset/Deletion endpoint
|
|
@router.delete("/{workflowId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_workflow(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Deletes a workflow and its associated data.

    Args:
        request: Incoming request; not read in the body (presumably required
            by the rate-limiter decorator - confirm).
        workflowId: ID of the workflow to delete.
        currentUser: Authenticated user whose context is used for the call.

    Returns:
        A dict with the deleted workflow's "id" and a confirmation "message".

    Raises:
        HTTPException: 404 when no workflow record matches the ID, 403 when
            the interface denies modification, 500 when deleteWorkflow()
            reports failure or any other error occurs.
    """
    try:
        # Get service center
        interfaceChat = getServiceChat(currentUser)

        # Query the raw record only to confirm the workflow exists
        existing = interfaceChat.db.getRecordset("workflows", recordFilter={"id": workflowId})
        if not existing:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        # Check if user has permission to delete using the interface's permission system
        if not interfaceChat._canModify("workflows", workflowId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to delete this workflow"
            )

        # Delete workflow
        success = interfaceChat.deleteWorkflow(workflowId)
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to delete workflow"
            )

        return {
            "id": workflowId,
            "message": "Workflow and associated data deleted successfully"
        }
    except HTTPException:
        # Keep the specific status codes raised above
        raise
    except Exception as e:
        logger.error(f"Error deleting workflow: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting workflow: {str(e)}"
        )
|
|
|
|
|
|
# Document Management Endpoints
|
|
|
|
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_workflow_message(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: str = Path(..., description="ID of the message to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Remove one message from a workflow.

    After the interface reports a successful deletion, the message ID is also
    removed from the workflow's "messageIds" list (when present there) and the
    shortened list is written back. Responds with 404 when the workflow or the
    message is not found and with 500 for any other error.
    """
    try:
        chatService = getServiceChat(currentUser)

        found = chatService.getWorkflow(workflowId)
        if not found:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        deleted = chatService.deleteWorkflowMessage(workflowId, messageId)
        if not deleted:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Message with ID {messageId} not found in workflow {workflowId}"
            )

        # NOTE(review): .get() assumes getWorkflow returns a dict-like object - confirm
        remainingIds = found.get("messageIds", [])
        if messageId in remainingIds:
            # remove() drops the first occurrence and edits the list in place
            remainingIds.remove(messageId)
            chatService.updateWorkflow(workflowId, {"messageIds": remainingIds})

        result = {
            "workflowId": workflowId,
            "messageId": messageId,
            "message": "Message deleted successfully"
        }
        return result
    except HTTPException:
        # Keep the 404s raised above unchanged
        raise
    except Exception as err:
        logger.error(f"Error deleting message: {str(err)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting message: {str(err)}"
        )
|
|
|
|
@router.delete("/{workflowId}/messages/{messageId}/files/{fileId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_file_from_message(
    request: Request,
    workflowId: str = Path(..., description="ID of the workflow"),
    messageId: str = Path(..., description="ID of the message"),
    fileId: str = Path(..., description="ID of the file to delete"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Remove a file reference from a message of a workflow.

    Responds with 404 when the workflow is not found or the interface reports
    that the file reference was not deleted, and with 500 for any other error.
    """
    try:
        chatService = getServiceChat(currentUser)

        # The workflow must exist before the message is touched
        if not chatService.getWorkflow(workflowId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Workflow with ID {workflowId} not found"
            )

        removed = chatService.deleteFileFromMessage(workflowId, messageId, fileId)
        if not removed:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"File with ID {fileId} not found in message {messageId}"
            )

        result = {
            "workflowId": workflowId,
            "messageId": messageId,
            "fileId": fileId,
            "message": "File reference deleted successfully"
        }
        return result
    except HTTPException:
        # Keep the 404s raised above unchanged
        raise
    except Exception as err:
        logger.error(f"Error deleting file reference: {str(err)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting file reference: {str(err)}"
        )
|