From 22e70aa8787f2db1bda69a62d77950ac818eaf2b Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 10:53:23 +0100 Subject: [PATCH] fix: chat-playground and automation feature routes unification --- .../automation/routeFeatureAutomation.py | 498 +++++++++- .../routeFeatureChatplayground.py | 495 +++++++++- modules/routes/routeDataWorkflows.py | 857 ------------------ scripts/migrate_async_to_sync.py | 3 - 4 files changed, 974 insertions(+), 879 deletions(-) delete mode 100644 modules/routes/routeDataWorkflows.py diff --git a/modules/features/automation/routeFeatureAutomation.py b/modules/features/automation/routeFeatureAutomation.py index 1e548bba..1b7d703e 100644 --- a/modules/features/automation/routeFeatureAutomation.py +++ b/modules/features/automation/routeFeatureAutomation.py @@ -17,14 +17,16 @@ from modules.features.automation.interfaceFeatureAutomation import getInterface from modules.features.automation.mainAutomation import getAutomationServices from modules.auth import limiter, getRequestContext, RequestContext from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition, AutomationTemplate -from modules.datamodels.datamodelChat import ChatWorkflow +from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage, ChatLog from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict from modules.shared.attributeUtils import getModelAttributeDefinitions +from modules.interfaces import interfaceDbChat # Configure logger logger = logging.getLogger(__name__) -# Model attributes for AutomationDefinition +# Model attributes for AutomationDefinition and ChatWorkflow automationAttributes = getModelAttributeDefinitions(AutomationDefinition) +workflowAttributes = getModelAttributeDefinitions(ChatWorkflow) # Create router for automation endpoints router = APIRouter( @@ -231,6 +233,496 @@ def get_available_actions( ) +# ----------------------------------------------------------------------------- +# Workflow routes under /{instanceId}/workflows/ (instance-scoped, same as chatplayground) +# ----------------------------------------------------------------------------- + +def _validateAutomationInstanceAccess(instanceId: str, context: RequestContext) -> Optional[str]: + """Validate user has access to the automation feature instance. Returns mandateId.""" + from modules.interfaces.interfaceDbApp import getRootInterface + rootInterface = getRootInterface() + instance = rootInterface.getFeatureInstance(instanceId) + if not instance: + raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found") + featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId) + if not featureAccess or not featureAccess.enabled: + raise HTTPException(status_code=403, detail="Access denied to this feature instance") + return str(instance.mandateId) if instance.mandateId else None + + +def _getAutomationServiceChat(context: RequestContext, featureInstanceId: str = None, mandateId: str = None): + """Get chat interface with feature instance context for workflows.""" + return interfaceDbChat.getInterface( + context.user, + mandateId=mandateId or (str(context.mandateId) if context.mandateId else None), + featureInstanceId=featureInstanceId + ) + + +@router.get("/{instanceId}/workflows", response_model=PaginatedResponse[ChatWorkflow]) +@limiter.limit("120/minute") +def get_automation_workflows( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + page: int = Query(1, ge=1, description="Page number (legacy)"), + pageSize: int = Query(20, ge=1, le=100, description="Items per page (legacy)"), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatWorkflow]: + """Get all workflows for this automation feature instance.""" + try: + mandateId = _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId, mandateId=mandateId) + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") + else: + paginationParams = PaginationParams(page=page, pageSize=pageSize) + result = chatInterface.getWorkflows(pagination=paginationParams) + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting automation workflows: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflows: {str(e)}") + + +# Workflow attributes (ChatWorkflow model) +@router.get("/{instanceId}/workflows/attributes", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_workflow_attributes( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get attribute definitions for ChatWorkflow model.""" + _validateAutomationInstanceAccess(instanceId, context) + return {"attributes": workflowAttributes} + + +# Actions (must be before /{workflowId} to avoid path conflict) +@router.get("/{instanceId}/workflows/actions", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_workflow_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get all available workflow actions.""" + try: + mandateId = _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices(context.user, mandateId=mandateId, featureInstanceId=instanceId) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + allActions = [] + for methodName, methodInfo in methods.items(): + if methodName.startswith('Method'): + continue + methodInstance = methodInfo['instance'] + for actionName, actionInfo in methodInstance.actions.items(): + allActions.append({ + "module": methodInstance.name, + "actionId": f"{methodInstance.name}.{actionName}", + "name": actionName, + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + }) + return {"actions": allActions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting actions: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_method_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get actions for a specific method.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices(context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=instanceId) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for mn, mi in methods.items(): + if mi['instance'].name == method: + methodInstance = mi['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + actions = [{"actionId": f"{methodInstance.name}.{an}", "name": an, "description": ai.get('description', ''), "parameters": ai.get('parameters', {})} + for an, ai in methodInstance.actions.items()] + return {"module": methodInstance.name, "description": methodInstance.description, "actions": actions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting actions for {method}: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}/{action}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_action_schema( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name"), + action: str = Path(..., description="Action name"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get action schema for a specific action.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices(context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=instanceId) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for mn, mi in methods.items(): + if mi['instance'].name == method: + methodInstance = mi['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + if action not in methodInstance.actions: + raise HTTPException(status_code=404, detail=f"Action '{action}' not found in method '{method}'") + ai = methodInstance.actions[action] + return {"method": methodInstance.name, "action": action, "actionId": f"{methodInstance.name}.{action}", + "description": ai.get('description', ''), "parameters": ai.get('parameters', {})} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting action schema: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get action schema: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get workflow by ID.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get workflow: {str(e)}") + + +@router.put("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def update_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + workflowData: Dict[str, Any] = Body(...), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Update workflow by ID.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "update", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to update this workflow") + updated = chatInterface.updateWorkflow(workflowId, workflowData) + if not updated: + raise HTTPException(status_code=500, detail="Failed to update workflow") + return updated + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to update workflow: {str(e)}") + + +@router.delete("/{instanceId}/workflows/{workflowId}") +@limiter.limit("120/minute") +def delete_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete workflow and associated data.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "delete", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to delete this workflow") + success = chatInterface.deleteWorkflow(workflowId) + if not success: + raise HTTPException(status_code=500, detail="Failed to delete workflow") + return {"id": workflowId, "message": "Workflow and associated data deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting workflow: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/status", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_automation_workflow_status( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get workflow status.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow status: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow status: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/logs", response_model=PaginatedResponse[ChatLog]) +@limiter.limit("120/minute") +def get_automation_workflow_logs( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + logId: Optional[str] = Query(None), + pagination: Optional[str] = Query(None), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatLog]: + """Get workflow logs.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + paginationParams = None + if pagination: + try: + pd = json.loads(pagination) + if pd: + pd = normalize_pagination_dict(pd) + paginationParams = PaginationParams(**pd) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination: {str(e)}") + result = chatInterface.getLogs(workflowId, pagination=paginationParams) + if logId: + allLogs = result.items if paginationParams else result + idx = next((i for i, log in enumerate(allLogs) if log.id == logId), -1) + if idx >= 0: + return PaginatedResponse(items=allLogs[idx + 1:], pagination=None) + if paginationParams: + return PaginatedResponse(items=result.items, pagination=PaginationMetadata( + currentPage=paginationParams.page, pageSize=paginationParams.pageSize, + totalItems=result.totalItems, totalPages=result.totalPages, + sort=paginationParams.sort, filters=paginationParams.filters)) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow logs: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage]) +@limiter.limit("120/minute") +def get_automation_workflow_messages( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + messageId: Optional[str] = Query(None), + pagination: Optional[str] = Query(None), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatMessage]: + """Get workflow messages.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + paginationParams = None + if pagination: + try: + pd = json.loads(pagination) + if pd: + pd = normalize_pagination_dict(pd) + paginationParams = PaginationParams(**pd) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination: {str(e)}") + result = chatInterface.getMessages(workflowId, pagination=paginationParams) + if messageId: + allMsgs = result.items if paginationParams else result + idx = next((i for i, m in enumerate(allMsgs) if m.id == messageId), -1) + if idx >= 0: + return PaginatedResponse(items=allMsgs[idx + 1:], pagination=None) + if paginationParams: + return PaginatedResponse(items=result.items, pagination=PaginationMetadata( + currentPage=paginationParams.page, pageSize=paginationParams.pageSize, + totalItems=result.totalItems, totalPages=result.totalPages, + sort=paginationParams.sort, filters=paginationParams.filters)) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow messages: {str(e)}") + + +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}") +@limiter.limit("120/minute") +def delete_automation_workflow_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + messageId: str = Path(..., description="Message ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete message from workflow.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + success = chatInterface.deleteMessage(workflowId, messageId) + if not success: + raise HTTPException(status_code=404, detail=f"Message {messageId} not found") + return {"workflowId": workflowId, "messageId": messageId, "message": "Message deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting message: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting message: {str(e)}") + + +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}/files/{fileId}") +@limiter.limit("120/minute") +def delete_automation_file_from_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + messageId: str = Path(..., description="Message ID"), + fileId: str = Path(..., description="File ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete file from message.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + success = chatInterface.deleteFileFromMessage(workflowId, messageId, fileId) + if not success: + raise HTTPException(status_code=404, detail=f"File {fileId} not found") + return {"workflowId": workflowId, "messageId": messageId, "fileId": fileId, "message": "File reference deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting file: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting file: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/chatData") +@limiter.limit("120/minute") +def get_automation_workflow_chat_data( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + afterTimestamp: Optional[float] = Query(None), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get unified chat data for workflow.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + return chatInterface.getUnifiedChatData(workflowId, afterTimestamp) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting chat data: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting chat data: {str(e)}") + + +@router.post("/{instanceId}/workflows/{workflowId}/stop", response_model=ChatWorkflow) +@limiter.limit("30/minute") +async def stop_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Stop a running automation workflow. Uses instance-scoped services.""" + try: + from modules.workflows.automation import chatStop + mandateId = _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + services.featureCode = "automation" + return await chatStop( + context.user, + workflowId, + mandateId=mandateId, + featureInstanceId=instanceId, + featureCode="automation", + services=services, + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error stopping automation workflow: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/{automationId}", response_model=AutomationDefinition) @limiter.limit("30/minute") def get_automation( @@ -379,7 +871,7 @@ async def execute_automation_route( if not automation: raise ValueError(f"Automation {automationId} not found") - from modules.workflows.automation import executeAutomation + from modules.workflows.automation import executeAutomation, chatStop workflow = await executeAutomation(automationId, automation, context.user, services) return workflow except HTTPException: diff --git a/modules/features/chatplayground/routeFeatureChatplayground.py b/modules/features/chatplayground/routeFeatureChatplayground.py index c08f4b62..8a87084a 100644 --- a/modules/features/chatplayground/routeFeatureChatplayground.py +++ b/modules/features/chatplayground/routeFeatureChatplayground.py @@ -5,9 +5,10 @@ Chat Playground Feature Routes. Implements the endpoints for chat playground workflow management as a feature. """ +import json import logging from typing import Optional, Dict, Any -from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request +from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, status # Import auth modules from modules.auth import limiter, getRequestContext, RequestContext @@ -16,15 +17,31 @@ from modules.auth import limiter, getRequestContext, RequestContext from modules.interfaces import interfaceDbChat # Import models -from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum +from modules.datamodels.datamodelChat import ( + ChatWorkflow, + ChatMessage, + ChatLog, + UserInputRequest, + WorkflowModeEnum, +) +from modules.datamodels.datamodelPagination import ( + PaginationParams, + PaginatedResponse, + PaginationMetadata, + normalize_pagination_dict, +) # Import workflow control functions from modules.workflows.automation import chatStart, chatStop from modules.features.chatplayground.mainChatplayground import getChatplaygroundServices +from modules.shared.attributeUtils import getModelAttributeDefinitions # Configure logger logger = logging.getLogger(__name__) +# Model attributes for ChatWorkflow (workflow attributes endpoint) +workflowAttributes = getModelAttributeDefinitions(ChatWorkflow) + # Create router for chat playground feature endpoints router = APIRouter( prefix="/api/chatplayground", @@ -130,8 +147,8 @@ async def start_workflow( ) -# Stop workflow endpoint -@router.post("/{instanceId}/{workflowId}/stop", response_model=ChatWorkflow) +# Stop workflow endpoint (under /workflows/{workflowId}/ for consistency) +@router.post("/{instanceId}/workflows/{workflowId}/stop", response_model=ChatWorkflow) @limiter.limit("120/minute") async def stop_workflow( request: Request, @@ -174,8 +191,8 @@ async def stop_workflow( ) -# Unified Chat Data Endpoint for Polling -@router.get("/{instanceId}/{workflowId}/chatData") +# Unified Chat Data Endpoint for Polling (under /workflows/{workflowId}/ for consistency) +@router.get("/{instanceId}/workflows/{workflowId}/chatData") @limiter.limit("120/minute") def get_workflow_chat_data( request: Request, @@ -218,18 +235,32 @@ def get_workflow_chat_data( ) +# Get workflow attributes (ChatWorkflow model) +@router.get("/{instanceId}/workflows/attributes", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_workflow_attributes( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get attribute definitions for ChatWorkflow model.""" + _validateInstanceAccess(instanceId, context) + return {"attributes": workflowAttributes} + + # Get workflows for this instance -@router.get("/{instanceId}/workflows") +@router.get("/{instanceId}/workflows", response_model=PaginatedResponse[ChatWorkflow]) @limiter.limit("120/minute") def get_workflows( request: Request, instanceId: str = Path(..., description="Feature instance ID"), - page: int = Query(1, ge=1, description="Page number"), - pageSize: int = Query(20, ge=1, le=100, description="Items per page"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + page: int = Query(1, ge=1, description="Page number (legacy)"), + pageSize: int = Query(20, ge=1, le=100, description="Items per page (legacy)"), context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: +) -> PaginatedResponse[ChatWorkflow]: """ - Get all workflows for this feature instance. + Get all workflows for this feature instance with optional pagination. """ try: # Validate access @@ -238,13 +269,38 @@ def get_workflows( # Get service with feature instance context chatInterface = _getServiceChat(context, featureInstanceId=instanceId) - # Get workflows with pagination - from modules.datamodels.datamodelPagination import PaginationParams - pagination = PaginationParams(page=page, pageSize=pageSize) + # Parse pagination parameter + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException( + status_code=400, + detail=f"Invalid pagination parameter: {str(e)}" + ) + else: + paginationParams = PaginationParams(page=page, pageSize=pageSize) - result = chatInterface.getWorkflows(pagination=pagination) + result = chatInterface.getWorkflows(pagination=paginationParams) - return result + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + else: + return PaginatedResponse(items=result, pagination=None) except HTTPException: raise @@ -254,3 +310,410 @@ def get_workflows( status_code=500, detail=f"Error getting workflows: {str(e)}" ) + + +# Action Discovery Endpoints (must be before /{workflowId} to avoid path conflict) +@router.get("/{instanceId}/workflows/actions", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_all_workflow_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get all available workflow actions for the current user (filtered by RBAC).""" + try: + mandateId = _validateInstanceAccess(instanceId, context) + services = getChatplaygroundServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + allActions = [] + for methodName, methodInfo in methods.items(): + if methodName.startswith('Method'): + continue + methodInstance = methodInfo['instance'] + methodActions = methodInstance.actions + for actionName, actionInfo in methodActions.items(): + actionResponse = { + "module": methodInstance.name, + "actionId": f"{methodInstance.name}.{actionName}", + "name": actionName, + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + } + allActions.append(actionResponse) + return {"actions": allActions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting all actions: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_method_workflow_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get all available actions for a specific method.""" + try: + mandateId = _validateInstanceAccess(instanceId, context) + services = getChatplaygroundServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for methodName, methodInfo in methods.items(): + if methodInfo['instance'].name == method: + methodInstance = methodInfo['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + actions = [] + for actionName, actionInfo in methodInstance.actions.items(): + actionResponse = { + "actionId": f"{methodInstance.name}.{actionName}", + "name": actionName, + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + } + actions.append(actionResponse) + return {"module": methodInstance.name, "description": methodInstance.description, "actions": actions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions for method {method}: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}/{action}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_action_schema( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), + action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get action schema with parameter definitions for a specific action.""" + try: + mandateId = _validateInstanceAccess(instanceId, context) + services = getChatplaygroundServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for methodName, methodInfo in methods.items(): + if methodInfo['instance'].name == method: + methodInstance = methodInfo['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + methodActions = methodInstance.actions + if action not in methodActions: + raise HTTPException(status_code=404, detail=f"Action '{action}' not found in method '{method}'") + actionInfo = methodActions[action] + return { + "method": methodInstance.name, + "action": action, + "actionId": f"{methodInstance.name}.{action}", + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get action schema: {str(e)}") + + +# Get single workflow by ID +@router.get("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get workflow by ID.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get workflow: {str(e)}") + + +# Update workflow +@router.put("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def update_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow to update"), + workflowData: Dict[str, Any] = Body(...), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Update workflow by ID.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "update", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to update this workflow") + updatedWorkflow = chatInterface.updateWorkflow(workflowId, workflowData) + if not updatedWorkflow: + raise HTTPException(status_code=500, detail="Failed to update workflow") + return updatedWorkflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to update workflow: {str(e)}") + + +# Delete workflow +@router.delete("/{instanceId}/workflows/{workflowId}") +@limiter.limit("120/minute") +def delete_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow to delete"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Deletes a workflow and its associated data.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "delete", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to delete this workflow") + success = chatInterface.deleteWorkflow(workflowId) + if not success: + raise HTTPException(status_code=500, detail="Failed to delete workflow") + return {"id": workflowId, "message": "Workflow and associated data deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting workflow: {str(e)}") + + +# Get workflow status +@router.get("/{instanceId}/workflows/{workflowId}/status", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_workflow_status( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get the current status of a workflow.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow status: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow status: {str(e)}") + + +# Get workflow logs +@router.get("/{instanceId}/workflows/{workflowId}/logs", response_model=PaginatedResponse[ChatLog]) +@limiter.limit("120/minute") +def get_workflow_logs( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + logId: Optional[str] = Query(None, description="Optional log ID for selective data transfer"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatLog]: + """Get logs for a workflow with optional pagination.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") + + result = chatInterface.getLogs(workflowId, pagination=paginationParams) + + if logId: + allLogs = result.items if paginationParams else result + logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1) + if logIndex >= 0: + filteredLogs = allLogs[logIndex + 1:] + return PaginatedResponse(items=filteredLogs, pagination=None) + + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow logs: {str(e)}") + + +# Get workflow messages +@router.get("/{instanceId}/workflows/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage]) +@limiter.limit("120/minute") +def get_workflow_messages( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + messageId: Optional[str] = Query(None, description="Optional message ID for selective data transfer"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatMessage]: + """Get messages for a workflow with optional pagination.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") + + result = chatInterface.getMessages(workflowId, pagination=paginationParams) + + if messageId: + allMessages = result.items if paginationParams else result + messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1) + if messageIndex >= 0: + filteredMessages = allMessages[messageIndex + 1:] + return PaginatedResponse(items=filteredMessages, pagination=None) + + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow messages: {str(e)}") + + +# Delete message from workflow +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}") +@limiter.limit("120/minute") +def delete_workflow_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + messageId: str = Path(..., description="ID of the message to delete"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete a message from a workflow.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + success = chatInterface.deleteMessage(workflowId, messageId) + if not success: + raise HTTPException(status_code=404, detail=f"Message with ID {messageId} not found in workflow {workflowId}") + return {"workflowId": workflowId, "messageId": messageId, "message": "Message deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting message: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting message: {str(e)}") + + +# Delete file from message +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}/files/{fileId}") +@limiter.limit("120/minute") +def delete_file_from_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + messageId: str = Path(..., description="ID of the message"), + fileId: str = Path(..., description="ID of the file to delete"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete a file reference from a message in a workflow.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + success = chatInterface.deleteFileFromMessage(workflowId, messageId, fileId) + if not success: + raise HTTPException(status_code=404, detail=f"File with ID {fileId} not found in message {messageId}") + return {"workflowId": workflowId, "messageId": messageId, "fileId": fileId, "message": "File reference deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting file reference: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting file reference: {str(e)}") diff --git a/modules/routes/routeDataWorkflows.py b/modules/routes/routeDataWorkflows.py deleted file mode 100644 index be580ec9..00000000 --- a/modules/routes/routeDataWorkflows.py +++ /dev/null @@ -1,857 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Workflow routes for the backend API. -Implements the endpoints for workflow management according to the state machine. -""" - -import logging -import json -from typing import List, Dict, Any, Optional -from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request - -# Import auth modules -from modules.auth import limiter, getCurrentUser -from modules.auth.authentication import getRequestContext, RequestContext - -# Import interfaces from feature containers -import modules.interfaces.interfaceDbChat as interfaceDbChat -from modules.interfaces.interfaceDbChat import getInterface -from modules.interfaces.interfaceRbac import getRecordsetWithRBAC - -# Import models from feature containers -from modules.datamodels.datamodelChat import ( - ChatWorkflow, - ChatMessage, - ChatLog, - ChatStat, - ChatDocument -) -from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse -from modules.datamodels.datamodelUam import User -from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict - - -# Configure logger -logger = logging.getLogger(__name__) - -# Model attributes for ChatWorkflow -workflowAttributes = getModelAttributeDefinitions(ChatWorkflow) - -# Create router for workflow endpoints -router = APIRouter( - prefix="/api/workflows", - tags=["Workflow"], - responses={404: {"description": "Not found"}} -) - -def getServiceChat(ctx: RequestContext): - # Workflows are not feature-instance-specific — only mandateId is needed for RBAC. - # Passing featureInstanceId would add a SQL WHERE filter that excludes workflows - # created by other features (e.g., automation vs chatbot). - return interfaceDbChat.getInterface(ctx.user, mandateId=ctx.mandateId) - -# Consolidated endpoint for getting all workflows -@router.get("/", response_model=PaginatedResponse[ChatWorkflow]) -@limiter.limit("120/minute") -def get_workflows( - request: Request, - pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), - ctx: RequestContext = Depends(getRequestContext) -) -> PaginatedResponse[ChatWorkflow]: - """ - Get workflows with optional pagination, sorting, and filtering. - - Query Parameters: - - pagination: JSON-encoded PaginationParams object, or None for no pagination - - Examples: - - GET /api/workflows/ (no pagination - returns all workflows) - - GET /api/workflows/?pagination={"page":1,"pageSize":10,"sort":[]} - """ - try: - # Parse pagination parameter - paginationParams = None - if pagination: - try: - paginationDict = json.loads(pagination) - if paginationDict: - # Normalize pagination dict (handles top-level "search" field) - paginationDict = normalize_pagination_dict(paginationDict) - paginationParams = PaginationParams(**paginationDict) - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException( - status_code=400, - detail=f"Invalid pagination parameter: {str(e)}" - ) - - appInterface = getServiceChat(ctx) - result = appInterface.getWorkflows(pagination=paginationParams) - - # If pagination was requested, result is PaginatedResult with items as dicts - # If no pagination, result is List[Dict] - if paginationParams: - workflows = result.items - totalItems = result.totalItems - totalPages = result.totalPages - else: - workflows = result - totalItems = len(result) - totalPages = 1 - - if paginationParams: - return PaginatedResponse( - items=workflows, - pagination=PaginationMetadata( - currentPage=paginationParams.page, - pageSize=paginationParams.pageSize, - totalItems=totalItems, - totalPages=totalPages, - sort=paginationParams.sort, - filters=paginationParams.filters - ) - ) - else: - return PaginatedResponse( - items=workflows, - pagination=None - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflows: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get workflows: {str(e)}" - ) - -@router.get("/{workflowId}", response_model=ChatWorkflow) -@limiter.limit("120/minute") -def get_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """Get workflow by ID""" - try: - # Get workflow interface with current user context - workflowInterface = getServiceChat(ctx) - - # Get workflow - workflow = workflowInterface.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Workflow not found" - ) - - return workflow - - except Exception as e: - logger.error(f"Error getting workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get workflow: {str(e)}" - ) - -@router.put("/{workflowId}", response_model=ChatWorkflow) -@limiter.limit("120/minute") -def update_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to update"), - workflowData: Dict[str, Any] = Body(...), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """Update workflow by ID""" - try: - # Get workflow interface with current user context - workflowInterface = getServiceChat(ctx) - - # Get workflow using interface method to check permissions - workflow = workflowInterface.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Workflow not found" - ) - - # Check if user has permission to update using RBAC - if not workflowInterface.checkRbacPermission(ChatWorkflow, "update", workflowId): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="You don't have permission to update this workflow" - ) - - # Update workflow - updatedWorkflow = workflowInterface.updateWorkflow(workflowId, workflowData) - if not updatedWorkflow: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to update workflow" - ) - - return updatedWorkflow - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error updating workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to update workflow: {str(e)}" - ) - -# API Endpoint for workflow status -@router.get("/{workflowId}/status", response_model=ChatWorkflow) -@limiter.limit("120/minute") -def get_workflow_status( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """Get the current status of a workflow.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Retrieve workflow - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - return workflow - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflow status: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error getting workflow status: {str(e)}" - ) - - -# API Endpoint for stopping a workflow -@router.post("/{workflowId}/stop", response_model=ChatWorkflow) -@limiter.limit("120/minute") -async def stop_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to stop"), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """ - Stop a running workflow. - This is a general endpoint that can be used by any feature to stop a workflow. - """ - try: - from modules.workflows.automation import chatStop - - # Get the workflow first to get mandateId - interfaceChatDb = getServiceChat(ctx) - workflow = interfaceChatDb.getWorkflow(workflowId) - - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - mandateId = workflow.get("mandateId") if isinstance(workflow, dict) else getattr(workflow, "mandateId", None) - - # Stop the workflow - stoppedWorkflow = await chatStop(ctx.user, workflowId, mandateId=mandateId) - - return stoppedWorkflow - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error stopping workflow: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error stopping workflow: {str(e)}" - ) - - -# API Endpoint for workflow logs with selective data transfer -@router.get("/{workflowId}/logs", response_model=PaginatedResponse[ChatLog]) -@limiter.limit("120/minute") -def get_workflow_logs( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs (legacy selective data transfer)"), - pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), - ctx: RequestContext = Depends(getRequestContext) -) -> PaginatedResponse[ChatLog]: - """ - Get logs for a workflow with optional pagination, sorting, and filtering. - Also supports legacy selective data transfer via logId parameter. - - Query Parameters: - - logId: Optional log ID for selective data transfer (returns only logs after this ID) - - pagination: JSON-encoded PaginationParams object, or None for no pagination - """ - try: - # Parse pagination parameter - paginationParams = None - if pagination: - try: - paginationDict = json.loads(pagination) - if paginationDict: - # Normalize pagination dict (handles top-level "search" field) - paginationDict = normalize_pagination_dict(paginationDict) - paginationParams = PaginationParams(**paginationDict) - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException( - status_code=400, - detail=f"Invalid pagination parameter: {str(e)}" - ) - - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Get logs with optional pagination - result = interfaceDbChat.getLogs(workflowId, pagination=paginationParams) - - # Handle legacy selective data transfer if logId is provided (takes precedence over pagination) - if logId: - # If pagination was requested, result is PaginatedResult, otherwise List[ChatLog] - allLogs = result.items if paginationParams else result - - # Find the index of the log with the given ID - logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1) - if logIndex >= 0: - # Return only logs after the specified log - filteredLogs = allLogs[logIndex + 1:] - return PaginatedResponse( - items=filteredLogs, - pagination=None - ) - - # If pagination was requested, result is PaginatedResult - # If no pagination, result is List[ChatLog] - if paginationParams: - return PaginatedResponse( - items=result.items, - pagination=PaginationMetadata( - currentPage=paginationParams.page, - pageSize=paginationParams.pageSize, - totalItems=result.totalItems, - totalPages=result.totalPages, - sort=paginationParams.sort, - filters=paginationParams.filters - ) - ) - else: - return PaginatedResponse( - items=result, - pagination=None - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error getting workflow logs: {str(e)}" - ) - -# API Endpoint for workflow messages with selective data transfer -@router.get("/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage]) -@limiter.limit("120/minute") -def get_workflow_messages( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages (legacy selective data transfer)"), - pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), - ctx: RequestContext = Depends(getRequestContext) -) -> PaginatedResponse[ChatMessage]: - """ - Get messages for a workflow with optional pagination, sorting, and filtering. - Also supports legacy selective data transfer via messageId parameter. - - Query Parameters: - - messageId: Optional message ID for selective data transfer (returns only messages after this ID) - - pagination: JSON-encoded PaginationParams object, or None for no pagination - """ - try: - # Parse pagination parameter - paginationParams = None - if pagination: - try: - paginationDict = json.loads(pagination) - if paginationDict: - # Normalize pagination dict (handles top-level "search" field) - paginationDict = normalize_pagination_dict(paginationDict) - paginationParams = PaginationParams(**paginationDict) - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException( - status_code=400, - detail=f"Invalid pagination parameter: {str(e)}" - ) - - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Get messages with optional pagination - result = interfaceDbChat.getMessages(workflowId, pagination=paginationParams) - - # Handle legacy selective data transfer if messageId is provided (takes precedence over pagination) - if messageId: - # If pagination was requested, result is PaginatedResult, otherwise List[ChatMessage] - allMessages = result.items if paginationParams else result - - # Find the index of the message with the given ID - messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1) - if messageIndex >= 0: - # Return only messages after the specified message - filteredMessages = allMessages[messageIndex + 1:] - return PaginatedResponse( - items=filteredMessages, - pagination=None - ) - - # If pagination was requested, result is PaginatedResult - # If no pagination, result is List[ChatMessage] - if paginationParams: - return PaginatedResponse( - items=result.items, - pagination=PaginationMetadata( - currentPage=paginationParams.page, - pageSize=paginationParams.pageSize, - totalItems=result.totalItems, - totalPages=result.totalPages, - sort=paginationParams.sort, - filters=paginationParams.filters - ) - ) - else: - return PaginatedResponse( - items=result, - pagination=None - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error getting workflow messages: {str(e)}" - ) - - -# State 11: Workflow Reset/Deletion endpoint -@router.delete("/{workflowId}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def delete_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to delete"), - ctx: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """Deletes a workflow and its associated data.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Check workflow access and permission using RBAC - workflows = getRecordsetWithRBAC( - interfaceDbChat.db, - ChatWorkflow, - ctx.user, - recordFilter={"id": workflowId}, - mandateId=ctx.mandateId - ) - if not workflows: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - workflow_data = workflows[0] - - # Check if user has permission to delete using RBAC - if not interfaceDbChat.checkRbacPermission(ChatWorkflow, "delete", workflowId): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="You don't have permission to delete this workflow" - ) - - # Delete workflow - success = interfaceDbChat.deleteWorkflow(workflowId) - - if not success: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to delete workflow" - ) - - return { - "id": workflowId, - "message": "Workflow and associated data deleted successfully" - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error deleting workflow: {str(e)}" - ) - - -# Document Management Endpoints - -@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def delete_workflow_message( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - messageId: str = Path(..., description="ID of the message to delete"), - ctx: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """Delete a message from a workflow.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Delete the message - success = interfaceDbChat.deleteMessage(workflowId, messageId) - - if not success: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Message with ID {messageId} not found in workflow {workflowId}" - ) - - # Messages are stored in ChatMessage table linked by workflowId -- - # no messageIds list on ChatWorkflow to update. - - return { - "workflowId": workflowId, - "messageId": messageId, - "message": "Message deleted successfully" - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting message: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error deleting message: {str(e)}" - ) - -@router.delete("/{workflowId}/messages/{messageId}/files/{fileId}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def delete_file_from_message( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - messageId: str = Path(..., description="ID of the message"), - fileId: str = Path(..., description="ID of the file to delete"), - ctx: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """Delete a file reference from a message in a workflow.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Delete file reference from message - success = interfaceDbChat.deleteFileFromMessage(workflowId, messageId, fileId) - - if not success: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"File with ID {fileId} not found in message {messageId}" - ) - - return { - "workflowId": workflowId, - "messageId": messageId, - "fileId": fileId, - "message": "File reference deleted successfully" - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting file reference: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error deleting file reference: {str(e)}" - ) - - -# Action Discovery Endpoints - -@router.get("/actions", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def get_all_actions( - request: Request, - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """ - Get all available workflow actions for the current user (filtered by RBAC). - - Returns: - - Dictionary with actions grouped by module, filtered by RBAC permissions - - Example response: - { - "actions": [ - { - "module": "outlook", - "actionId": "outlook.readEmails", - "name": "readEmails", - "description": "Read emails and metadata from a mailbox folder", - "parameters": {...} - }, - ... - ] - } - """ - try: - from modules.services import getInterface as getServices - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - - # Get services and discover methods - services = getServices(currentUser, None) - discoverMethods(services) - - # Import methods catalog - from modules.workflows.processing.shared.methodDiscovery import methods - - # Collect all actions from all methods - allActions = [] - for methodName, methodInfo in methods.items(): - # Skip duplicate entries (same method stored with full and short name) - if methodName.startswith('Method'): - continue - - methodInstance = methodInfo['instance'] - methodActions = methodInstance.actions - - for actionName, actionInfo in methodActions.items(): - # Build action response - actionResponse = { - "module": methodInstance.name, - "actionId": f"{methodInstance.name}.{actionName}", - "name": actionName, - "description": actionInfo.get('description', ''), - "parameters": actionInfo.get('parameters', {}) - } - allActions.append(actionResponse) - - return { - "actions": allActions - } - - except Exception as e: - logger.error(f"Error getting all actions: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get actions: {str(e)}" - ) - - -@router.get("/actions/{method}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def get_method_actions( - request: Request, - method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """ - Get all available actions for a specific method (filtered by RBAC). - - Path Parameters: - - method: Method name (e.g., 'outlook', 'sharepoint', 'ai') - - Returns: - - Dictionary with actions for the specified method - - Example response: - { - "module": "outlook", - "actions": [ - { - "actionId": "outlook.readEmails", - "name": "readEmails", - "description": "Read emails and metadata from a mailbox folder", - "parameters": {...} - }, - ... - ] - } - """ - try: - from modules.services import getInterface as getServices - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - - # Get services and discover methods - services = getServices(currentUser, None) - discoverMethods(services) - - # Import methods catalog - from modules.workflows.processing.shared.methodDiscovery import methods - - # Find method instance - methodInstance = None - for methodName, methodInfo in methods.items(): - if methodInfo['instance'].name == method: - methodInstance = methodInfo['instance'] - break - - if not methodInstance: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Method '{method}' not found" - ) - - # Collect actions for this method - actions = [] - methodActions = methodInstance.actions - - for actionName, actionInfo in methodActions.items(): - actionResponse = { - "actionId": f"{methodInstance.name}.{actionName}", - "name": actionName, - "description": actionInfo.get('description', ''), - "parameters": actionInfo.get('parameters', {}) - } - actions.append(actionResponse) - - return { - "module": methodInstance.name, - "description": methodInstance.description, - "actions": actions - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get actions for method {method}: {str(e)}" - ) - - -@router.get("/actions/{method}/{action}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def get_action_schema( - request: Request, - method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), - action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"), - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """ - Get action schema with parameter definitions for a specific action. - - Path Parameters: - - method: Method name (e.g., 'outlook', 'sharepoint', 'ai') - - action: Action name (e.g., 'readEmails', 'uploadDocument') - - Returns: - - Action schema with full parameter definitions - - Example response: - { - "method": "outlook", - "action": "readEmails", - "actionId": "outlook.readEmails", - "description": "Read emails and metadata from a mailbox folder", - "parameters": { - "connectionReference": { - "name": "connectionReference", - "type": "str", - "frontendType": "userConnection", - "frontendOptions": "user.connection", - "required": true, - "description": "Microsoft connection label" - }, - ... - } - } - """ - try: - from modules.services import getInterface as getServices - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - - # Get services and discover methods - services = getServices(currentUser, None) - discoverMethods(services) - - # Import methods catalog - from modules.workflows.processing.shared.methodDiscovery import methods - - # Find method instance - methodInstance = None - for methodName, methodInfo in methods.items(): - if methodInfo['instance'].name == method: - methodInstance = methodInfo['instance'] - break - - if not methodInstance: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Method '{method}' not found" - ) - - # Get action - methodActions = methodInstance.actions - if action not in methodActions: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Action '{action}' not found in method '{method}'" - ) - - actionInfo = methodActions[action] - - return { - "method": methodInstance.name, - "action": action, - "actionId": f"{methodInstance.name}.{action}", - "description": actionInfo.get('description', ''), - "parameters": actionInfo.get('parameters', {}) - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get action schema: {str(e)}" - ) \ No newline at end of file diff --git a/scripts/migrate_async_to_sync.py b/scripts/migrate_async_to_sync.py index d0f8ef67..d2b030a4 100644 --- a/scripts/migrate_async_to_sync.py +++ b/scripts/migrate_async_to_sync.py @@ -52,9 +52,6 @@ _MUST_STAY_ASYNC: Dict[str, Set[str]] = { "modules/routes/routeDataFiles.py": { "upload_file", # await file.read() }, - "modules/routes/routeDataWorkflows.py": { - "stop_workflow", # await chatStop(...) - }, # These files have many genuinely async routes (httpx, external APIs) -- keep ALL async: "modules/routes/routeRealEstate.py": "__ALL__", "modules/routes/routeSharepoint.py": "__ALL__",