From 13322e7cf811d52e37c605cc0aad397b53ef5710 Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Fri, 6 Mar 2026 19:21:34 +0100 Subject: [PATCH 1/9] feat: included service center in playground feature to replace old services --- .../chatplayground/mainChatplayground.py | 94 ++++++++++++++++++- .../routeFeatureChatplayground.py | 42 +++++++-- modules/workflows/automation/mainWorkflow.py | 26 +++-- 3 files changed, 142 insertions(+), 20 deletions(-) diff --git a/modules/features/chatplayground/mainChatplayground.py b/modules/features/chatplayground/mainChatplayground.py index 246236a1..d01c1c23 100644 --- a/modules/features/chatplayground/mainChatplayground.py +++ b/modules/features/chatplayground/mainChatplayground.py @@ -6,7 +6,7 @@ Handles feature initialization and RBAC catalog registration. """ import logging -from typing import Dict, List, Any +from typing import Dict, List, Any, Optional logger = logging.getLogger(__name__) @@ -48,6 +48,16 @@ RESOURCE_OBJECTS = [ }, ] +# Service requirements - services this feature needs from the service center +# Same as automation: chatplayground runs the same WorkflowManager and workflow methods +REQUIRED_SERVICES = [ + {"serviceKey": "chat", "meta": {"usage": "Workflow CRUD, messages, logs"}}, + {"serviceKey": "ai", "meta": {"usage": "AI planning for workflow execution"}}, + {"serviceKey": "utils", "meta": {"usage": "Timestamps, utilities"}}, + {"serviceKey": "billing", "meta": {"usage": "AI call billing"}}, + {"serviceKey": "extraction", "meta": {"usage": "Workflow method actions"}}, + {"serviceKey": "sharepoint", "meta": {"usage": "SharePoint actions (listDocuments, uploadDocument, etc.)"}}, +] # Template roles for this feature # Role names MUST follow convention: {featureCode}-{roleName} TEMPLATE_ROLES = [ @@ -104,6 +114,88 @@ TEMPLATE_ROLES = [ ] +def getRequiredServiceKeys() -> List[str]: + """Return list of service keys this feature requires.""" + return [s["serviceKey"] for s in REQUIRED_SERVICES] + + +def getChatplaygroundServices( + user, + mandateId: Optional[str] = None, + featureInstanceId: Optional[str] = None, + workflow=None, +) -> "_ChatplaygroundServiceHub": + """ + Get a service hub for the chatplayground feature using the service center. + Resolves only the services declared in REQUIRED_SERVICES. + No legacy fallback - service center only. + + Returns a hub-like object with: chat, ai, utils, billing, extraction, + sharepoint, rbac, interfaceDbApp, interfaceDbComponent, interfaceDbChat. + """ + from modules.serviceCenter import getService + from modules.serviceCenter.context import ServiceCenterContext + + _workflow = workflow + if _workflow is None: + _workflow = type("_Placeholder", (), {"featureCode": FEATURE_CODE})() + ctx = ServiceCenterContext( + user=user, + mandate_id=mandateId, + feature_instance_id=featureInstanceId, + workflow=_workflow, + ) + + hub = _ChatplaygroundServiceHub() + hub.user = user + hub.mandateId = mandateId + hub.featureInstanceId = featureInstanceId + hub.workflow = workflow + hub.featureCode = FEATURE_CODE + hub.allowedProviders = None + + for spec in REQUIRED_SERVICES: + key = spec["serviceKey"] + try: + svc = getService(key, ctx, legacy_hub=None) + setattr(hub, key, svc) + except Exception as e: + logger.warning(f"Could not resolve service '{key}' for chatplayground: {e}") + setattr(hub, key, None) + + # Copy interfaces from chat service for WorkflowManager compatibility + if hub.chat: + hub.interfaceDbApp = getattr(hub.chat, "interfaceDbApp", None) + hub.interfaceDbComponent = getattr(hub.chat, "interfaceDbComponent", None) + hub.interfaceDbChat = getattr(hub.chat, "interfaceDbChat", None) + + # RBAC for MethodBase action permission checks (workflow methods) + hub.rbac = getattr(hub.interfaceDbApp, "rbac", None) if hub.interfaceDbApp else None + + return hub + + +class _ChatplaygroundServiceHub: + """Lightweight hub exposing only services required by the chatplayground feature.""" + + user = None + mandateId = None + featureInstanceId = None + workflow = None + featureCode = "chatplayground" + allowedProviders = None + interfaceDbApp = None + interfaceDbComponent = None + interfaceDbChat = None + rbac = None + chat = None + ai = None + utils = None + billing = None + extraction = None + sharepoint = None + + def getFeatureDefinition() -> Dict[str, Any]: """Return the feature definition for registration.""" return { diff --git a/modules/features/chatplayground/routeFeatureChatplayground.py b/modules/features/chatplayground/routeFeatureChatplayground.py index e3787904..c08f4b62 100644 --- a/modules/features/chatplayground/routeFeatureChatplayground.py +++ b/modules/features/chatplayground/routeFeatureChatplayground.py @@ -20,6 +20,7 @@ from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, Wor # Import workflow control functions from modules.workflows.automation import chatStart, chatStop +from modules.features.chatplayground.mainChatplayground import getChatplaygroundServices # Configure logger logger = logging.getLogger(__name__) @@ -95,15 +96,26 @@ async def start_workflow( # Validate access and get mandate ID mandateId = _validateInstanceAccess(instanceId, context) - # Start or continue workflow - workflow = await chatStart( - context.user, - userInput, - workflowMode, - workflowId, + # Get chatplayground services from service center (not automation) + services = getChatplaygroundServices( + context.user, mandateId=mandateId, featureInstanceId=instanceId, - featureCode='chatplayground' + ) + services.featureCode = 'chatplayground' + if hasattr(userInput, 'allowedProviders') and userInput.allowedProviders: + services.allowedProviders = userInput.allowedProviders + + # Start or continue workflow + workflow = await chatStart( + context.user, + userInput, + workflowMode, + workflowId, + mandateId=mandateId, + featureInstanceId=instanceId, + featureCode='chatplayground', + services=services, ) return workflow @@ -132,12 +144,22 @@ async def stop_workflow( # Validate access and get mandate ID mandateId = _validateInstanceAccess(instanceId, context) + # Get chatplayground services from service center (not automation) + services = getChatplaygroundServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + services.featureCode = 'chatplayground' + # Stop workflow (pass featureInstanceId for proper RBAC filtering) workflow = await chatStop( - context.user, - workflowId, + context.user, + workflowId, mandateId=mandateId, - featureInstanceId=instanceId + featureInstanceId=instanceId, + featureCode='chatplayground', + services=services, ) return workflow diff --git a/modules/workflows/automation/mainWorkflow.py b/modules/workflows/automation/mainWorkflow.py index 1296f3fe..625384d7 100644 --- a/modules/workflows/automation/mainWorkflow.py +++ b/modules/workflows/automation/mainWorkflow.py @@ -24,7 +24,7 @@ from .subAutomationUtils import parseScheduleToCron, planToPrompt, replacePlaceh logger = logging.getLogger(__name__) -async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None) -> ChatWorkflow: +async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None, services=None) -> ChatWorkflow: """ Starts a new chat or continues an existing one, then launches processing asynchronously. @@ -36,9 +36,11 @@ async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode mandateId: Mandate ID (required for billing) featureInstanceId: Feature instance ID (required for billing) featureCode: Feature code (e.g., 'chatplayground', 'automation') + services: Pre-built service hub from the calling feature (required). Each feature must pass its own services. """ + if services is None: + raise ValueError("services is required: each feature must pass its own service hub (e.g. getChatplaygroundServices, getAutomationServices)") try: - services = getAutomationServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) # Store allowedProviders in services context for model selection if hasattr(userInput, 'allowedProviders') and userInput.allowedProviders: @@ -56,10 +58,11 @@ async def chatStart(currentUser: User, userInput: UserInputRequest, workflowMode logger.error(f"Error starting chat: {str(e)}") raise -async def chatStop(currentUser: User, workflowId: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None) -> ChatWorkflow: - """Stops a running chat.""" +async def chatStop(currentUser: User, workflowId: str, mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None, featureCode: Optional[str] = None, services=None) -> ChatWorkflow: + """Stops a running chat. Caller must pass services from the owning feature.""" + if services is None: + raise ValueError("services is required: each feature must pass its own service hub (e.g. getChatplaygroundServices, getAutomationServices)") try: - services = getAutomationServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) if featureCode: services.featureCode = featureCode workflowManager = WorkflowManager(services) @@ -146,6 +149,12 @@ async def executeAutomation(automationId: str, automation, creatorUser: User, se # 3. Start workflow using chatStart with creator's context # mandateId and featureInstanceId come from the automation definition + # Each feature must pass its own services - no fallback + creatorServices = getAutomationServices( + creatorUser, + mandateId=automationMandateId, + featureInstanceId=automationFeatureInstanceId, + ) workflow = await chatStart( currentUser=creatorUser, userInput=userInput, @@ -153,7 +162,8 @@ async def executeAutomation(automationId: str, automation, creatorUser: User, se workflowId=None, mandateId=automationMandateId, featureInstanceId=automationFeatureInstanceId, - featureCode='automation' + featureCode='automation', + services=creatorServices, ) executionLog["workflowId"] = workflow.id @@ -161,9 +171,7 @@ async def executeAutomation(automationId: str, automation, creatorUser: User, se executionLog["messages"].append(f"Workflow {workflow.id} started successfully") logger.info(f"Started workflow {workflow.id} with plan containing {len(plan.get('tasks', []))} tasks (plan embedded in userInput)") - # Set workflow name with "automated" prefix — use creatorUser's Services - # (services parameter is eventServices with eventUser context, must use creatorUser context) - creatorServices = getAutomationServices(creatorUser, mandateId=automationMandateId, featureInstanceId=automationFeatureInstanceId) + # Set workflow name with "automated" prefix — use creatorServices from chatStart automationLabel = automation.label or "Unknown Automation" workflowName = f"automated: {automationLabel}" creatorServices.interfaceDbChat.updateWorkflow(workflow.id, {"name": workflowName}) From d59c6c8576d0ec58ccc8ec02065a1f045a106142 Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 10:31:02 +0100 Subject: [PATCH 2/9] fix: duplicate automation definition for pydantic model --- .../automation/interfaceFeatureAutomation.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/modules/features/automation/interfaceFeatureAutomation.py b/modules/features/automation/interfaceFeatureAutomation.py index 79fbd8ad..3b20ca3d 100644 --- a/modules/features/automation/interfaceFeatureAutomation.py +++ b/modules/features/automation/interfaceFeatureAutomation.py @@ -791,20 +791,23 @@ class AutomationObjects: if not self.checkRbacPermission(AutomationDefinition, "create"): raise PermissionError("No permission to create definitions") + # getAutomationDefinition returns Pydantic model; convert to dict for .get() access + existing_data = existing.model_dump() if hasattr(existing, "model_dump") else existing + # Build duplicate data duplicateData = { "id": str(uuid.uuid4()), - "mandateId": existing.get("mandateId"), - "featureInstanceId": existing.get("featureInstanceId"), - "label": f"{existing.get('label', '')} (Kopie)", - "schedule": existing.get("schedule", ""), - "template": existing.get("template", ""), - "placeholders": existing.get("placeholders", {}), + "mandateId": existing_data.get("mandateId"), + "featureInstanceId": existing_data.get("featureInstanceId"), + "label": f"{existing_data.get('label', '')} (Kopie)", + "schedule": existing_data.get("schedule", ""), + "template": existing_data.get("template", ""), + "placeholders": existing_data.get("placeholders", {}), "active": False, "eventId": None, "status": None, "executionLogs": [], - "allowedProviders": existing.get("allowedProviders", []), + "allowedProviders": existing_data.get("allowedProviders", []), } # Ensure database connector has correct userId context From d7636d72c4cab1c8a6e5a9f31b767bf78dc049e7 Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 10:49:15 +0100 Subject: [PATCH 3/9] bug:service center --- modules/features/automation/mainAutomation.py | 6 +++++- modules/features/chatplayground/mainChatplayground.py | 1 + .../serviceCenter/services/serviceAi/mainServiceAi.py | 9 ++++++--- .../services/serviceGeneration/mainServiceGeneration.py | 9 ++++++--- modules/workflows/processing/workflowProcessor.py | 9 +++++++-- 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/modules/features/automation/mainAutomation.py b/modules/features/automation/mainAutomation.py index a7e381e1..aead6767 100644 --- a/modules/features/automation/mainAutomation.py +++ b/modules/features/automation/mainAutomation.py @@ -129,6 +129,7 @@ REQUIRED_SERVICES = [ {"serviceKey": "billing", "meta": {"usage": "AI call billing"}}, {"serviceKey": "extraction", "meta": {"usage": "Workflow method actions"}}, {"serviceKey": "sharepoint", "meta": {"usage": "SharePoint actions (listDocuments, uploadDocument, etc.)"}}, + {"serviceKey": "generation", "meta": {"usage": "Action completion messages, document creation from results"}}, ] @@ -158,7 +159,8 @@ def getAutomationServices( _workflow = workflow if _workflow is None: - _workflow = type("_Placeholder", (), {"featureCode": FEATURE_CODE})() + # Placeholder must have 'id' and 'workflowMode' to avoid AttributeError when services use context.workflow + _workflow = type("_Placeholder", (), {"featureCode": FEATURE_CODE, "id": None, "workflowMode": None})() ctx = ServiceCenterContext( user=user, mandate_id=mandateId, @@ -170,6 +172,7 @@ def getAutomationServices( hub.user = user hub.mandateId = mandateId hub.featureInstanceId = featureInstanceId + hub._service_context = ctx # Store context so workflow updates propagate to services hub.workflow = workflow hub.featureCode = FEATURE_CODE hub.allowedProviders = None @@ -206,6 +209,7 @@ class _AutomationServiceHub: user = None mandateId = None featureInstanceId = None + _service_context = None # ServiceCenterContext; when workflow is set, context.workflow is updated workflow = None featureCode = "automation" allowedProviders = None diff --git a/modules/features/chatplayground/mainChatplayground.py b/modules/features/chatplayground/mainChatplayground.py index d01c1c23..269f74d5 100644 --- a/modules/features/chatplayground/mainChatplayground.py +++ b/modules/features/chatplayground/mainChatplayground.py @@ -57,6 +57,7 @@ REQUIRED_SERVICES = [ {"serviceKey": "billing", "meta": {"usage": "AI call billing"}}, {"serviceKey": "extraction", "meta": {"usage": "Workflow method actions"}}, {"serviceKey": "sharepoint", "meta": {"usage": "SharePoint actions (listDocuments, uploadDocument, etc.)"}}, + {"serviceKey": "generation", "meta": {"usage": "Action completion messages, document creation from results"}}, ] # Template roles for this feature # Role names MUST follow convention: {featureCode}-{roleName} diff --git a/modules/serviceCenter/services/serviceAi/mainServiceAi.py b/modules/serviceCenter/services/serviceAi/mainServiceAi.py index 08f3ccb3..eff5671f 100644 --- a/modules/serviceCenter/services/serviceAi/mainServiceAi.py +++ b/modules/serviceCenter/services/serviceAi/mainServiceAi.py @@ -31,15 +31,18 @@ AiCallRequest.model_rebuild() class _ServicesAdapter: - """Adapter providing Services-like interface from (context, get_service).""" - + """Adapter providing Services-like interface from (context, get_service). + Workflow is read from context dynamically so propagation updates are visible.""" def __init__(self, context, get_service: Callable[[str], Any]): self._context = context self._get_service = get_service self.user = context.user self.mandateId = context.mandate_id self.featureInstanceId = context.feature_instance_id - self.workflow = context.workflow + + @property + def workflow(self): + return self._context.workflow @property def chat(self): diff --git a/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py b/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py index db0d2346..8fccd4e4 100644 --- a/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py +++ b/modules/serviceCenter/services/serviceGeneration/mainServiceGeneration.py @@ -19,19 +19,22 @@ logger = logging.getLogger(__name__) class _ServicesAdapter: - """Adapter providing Services-like interface from (context, get_service).""" - + """Adapter providing Services-like interface from (context, get_service). + Workflow is read from context dynamically so propagation updates are visible.""" def __init__(self, context, get_service: Callable[[str], Any]): self._context = context self._get_service = get_service self.user = context.user self.mandateId = context.mandate_id self.featureInstanceId = context.feature_instance_id - self.workflow = context.workflow chat = get_service("chat") self.interfaceDbComponent = chat.interfaceDbComponent self.interfaceDbChat = chat.interfaceDbChat + @property + def workflow(self): + return self._context.workflow + @property def chat(self): return self._get_service("chat") diff --git a/modules/workflows/processing/workflowProcessor.py b/modules/workflows/processing/workflowProcessor.py index 72f45cce..3f83379b 100644 --- a/modules/workflows/processing/workflowProcessor.py +++ b/modules/workflows/processing/workflowProcessor.py @@ -27,9 +27,14 @@ class WorkflowProcessor: def __init__(self, services): self.services = services - self.mode = self._createMode(services.workflow.workflowMode) - self.mode.processor = self # So mode can call persistTaskResult for per-action chaining self.workflow = services.workflow + if not self.workflow: + raise ValueError("WorkflowProcessor requires services.workflow (set by WorkflowManager before processing)") + workflowMode = getattr(self.workflow, 'workflowMode', None) + if not workflowMode: + raise ValueError("WorkflowProcessor requires services.workflow.workflowMode") + self.mode = self._createMode(workflowMode) + self.mode.processor = self # So mode can call persistTaskResult for per-action chaining self.workflowExecOperationId = None # Will be set by workflowManager for task hierarchy def _createMode(self, workflowMode: WorkflowModeEnum) -> BaseMode: From 22e70aa8787f2db1bda69a62d77950ac818eaf2b Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 10:53:23 +0100 Subject: [PATCH 4/9] fix: chat-playground and automation feature routes unification --- .../automation/routeFeatureAutomation.py | 498 +++++++++- .../routeFeatureChatplayground.py | 495 +++++++++- modules/routes/routeDataWorkflows.py | 857 ------------------ scripts/migrate_async_to_sync.py | 3 - 4 files changed, 974 insertions(+), 879 deletions(-) delete mode 100644 modules/routes/routeDataWorkflows.py diff --git a/modules/features/automation/routeFeatureAutomation.py b/modules/features/automation/routeFeatureAutomation.py index 1e548bba..1b7d703e 100644 --- a/modules/features/automation/routeFeatureAutomation.py +++ b/modules/features/automation/routeFeatureAutomation.py @@ -17,14 +17,16 @@ from modules.features.automation.interfaceFeatureAutomation import getInterface from modules.features.automation.mainAutomation import getAutomationServices from modules.auth import limiter, getRequestContext, RequestContext from modules.features.automation.datamodelFeatureAutomation import AutomationDefinition, AutomationTemplate -from modules.datamodels.datamodelChat import ChatWorkflow +from modules.datamodels.datamodelChat import ChatWorkflow, ChatMessage, ChatLog from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict from modules.shared.attributeUtils import getModelAttributeDefinitions +from modules.interfaces import interfaceDbChat # Configure logger logger = logging.getLogger(__name__) -# Model attributes for AutomationDefinition +# Model attributes for AutomationDefinition and ChatWorkflow automationAttributes = getModelAttributeDefinitions(AutomationDefinition) +workflowAttributes = getModelAttributeDefinitions(ChatWorkflow) # Create router for automation endpoints router = APIRouter( @@ -231,6 +233,496 @@ def get_available_actions( ) +# ----------------------------------------------------------------------------- +# Workflow routes under /{instanceId}/workflows/ (instance-scoped, same as chatplayground) +# ----------------------------------------------------------------------------- + +def _validateAutomationInstanceAccess(instanceId: str, context: RequestContext) -> Optional[str]: + """Validate user has access to the automation feature instance. Returns mandateId.""" + from modules.interfaces.interfaceDbApp import getRootInterface + rootInterface = getRootInterface() + instance = rootInterface.getFeatureInstance(instanceId) + if not instance: + raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found") + featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId) + if not featureAccess or not featureAccess.enabled: + raise HTTPException(status_code=403, detail="Access denied to this feature instance") + return str(instance.mandateId) if instance.mandateId else None + + +def _getAutomationServiceChat(context: RequestContext, featureInstanceId: str = None, mandateId: str = None): + """Get chat interface with feature instance context for workflows.""" + return interfaceDbChat.getInterface( + context.user, + mandateId=mandateId or (str(context.mandateId) if context.mandateId else None), + featureInstanceId=featureInstanceId + ) + + +@router.get("/{instanceId}/workflows", response_model=PaginatedResponse[ChatWorkflow]) +@limiter.limit("120/minute") +def get_automation_workflows( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + page: int = Query(1, ge=1, description="Page number (legacy)"), + pageSize: int = Query(20, ge=1, le=100, description="Items per page (legacy)"), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatWorkflow]: + """Get all workflows for this automation feature instance.""" + try: + mandateId = _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId, mandateId=mandateId) + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") + else: + paginationParams = PaginationParams(page=page, pageSize=pageSize) + result = chatInterface.getWorkflows(pagination=paginationParams) + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting automation workflows: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflows: {str(e)}") + + +# Workflow attributes (ChatWorkflow model) +@router.get("/{instanceId}/workflows/attributes", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_workflow_attributes( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get attribute definitions for ChatWorkflow model.""" + _validateAutomationInstanceAccess(instanceId, context) + return {"attributes": workflowAttributes} + + +# Actions (must be before /{workflowId} to avoid path conflict) +@router.get("/{instanceId}/workflows/actions", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_workflow_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get all available workflow actions.""" + try: + mandateId = _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices(context.user, mandateId=mandateId, featureInstanceId=instanceId) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + allActions = [] + for methodName, methodInfo in methods.items(): + if methodName.startswith('Method'): + continue + methodInstance = methodInfo['instance'] + for actionName, actionInfo in methodInstance.actions.items(): + allActions.append({ + "module": methodInstance.name, + "actionId": f"{methodInstance.name}.{actionName}", + "name": actionName, + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + }) + return {"actions": allActions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting actions: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_method_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get actions for a specific method.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices(context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=instanceId) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for mn, mi in methods.items(): + if mi['instance'].name == method: + methodInstance = mi['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + actions = [{"actionId": f"{methodInstance.name}.{an}", "name": an, "description": ai.get('description', ''), "parameters": ai.get('parameters', {})} + for an, ai in methodInstance.actions.items()] + return {"module": methodInstance.name, "description": methodInstance.description, "actions": actions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting actions for {method}: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}/{action}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_automation_action_schema( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name"), + action: str = Path(..., description="Action name"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get action schema for a specific action.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices(context.user, mandateId=str(context.mandateId) if context.mandateId else None, featureInstanceId=instanceId) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for mn, mi in methods.items(): + if mi['instance'].name == method: + methodInstance = mi['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + if action not in methodInstance.actions: + raise HTTPException(status_code=404, detail=f"Action '{action}' not found in method '{method}'") + ai = methodInstance.actions[action] + return {"method": methodInstance.name, "action": action, "actionId": f"{methodInstance.name}.{action}", + "description": ai.get('description', ''), "parameters": ai.get('parameters', {})} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting action schema: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get action schema: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get workflow by ID.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get workflow: {str(e)}") + + +@router.put("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def update_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + workflowData: Dict[str, Any] = Body(...), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Update workflow by ID.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "update", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to update this workflow") + updated = chatInterface.updateWorkflow(workflowId, workflowData) + if not updated: + raise HTTPException(status_code=500, detail="Failed to update workflow") + return updated + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to update workflow: {str(e)}") + + +@router.delete("/{instanceId}/workflows/{workflowId}") +@limiter.limit("120/minute") +def delete_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete workflow and associated data.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "delete", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to delete this workflow") + success = chatInterface.deleteWorkflow(workflowId) + if not success: + raise HTTPException(status_code=500, detail="Failed to delete workflow") + return {"id": workflowId, "message": "Workflow and associated data deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting workflow: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/status", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_automation_workflow_status( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get workflow status.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow status: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow status: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/logs", response_model=PaginatedResponse[ChatLog]) +@limiter.limit("120/minute") +def get_automation_workflow_logs( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + logId: Optional[str] = Query(None), + pagination: Optional[str] = Query(None), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatLog]: + """Get workflow logs.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + paginationParams = None + if pagination: + try: + pd = json.loads(pagination) + if pd: + pd = normalize_pagination_dict(pd) + paginationParams = PaginationParams(**pd) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination: {str(e)}") + result = chatInterface.getLogs(workflowId, pagination=paginationParams) + if logId: + allLogs = result.items if paginationParams else result + idx = next((i for i, log in enumerate(allLogs) if log.id == logId), -1) + if idx >= 0: + return PaginatedResponse(items=allLogs[idx + 1:], pagination=None) + if paginationParams: + return PaginatedResponse(items=result.items, pagination=PaginationMetadata( + currentPage=paginationParams.page, pageSize=paginationParams.pageSize, + totalItems=result.totalItems, totalPages=result.totalPages, + sort=paginationParams.sort, filters=paginationParams.filters)) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow logs: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage]) +@limiter.limit("120/minute") +def get_automation_workflow_messages( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + messageId: Optional[str] = Query(None), + pagination: Optional[str] = Query(None), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatMessage]: + """Get workflow messages.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + paginationParams = None + if pagination: + try: + pd = json.loads(pagination) + if pd: + pd = normalize_pagination_dict(pd) + paginationParams = PaginationParams(**pd) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination: {str(e)}") + result = chatInterface.getMessages(workflowId, pagination=paginationParams) + if messageId: + allMsgs = result.items if paginationParams else result + idx = next((i for i, m in enumerate(allMsgs) if m.id == messageId), -1) + if idx >= 0: + return PaginatedResponse(items=allMsgs[idx + 1:], pagination=None) + if paginationParams: + return PaginatedResponse(items=result.items, pagination=PaginationMetadata( + currentPage=paginationParams.page, pageSize=paginationParams.pageSize, + totalItems=result.totalItems, totalPages=result.totalPages, + sort=paginationParams.sort, filters=paginationParams.filters)) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow messages: {str(e)}") + + +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}") +@limiter.limit("120/minute") +def delete_automation_workflow_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + messageId: str = Path(..., description="Message ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete message from workflow.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + success = chatInterface.deleteMessage(workflowId, messageId) + if not success: + raise HTTPException(status_code=404, detail=f"Message {messageId} not found") + return {"workflowId": workflowId, "messageId": messageId, "message": "Message deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting message: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting message: {str(e)}") + + +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}/files/{fileId}") +@limiter.limit("120/minute") +def delete_automation_file_from_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + messageId: str = Path(..., description="Message ID"), + fileId: str = Path(..., description="File ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete file from message.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + success = chatInterface.deleteFileFromMessage(workflowId, messageId, fileId) + if not success: + raise HTTPException(status_code=404, detail=f"File {fileId} not found") + return {"workflowId": workflowId, "messageId": messageId, "fileId": fileId, "message": "File reference deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting file: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting file: {str(e)}") + + +@router.get("/{instanceId}/workflows/{workflowId}/chatData") +@limiter.limit("120/minute") +def get_automation_workflow_chat_data( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + afterTimestamp: Optional[float] = Query(None), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get unified chat data for workflow.""" + try: + _validateAutomationInstanceAccess(instanceId, context) + chatInterface = _getAutomationServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow {workflowId} not found") + return chatInterface.getUnifiedChatData(workflowId, afterTimestamp) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting chat data: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting chat data: {str(e)}") + + +@router.post("/{instanceId}/workflows/{workflowId}/stop", response_model=ChatWorkflow) +@limiter.limit("30/minute") +async def stop_automation_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Stop a running automation workflow. Uses instance-scoped services.""" + try: + from modules.workflows.automation import chatStop + mandateId = _validateAutomationInstanceAccess(instanceId, context) + services = getAutomationServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + services.featureCode = "automation" + return await chatStop( + context.user, + workflowId, + mandateId=mandateId, + featureInstanceId=instanceId, + featureCode="automation", + services=services, + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error stopping automation workflow: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/{automationId}", response_model=AutomationDefinition) @limiter.limit("30/minute") def get_automation( @@ -379,7 +871,7 @@ async def execute_automation_route( if not automation: raise ValueError(f"Automation {automationId} not found") - from modules.workflows.automation import executeAutomation + from modules.workflows.automation import executeAutomation, chatStop workflow = await executeAutomation(automationId, automation, context.user, services) return workflow except HTTPException: diff --git a/modules/features/chatplayground/routeFeatureChatplayground.py b/modules/features/chatplayground/routeFeatureChatplayground.py index c08f4b62..8a87084a 100644 --- a/modules/features/chatplayground/routeFeatureChatplayground.py +++ b/modules/features/chatplayground/routeFeatureChatplayground.py @@ -5,9 +5,10 @@ Chat Playground Feature Routes. Implements the endpoints for chat playground workflow management as a feature. """ +import json import logging from typing import Optional, Dict, Any -from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request +from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Request, status # Import auth modules from modules.auth import limiter, getRequestContext, RequestContext @@ -16,15 +17,31 @@ from modules.auth import limiter, getRequestContext, RequestContext from modules.interfaces import interfaceDbChat # Import models -from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum +from modules.datamodels.datamodelChat import ( + ChatWorkflow, + ChatMessage, + ChatLog, + UserInputRequest, + WorkflowModeEnum, +) +from modules.datamodels.datamodelPagination import ( + PaginationParams, + PaginatedResponse, + PaginationMetadata, + normalize_pagination_dict, +) # Import workflow control functions from modules.workflows.automation import chatStart, chatStop from modules.features.chatplayground.mainChatplayground import getChatplaygroundServices +from modules.shared.attributeUtils import getModelAttributeDefinitions # Configure logger logger = logging.getLogger(__name__) +# Model attributes for ChatWorkflow (workflow attributes endpoint) +workflowAttributes = getModelAttributeDefinitions(ChatWorkflow) + # Create router for chat playground feature endpoints router = APIRouter( prefix="/api/chatplayground", @@ -130,8 +147,8 @@ async def start_workflow( ) -# Stop workflow endpoint -@router.post("/{instanceId}/{workflowId}/stop", response_model=ChatWorkflow) +# Stop workflow endpoint (under /workflows/{workflowId}/ for consistency) +@router.post("/{instanceId}/workflows/{workflowId}/stop", response_model=ChatWorkflow) @limiter.limit("120/minute") async def stop_workflow( request: Request, @@ -174,8 +191,8 @@ async def stop_workflow( ) -# Unified Chat Data Endpoint for Polling -@router.get("/{instanceId}/{workflowId}/chatData") +# Unified Chat Data Endpoint for Polling (under /workflows/{workflowId}/ for consistency) +@router.get("/{instanceId}/workflows/{workflowId}/chatData") @limiter.limit("120/minute") def get_workflow_chat_data( request: Request, @@ -218,18 +235,32 @@ def get_workflow_chat_data( ) +# Get workflow attributes (ChatWorkflow model) +@router.get("/{instanceId}/workflows/attributes", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_workflow_attributes( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get attribute definitions for ChatWorkflow model.""" + _validateInstanceAccess(instanceId, context) + return {"attributes": workflowAttributes} + + # Get workflows for this instance -@router.get("/{instanceId}/workflows") +@router.get("/{instanceId}/workflows", response_model=PaginatedResponse[ChatWorkflow]) @limiter.limit("120/minute") def get_workflows( request: Request, instanceId: str = Path(..., description="Feature instance ID"), - page: int = Query(1, ge=1, description="Page number"), - pageSize: int = Query(20, ge=1, le=100, description="Items per page"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + page: int = Query(1, ge=1, description="Page number (legacy)"), + pageSize: int = Query(20, ge=1, le=100, description="Items per page (legacy)"), context: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: +) -> PaginatedResponse[ChatWorkflow]: """ - Get all workflows for this feature instance. + Get all workflows for this feature instance with optional pagination. """ try: # Validate access @@ -238,13 +269,38 @@ def get_workflows( # Get service with feature instance context chatInterface = _getServiceChat(context, featureInstanceId=instanceId) - # Get workflows with pagination - from modules.datamodels.datamodelPagination import PaginationParams - pagination = PaginationParams(page=page, pageSize=pageSize) + # Parse pagination parameter + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException( + status_code=400, + detail=f"Invalid pagination parameter: {str(e)}" + ) + else: + paginationParams = PaginationParams(page=page, pageSize=pageSize) - result = chatInterface.getWorkflows(pagination=pagination) + result = chatInterface.getWorkflows(pagination=paginationParams) - return result + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + else: + return PaginatedResponse(items=result, pagination=None) except HTTPException: raise @@ -254,3 +310,410 @@ def get_workflows( status_code=500, detail=f"Error getting workflows: {str(e)}" ) + + +# Action Discovery Endpoints (must be before /{workflowId} to avoid path conflict) +@router.get("/{instanceId}/workflows/actions", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_all_workflow_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get all available workflow actions for the current user (filtered by RBAC).""" + try: + mandateId = _validateInstanceAccess(instanceId, context) + services = getChatplaygroundServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + allActions = [] + for methodName, methodInfo in methods.items(): + if methodName.startswith('Method'): + continue + methodInstance = methodInfo['instance'] + methodActions = methodInstance.actions + for actionName, actionInfo in methodActions.items(): + actionResponse = { + "module": methodInstance.name, + "actionId": f"{methodInstance.name}.{actionName}", + "name": actionName, + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + } + allActions.append(actionResponse) + return {"actions": allActions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting all actions: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_method_workflow_actions( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get all available actions for a specific method.""" + try: + mandateId = _validateInstanceAccess(instanceId, context) + services = getChatplaygroundServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for methodName, methodInfo in methods.items(): + if methodInfo['instance'].name == method: + methodInstance = methodInfo['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + actions = [] + for actionName, actionInfo in methodInstance.actions.items(): + actionResponse = { + "actionId": f"{methodInstance.name}.{actionName}", + "name": actionName, + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + } + actions.append(actionResponse) + return {"module": methodInstance.name, "description": methodInstance.description, "actions": actions} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get actions for method {method}: {str(e)}") + + +@router.get("/{instanceId}/workflows/actions/{method}/{action}", response_model=Dict[str, Any]) +@limiter.limit("120/minute") +def get_action_schema( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), + action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Get action schema with parameter definitions for a specific action.""" + try: + mandateId = _validateInstanceAccess(instanceId, context) + services = getChatplaygroundServices( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + from modules.workflows.processing.shared.methodDiscovery import discoverMethods, methods + discoverMethods(services) + methodInstance = None + for methodName, methodInfo in methods.items(): + if methodInfo['instance'].name == method: + methodInstance = methodInfo['instance'] + break + if not methodInstance: + raise HTTPException(status_code=404, detail=f"Method '{method}' not found") + methodActions = methodInstance.actions + if action not in methodActions: + raise HTTPException(status_code=404, detail=f"Action '{action}' not found in method '{method}'") + actionInfo = methodActions[action] + return { + "method": methodInstance.name, + "action": action, + "actionId": f"{methodInstance.name}.{action}", + "description": actionInfo.get('description', ''), + "parameters": actionInfo.get('parameters', {}) + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Failed to get action schema: {str(e)}") + + +# Get single workflow by ID +@router.get("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get workflow by ID.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to get workflow: {str(e)}") + + +# Update workflow +@router.put("/{instanceId}/workflows/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def update_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow to update"), + workflowData: Dict[str, Any] = Body(...), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Update workflow by ID.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail="Workflow not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "update", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to update this workflow") + updatedWorkflow = chatInterface.updateWorkflow(workflowId, workflowData) + if not updatedWorkflow: + raise HTTPException(status_code=500, detail="Failed to update workflow") + return updatedWorkflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating workflow: {str(e)}") + raise HTTPException(status_code=500, detail=f"Failed to update workflow: {str(e)}") + + +# Delete workflow +@router.delete("/{instanceId}/workflows/{workflowId}") +@limiter.limit("120/minute") +def delete_workflow( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow to delete"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Deletes a workflow and its associated data.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + if not chatInterface.checkRbacPermission(ChatWorkflow, "delete", workflowId): + raise HTTPException(status_code=403, detail="You don't have permission to delete this workflow") + success = chatInterface.deleteWorkflow(workflowId) + if not success: + raise HTTPException(status_code=500, detail="Failed to delete workflow") + return {"id": workflowId, "message": "Workflow and associated data deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting workflow: {str(e)}") + + +# Get workflow status +@router.get("/{instanceId}/workflows/{workflowId}/status", response_model=ChatWorkflow) +@limiter.limit("120/minute") +def get_workflow_status( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + context: RequestContext = Depends(getRequestContext) +) -> ChatWorkflow: + """Get the current status of a workflow.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + return workflow + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow status: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow status: {str(e)}") + + +# Get workflow logs +@router.get("/{instanceId}/workflows/{workflowId}/logs", response_model=PaginatedResponse[ChatLog]) +@limiter.limit("120/minute") +def get_workflow_logs( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + logId: Optional[str] = Query(None, description="Optional log ID for selective data transfer"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatLog]: + """Get logs for a workflow with optional pagination.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") + + result = chatInterface.getLogs(workflowId, pagination=paginationParams) + + if logId: + allLogs = result.items if paginationParams else result + logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1) + if logIndex >= 0: + filteredLogs = allLogs[logIndex + 1:] + return PaginatedResponse(items=filteredLogs, pagination=None) + + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow logs: {str(e)}") + + +# Get workflow messages +@router.get("/{instanceId}/workflows/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage]) +@limiter.limit("120/minute") +def get_workflow_messages( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + messageId: Optional[str] = Query(None, description="Optional message ID for selective data transfer"), + pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), + context: RequestContext = Depends(getRequestContext) +) -> PaginatedResponse[ChatMessage]: + """Get messages for a workflow with optional pagination.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + + paginationParams = None + if pagination: + try: + paginationDict = json.loads(pagination) + if paginationDict: + paginationDict = normalize_pagination_dict(paginationDict) + paginationParams = PaginationParams(**paginationDict) + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") + + result = chatInterface.getMessages(workflowId, pagination=paginationParams) + + if messageId: + allMessages = result.items if paginationParams else result + messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1) + if messageIndex >= 0: + filteredMessages = allMessages[messageIndex + 1:] + return PaginatedResponse(items=filteredMessages, pagination=None) + + if paginationParams: + return PaginatedResponse( + items=result.items, + pagination=PaginationMetadata( + currentPage=paginationParams.page, + pageSize=paginationParams.pageSize, + totalItems=result.totalItems, + totalPages=result.totalPages, + sort=paginationParams.sort, + filters=paginationParams.filters + ) + ) + return PaginatedResponse(items=result, pagination=None) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error getting workflow messages: {str(e)}") + + +# Delete message from workflow +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}") +@limiter.limit("120/minute") +def delete_workflow_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + messageId: str = Path(..., description="ID of the message to delete"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete a message from a workflow.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + success = chatInterface.deleteMessage(workflowId, messageId) + if not success: + raise HTTPException(status_code=404, detail=f"Message with ID {messageId} not found in workflow {workflowId}") + return {"workflowId": workflowId, "messageId": messageId, "message": "Message deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting message: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting message: {str(e)}") + + +# Delete file from message +@router.delete("/{instanceId}/workflows/{workflowId}/messages/{messageId}/files/{fileId}") +@limiter.limit("120/minute") +def delete_file_from_message( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="ID of the workflow"), + messageId: str = Path(..., description="ID of the message"), + fileId: str = Path(..., description="ID of the file to delete"), + context: RequestContext = Depends(getRequestContext) +) -> Dict[str, Any]: + """Delete a file reference from a message in a workflow.""" + try: + _validateInstanceAccess(instanceId, context) + chatInterface = _getServiceChat(context, featureInstanceId=instanceId) + workflow = chatInterface.getWorkflow(workflowId) + if not workflow: + raise HTTPException(status_code=404, detail=f"Workflow with ID {workflowId} not found") + success = chatInterface.deleteFileFromMessage(workflowId, messageId, fileId) + if not success: + raise HTTPException(status_code=404, detail=f"File with ID {fileId} not found in message {messageId}") + return {"workflowId": workflowId, "messageId": messageId, "fileId": fileId, "message": "File reference deleted successfully"} + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting file reference: {str(e)}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Error deleting file reference: {str(e)}") diff --git a/modules/routes/routeDataWorkflows.py b/modules/routes/routeDataWorkflows.py deleted file mode 100644 index be580ec9..00000000 --- a/modules/routes/routeDataWorkflows.py +++ /dev/null @@ -1,857 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -""" -Workflow routes for the backend API. -Implements the endpoints for workflow management according to the state machine. -""" - -import logging -import json -from typing import List, Dict, Any, Optional -from fastapi import APIRouter, HTTPException, Depends, Body, Path, Query, Response, status, Request - -# Import auth modules -from modules.auth import limiter, getCurrentUser -from modules.auth.authentication import getRequestContext, RequestContext - -# Import interfaces from feature containers -import modules.interfaces.interfaceDbChat as interfaceDbChat -from modules.interfaces.interfaceDbChat import getInterface -from modules.interfaces.interfaceRbac import getRecordsetWithRBAC - -# Import models from feature containers -from modules.datamodels.datamodelChat import ( - ChatWorkflow, - ChatMessage, - ChatLog, - ChatStat, - ChatDocument -) -from modules.shared.attributeUtils import getModelAttributeDefinitions, AttributeResponse -from modules.datamodels.datamodelUam import User -from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict - - -# Configure logger -logger = logging.getLogger(__name__) - -# Model attributes for ChatWorkflow -workflowAttributes = getModelAttributeDefinitions(ChatWorkflow) - -# Create router for workflow endpoints -router = APIRouter( - prefix="/api/workflows", - tags=["Workflow"], - responses={404: {"description": "Not found"}} -) - -def getServiceChat(ctx: RequestContext): - # Workflows are not feature-instance-specific — only mandateId is needed for RBAC. - # Passing featureInstanceId would add a SQL WHERE filter that excludes workflows - # created by other features (e.g., automation vs chatbot). - return interfaceDbChat.getInterface(ctx.user, mandateId=ctx.mandateId) - -# Consolidated endpoint for getting all workflows -@router.get("/", response_model=PaginatedResponse[ChatWorkflow]) -@limiter.limit("120/minute") -def get_workflows( - request: Request, - pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), - ctx: RequestContext = Depends(getRequestContext) -) -> PaginatedResponse[ChatWorkflow]: - """ - Get workflows with optional pagination, sorting, and filtering. - - Query Parameters: - - pagination: JSON-encoded PaginationParams object, or None for no pagination - - Examples: - - GET /api/workflows/ (no pagination - returns all workflows) - - GET /api/workflows/?pagination={"page":1,"pageSize":10,"sort":[]} - """ - try: - # Parse pagination parameter - paginationParams = None - if pagination: - try: - paginationDict = json.loads(pagination) - if paginationDict: - # Normalize pagination dict (handles top-level "search" field) - paginationDict = normalize_pagination_dict(paginationDict) - paginationParams = PaginationParams(**paginationDict) - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException( - status_code=400, - detail=f"Invalid pagination parameter: {str(e)}" - ) - - appInterface = getServiceChat(ctx) - result = appInterface.getWorkflows(pagination=paginationParams) - - # If pagination was requested, result is PaginatedResult with items as dicts - # If no pagination, result is List[Dict] - if paginationParams: - workflows = result.items - totalItems = result.totalItems - totalPages = result.totalPages - else: - workflows = result - totalItems = len(result) - totalPages = 1 - - if paginationParams: - return PaginatedResponse( - items=workflows, - pagination=PaginationMetadata( - currentPage=paginationParams.page, - pageSize=paginationParams.pageSize, - totalItems=totalItems, - totalPages=totalPages, - sort=paginationParams.sort, - filters=paginationParams.filters - ) - ) - else: - return PaginatedResponse( - items=workflows, - pagination=None - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflows: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get workflows: {str(e)}" - ) - -@router.get("/{workflowId}", response_model=ChatWorkflow) -@limiter.limit("120/minute") -def get_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """Get workflow by ID""" - try: - # Get workflow interface with current user context - workflowInterface = getServiceChat(ctx) - - # Get workflow - workflow = workflowInterface.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Workflow not found" - ) - - return workflow - - except Exception as e: - logger.error(f"Error getting workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get workflow: {str(e)}" - ) - -@router.put("/{workflowId}", response_model=ChatWorkflow) -@limiter.limit("120/minute") -def update_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to update"), - workflowData: Dict[str, Any] = Body(...), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """Update workflow by ID""" - try: - # Get workflow interface with current user context - workflowInterface = getServiceChat(ctx) - - # Get workflow using interface method to check permissions - workflow = workflowInterface.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Workflow not found" - ) - - # Check if user has permission to update using RBAC - if not workflowInterface.checkRbacPermission(ChatWorkflow, "update", workflowId): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="You don't have permission to update this workflow" - ) - - # Update workflow - updatedWorkflow = workflowInterface.updateWorkflow(workflowId, workflowData) - if not updatedWorkflow: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to update workflow" - ) - - return updatedWorkflow - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error updating workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to update workflow: {str(e)}" - ) - -# API Endpoint for workflow status -@router.get("/{workflowId}/status", response_model=ChatWorkflow) -@limiter.limit("120/minute") -def get_workflow_status( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """Get the current status of a workflow.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Retrieve workflow - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - return workflow - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflow status: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error getting workflow status: {str(e)}" - ) - - -# API Endpoint for stopping a workflow -@router.post("/{workflowId}/stop", response_model=ChatWorkflow) -@limiter.limit("120/minute") -async def stop_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to stop"), - ctx: RequestContext = Depends(getRequestContext) -) -> ChatWorkflow: - """ - Stop a running workflow. - This is a general endpoint that can be used by any feature to stop a workflow. - """ - try: - from modules.workflows.automation import chatStop - - # Get the workflow first to get mandateId - interfaceChatDb = getServiceChat(ctx) - workflow = interfaceChatDb.getWorkflow(workflowId) - - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - mandateId = workflow.get("mandateId") if isinstance(workflow, dict) else getattr(workflow, "mandateId", None) - - # Stop the workflow - stoppedWorkflow = await chatStop(ctx.user, workflowId, mandateId=mandateId) - - return stoppedWorkflow - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error stopping workflow: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error stopping workflow: {str(e)}" - ) - - -# API Endpoint for workflow logs with selective data transfer -@router.get("/{workflowId}/logs", response_model=PaginatedResponse[ChatLog]) -@limiter.limit("120/minute") -def get_workflow_logs( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - logId: Optional[str] = Query(None, description="Optional log ID to get only newer logs (legacy selective data transfer)"), - pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), - ctx: RequestContext = Depends(getRequestContext) -) -> PaginatedResponse[ChatLog]: - """ - Get logs for a workflow with optional pagination, sorting, and filtering. - Also supports legacy selective data transfer via logId parameter. - - Query Parameters: - - logId: Optional log ID for selective data transfer (returns only logs after this ID) - - pagination: JSON-encoded PaginationParams object, or None for no pagination - """ - try: - # Parse pagination parameter - paginationParams = None - if pagination: - try: - paginationDict = json.loads(pagination) - if paginationDict: - # Normalize pagination dict (handles top-level "search" field) - paginationDict = normalize_pagination_dict(paginationDict) - paginationParams = PaginationParams(**paginationDict) - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException( - status_code=400, - detail=f"Invalid pagination parameter: {str(e)}" - ) - - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Get logs with optional pagination - result = interfaceDbChat.getLogs(workflowId, pagination=paginationParams) - - # Handle legacy selective data transfer if logId is provided (takes precedence over pagination) - if logId: - # If pagination was requested, result is PaginatedResult, otherwise List[ChatLog] - allLogs = result.items if paginationParams else result - - # Find the index of the log with the given ID - logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1) - if logIndex >= 0: - # Return only logs after the specified log - filteredLogs = allLogs[logIndex + 1:] - return PaginatedResponse( - items=filteredLogs, - pagination=None - ) - - # If pagination was requested, result is PaginatedResult - # If no pagination, result is List[ChatLog] - if paginationParams: - return PaginatedResponse( - items=result.items, - pagination=PaginationMetadata( - currentPage=paginationParams.page, - pageSize=paginationParams.pageSize, - totalItems=result.totalItems, - totalPages=result.totalPages, - sort=paginationParams.sort, - filters=paginationParams.filters - ) - ) - else: - return PaginatedResponse( - items=result, - pagination=None - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflow logs: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error getting workflow logs: {str(e)}" - ) - -# API Endpoint for workflow messages with selective data transfer -@router.get("/{workflowId}/messages", response_model=PaginatedResponse[ChatMessage]) -@limiter.limit("120/minute") -def get_workflow_messages( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - messageId: Optional[str] = Query(None, description="Optional message ID to get only newer messages (legacy selective data transfer)"), - pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), - ctx: RequestContext = Depends(getRequestContext) -) -> PaginatedResponse[ChatMessage]: - """ - Get messages for a workflow with optional pagination, sorting, and filtering. - Also supports legacy selective data transfer via messageId parameter. - - Query Parameters: - - messageId: Optional message ID for selective data transfer (returns only messages after this ID) - - pagination: JSON-encoded PaginationParams object, or None for no pagination - """ - try: - # Parse pagination parameter - paginationParams = None - if pagination: - try: - paginationDict = json.loads(pagination) - if paginationDict: - # Normalize pagination dict (handles top-level "search" field) - paginationDict = normalize_pagination_dict(paginationDict) - paginationParams = PaginationParams(**paginationDict) - except (json.JSONDecodeError, ValueError) as e: - raise HTTPException( - status_code=400, - detail=f"Invalid pagination parameter: {str(e)}" - ) - - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Get messages with optional pagination - result = interfaceDbChat.getMessages(workflowId, pagination=paginationParams) - - # Handle legacy selective data transfer if messageId is provided (takes precedence over pagination) - if messageId: - # If pagination was requested, result is PaginatedResult, otherwise List[ChatMessage] - allMessages = result.items if paginationParams else result - - # Find the index of the message with the given ID - messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1) - if messageIndex >= 0: - # Return only messages after the specified message - filteredMessages = allMessages[messageIndex + 1:] - return PaginatedResponse( - items=filteredMessages, - pagination=None - ) - - # If pagination was requested, result is PaginatedResult - # If no pagination, result is List[ChatMessage] - if paginationParams: - return PaginatedResponse( - items=result.items, - pagination=PaginationMetadata( - currentPage=paginationParams.page, - pageSize=paginationParams.pageSize, - totalItems=result.totalItems, - totalPages=result.totalPages, - sort=paginationParams.sort, - filters=paginationParams.filters - ) - ) - else: - return PaginatedResponse( - items=result, - pagination=None - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting workflow messages: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error getting workflow messages: {str(e)}" - ) - - -# State 11: Workflow Reset/Deletion endpoint -@router.delete("/{workflowId}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def delete_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to delete"), - ctx: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """Deletes a workflow and its associated data.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Check workflow access and permission using RBAC - workflows = getRecordsetWithRBAC( - interfaceDbChat.db, - ChatWorkflow, - ctx.user, - recordFilter={"id": workflowId}, - mandateId=ctx.mandateId - ) - if not workflows: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - workflow_data = workflows[0] - - # Check if user has permission to delete using RBAC - if not interfaceDbChat.checkRbacPermission(ChatWorkflow, "delete", workflowId): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="You don't have permission to delete this workflow" - ) - - # Delete workflow - success = interfaceDbChat.deleteWorkflow(workflowId) - - if not success: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to delete workflow" - ) - - return { - "id": workflowId, - "message": "Workflow and associated data deleted successfully" - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error deleting workflow: {str(e)}" - ) - - -# Document Management Endpoints - -@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def delete_workflow_message( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - messageId: str = Path(..., description="ID of the message to delete"), - ctx: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """Delete a message from a workflow.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Delete the message - success = interfaceDbChat.deleteMessage(workflowId, messageId) - - if not success: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Message with ID {messageId} not found in workflow {workflowId}" - ) - - # Messages are stored in ChatMessage table linked by workflowId -- - # no messageIds list on ChatWorkflow to update. - - return { - "workflowId": workflowId, - "messageId": messageId, - "message": "Message deleted successfully" - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting message: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error deleting message: {str(e)}" - ) - -@router.delete("/{workflowId}/messages/{messageId}/files/{fileId}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def delete_file_from_message( - request: Request, - workflowId: str = Path(..., description="ID of the workflow"), - messageId: str = Path(..., description="ID of the message"), - fileId: str = Path(..., description="ID of the file to delete"), - ctx: RequestContext = Depends(getRequestContext) -) -> Dict[str, Any]: - """Delete a file reference from a message in a workflow.""" - try: - # Get service center - interfaceDbChat = getServiceChat(ctx) - - # Verify workflow exists - workflow = interfaceDbChat.getWorkflow(workflowId) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" - ) - - # Delete file reference from message - success = interfaceDbChat.deleteFileFromMessage(workflowId, messageId, fileId) - - if not success: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"File with ID {fileId} not found in message {messageId}" - ) - - return { - "workflowId": workflowId, - "messageId": messageId, - "fileId": fileId, - "message": "File reference deleted successfully" - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting file reference: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error deleting file reference: {str(e)}" - ) - - -# Action Discovery Endpoints - -@router.get("/actions", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def get_all_actions( - request: Request, - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """ - Get all available workflow actions for the current user (filtered by RBAC). - - Returns: - - Dictionary with actions grouped by module, filtered by RBAC permissions - - Example response: - { - "actions": [ - { - "module": "outlook", - "actionId": "outlook.readEmails", - "name": "readEmails", - "description": "Read emails and metadata from a mailbox folder", - "parameters": {...} - }, - ... - ] - } - """ - try: - from modules.services import getInterface as getServices - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - - # Get services and discover methods - services = getServices(currentUser, None) - discoverMethods(services) - - # Import methods catalog - from modules.workflows.processing.shared.methodDiscovery import methods - - # Collect all actions from all methods - allActions = [] - for methodName, methodInfo in methods.items(): - # Skip duplicate entries (same method stored with full and short name) - if methodName.startswith('Method'): - continue - - methodInstance = methodInfo['instance'] - methodActions = methodInstance.actions - - for actionName, actionInfo in methodActions.items(): - # Build action response - actionResponse = { - "module": methodInstance.name, - "actionId": f"{methodInstance.name}.{actionName}", - "name": actionName, - "description": actionInfo.get('description', ''), - "parameters": actionInfo.get('parameters', {}) - } - allActions.append(actionResponse) - - return { - "actions": allActions - } - - except Exception as e: - logger.error(f"Error getting all actions: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get actions: {str(e)}" - ) - - -@router.get("/actions/{method}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def get_method_actions( - request: Request, - method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """ - Get all available actions for a specific method (filtered by RBAC). - - Path Parameters: - - method: Method name (e.g., 'outlook', 'sharepoint', 'ai') - - Returns: - - Dictionary with actions for the specified method - - Example response: - { - "module": "outlook", - "actions": [ - { - "actionId": "outlook.readEmails", - "name": "readEmails", - "description": "Read emails and metadata from a mailbox folder", - "parameters": {...} - }, - ... - ] - } - """ - try: - from modules.services import getInterface as getServices - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - - # Get services and discover methods - services = getServices(currentUser, None) - discoverMethods(services) - - # Import methods catalog - from modules.workflows.processing.shared.methodDiscovery import methods - - # Find method instance - methodInstance = None - for methodName, methodInfo in methods.items(): - if methodInfo['instance'].name == method: - methodInstance = methodInfo['instance'] - break - - if not methodInstance: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Method '{method}' not found" - ) - - # Collect actions for this method - actions = [] - methodActions = methodInstance.actions - - for actionName, actionInfo in methodActions.items(): - actionResponse = { - "actionId": f"{methodInstance.name}.{actionName}", - "name": actionName, - "description": actionInfo.get('description', ''), - "parameters": actionInfo.get('parameters', {}) - } - actions.append(actionResponse) - - return { - "module": methodInstance.name, - "description": methodInstance.description, - "actions": actions - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting actions for method {method}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get actions for method {method}: {str(e)}" - ) - - -@router.get("/actions/{method}/{action}", response_model=Dict[str, Any]) -@limiter.limit("120/minute") -def get_action_schema( - request: Request, - method: str = Path(..., description="Method name (e.g., 'outlook', 'sharepoint')"), - action: str = Path(..., description="Action name (e.g., 'readEmails', 'uploadDocument')"), - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """ - Get action schema with parameter definitions for a specific action. - - Path Parameters: - - method: Method name (e.g., 'outlook', 'sharepoint', 'ai') - - action: Action name (e.g., 'readEmails', 'uploadDocument') - - Returns: - - Action schema with full parameter definitions - - Example response: - { - "method": "outlook", - "action": "readEmails", - "actionId": "outlook.readEmails", - "description": "Read emails and metadata from a mailbox folder", - "parameters": { - "connectionReference": { - "name": "connectionReference", - "type": "str", - "frontendType": "userConnection", - "frontendOptions": "user.connection", - "required": true, - "description": "Microsoft connection label" - }, - ... - } - } - """ - try: - from modules.services import getInterface as getServices - from modules.workflows.processing.shared.methodDiscovery import discoverMethods - - # Get services and discover methods - services = getServices(currentUser, None) - discoverMethods(services) - - # Import methods catalog - from modules.workflows.processing.shared.methodDiscovery import methods - - # Find method instance - methodInstance = None - for methodName, methodInfo in methods.items(): - if methodInfo['instance'].name == method: - methodInstance = methodInfo['instance'] - break - - if not methodInstance: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Method '{method}' not found" - ) - - # Get action - methodActions = methodInstance.actions - if action not in methodActions: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=f"Action '{action}' not found in method '{method}'" - ) - - actionInfo = methodActions[action] - - return { - "method": methodInstance.name, - "action": action, - "actionId": f"{methodInstance.name}.{action}", - "description": actionInfo.get('description', ''), - "parameters": actionInfo.get('parameters', {}) - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting action schema for {method}.{action}: {str(e)}", exc_info=True) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get action schema: {str(e)}" - ) \ No newline at end of file diff --git a/scripts/migrate_async_to_sync.py b/scripts/migrate_async_to_sync.py index d0f8ef67..d2b030a4 100644 --- a/scripts/migrate_async_to_sync.py +++ b/scripts/migrate_async_to_sync.py @@ -52,9 +52,6 @@ _MUST_STAY_ASYNC: Dict[str, Set[str]] = { "modules/routes/routeDataFiles.py": { "upload_file", # await file.read() }, - "modules/routes/routeDataWorkflows.py": { - "stop_workflow", # await chatStop(...) - }, # These files have many genuinely async routes (httpx, external APIs) -- keep ALL async: "modules/routes/routeRealEstate.py": "__ALL__", "modules/routes/routeSharepoint.py": "__ALL__", From 125c2bbce7bc88fd8b286f5ba6eeb0287fc3e6ae Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 10:56:24 +0100 Subject: [PATCH 5/9] stop automation bug fix --- .../workflows/processing/shared/stateTools.py | 4 +- modules/workflows/workflowManager.py | 51 +++++++++++++++++-- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/modules/workflows/processing/shared/stateTools.py b/modules/workflows/processing/shared/stateTools.py index 485539f9..70259b3c 100644 --- a/modules/workflows/processing/shared/stateTools.py +++ b/modules/workflows/processing/shared/stateTools.py @@ -26,8 +26,8 @@ def checkWorkflowStopped(services: Any) -> None: Raises: WorkflowStoppedException: If workflow status is "stopped" """ - workflow = services.workflow - if not workflow: + workflow = getattr(services, 'workflow', None) + if not workflow or not hasattr(workflow, 'id') or workflow.id is None: return try: diff --git a/modules/workflows/workflowManager.py b/modules/workflows/workflowManager.py index c81977c1..b9b64a9a 100644 --- a/modules/workflows/workflowManager.py +++ b/modules/workflows/workflowManager.py @@ -19,6 +19,14 @@ from modules.workflows.processing.shared.stateTools import WorkflowStoppedExcept logger = logging.getLogger(__name__) +# Registry of running workflow tasks: workflowId -> asyncio.Task +# Used to cancel workflow immediately when stop is requested +_workflow_tasks: Dict[str, asyncio.Task] = {} + +def _unregister_workflow_task(workflow_id: str) -> None: + """Remove workflow task from registry when it completes.""" + _workflow_tasks.pop(workflow_id, None) + class WorkflowManager: """Manager for workflow processing and coordination""" @@ -26,6 +34,19 @@ class WorkflowManager: self.services = services self.workflowProcessor = None + def _propagateWorkflowToContext(self, workflow): + """Update workflow in all service contexts. Resolved services may be cached and hold + a different context than hub._service_context; update each service's _context.workflow.""" + # Update stored context if present + ctx = getattr(self.services, '_service_context', None) + if ctx is not None: + ctx.workflow = workflow + # Also update contexts on resolved services (they may be cached with different context refs) + for attr in ('chat', 'ai', 'extraction', 'sharepoint', 'utils', 'billing', 'generation'): + svc = getattr(self.services, attr, None) + if svc is not None and hasattr(svc, '_context') and svc._context is not None: + svc._context.workflow = workflow + # Exported functions async def workflowStart(self, userInput: UserInputRequest, workflowMode: WorkflowModeEnum, workflowId: Optional[str] = None) -> ChatWorkflow: @@ -42,9 +63,14 @@ class WorkflowManager: # Store workflow in services for reference (this is the ChatWorkflow object) self.services.workflow = workflow + self._propagateWorkflowToContext(workflow) if workflow.status == "running": logger.info(f"Stopping running workflow {workflowId} before processing new prompt") + # Cancel existing task immediately so we don't have two tasks for same workflow + existing = _workflow_tasks.pop(workflowId, None) + if existing and not existing.done(): + existing.cancel() workflow.status = "stopped" workflow.lastActivity = currentTime self.services.chat.updateWorkflow(workflowId, { @@ -104,6 +130,7 @@ class WorkflowManager: # Store workflow in services (this is the ChatWorkflow object) self.services.workflow = workflow + self._propagateWorkflowToContext(workflow) # CRITICAL: Update all method instances to use the current Services object with the correct workflow # This ensures cached method instances don't use stale workflow IDs from previous workflows @@ -111,8 +138,11 @@ class WorkflowManager: discoverMethods(self.services) logger.debug(f"Updated method instances to use workflow {self.services.workflow.id}") - # Start workflow processing asynchronously - asyncio.create_task(self._workflowProcess(userInput)) + # Start workflow processing asynchronously; register for immediate cancel on stop + task = asyncio.create_task(self._workflowProcess(userInput)) + wid = workflow.id + _workflow_tasks[wid] = task + task.add_done_callback(lambda _: _unregister_workflow_task(wid)) return workflow except Exception as e: @@ -128,6 +158,7 @@ class WorkflowManager: # Store workflow in services (this is the ChatWorkflow object) self.services.workflow = workflow + self._propagateWorkflowToContext(workflow) workflow.status = "stopped" workflow.lastActivity = self.services.utils.timestampGetUtc() @@ -141,6 +172,10 @@ class WorkflowManager: "status": "stopped", "progress": 1.0 }) + # Cancel the running task immediately so workflow stops without waiting for checkpoints + running_task = _workflow_tasks.pop(workflowId, None) + if running_task and not running_task.done(): + running_task.cancel() return workflow except Exception as e: logger.error(f"Error stopping workflow: {str(e)}") @@ -274,6 +309,10 @@ class WorkflowManager: await self._executeTasks(taskPlan) await self._processWorkflowResults() + except asyncio.CancelledError: + # Task was cancelled (user clicked stop) - ensure stopped message is created, then re-raise + self._handleWorkflowStop() + raise except WorkflowStoppedException: self._handleWorkflowStop() @@ -1317,8 +1356,12 @@ The following is the user's original input message. Analyze intent, normalize th async def _neutralizeContentIfEnabled(self, contentBytes: bytes, mimeType: str) -> bytes: """Neutralize content if neutralization is enabled in user settings""" try: + # Automation hub may not have neutralization service; skip if unavailable + neutralization = getattr(self.services, 'neutralization', None) + if not neutralization: + return contentBytes # Check if neutralization is enabled - config = self.services.neutralization.getConfig() + config = neutralization.getConfig() if not config or not config.enabled: return contentBytes @@ -1340,7 +1383,7 @@ The following is the user's original input message. Analyze intent, normalize th # Neutralize the text content # Note: The neutralization service should use names from config when processing - result = self.services.neutralization.processText(textContent) + result = neutralization.processText(textContent) if result and 'neutralized_text' in result: neutralizedText = result['neutralized_text'] # Encode back to bytes using the same encoding From d78f071682aa0d57e848720a9728940d22d4d58b Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 10:58:22 +0100 Subject: [PATCH 6/9] bug: sharepoint target address from automation feature --- .../methodSharepoint/helpers/apiClient.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/modules/workflows/methods/methodSharepoint/helpers/apiClient.py b/modules/workflows/methods/methodSharepoint/helpers/apiClient.py index 5b02aaab..309497a4 100644 --- a/modules/workflows/methods/methodSharepoint/helpers/apiClient.py +++ b/modules/workflows/methods/methodSharepoint/helpers/apiClient.py @@ -45,11 +45,24 @@ class ApiClientHelper: Dict with API response or error information """ try: - if not hasattr(self.services, 'sharepoint') or not self.services.sharepoint._target.accessToken: + sp = getattr(self.services, 'sharepoint', None) + if not sp: return {"error": "SharePoint service not configured with access token"} - + # Service center: accessToken on service directly. Legacy: wrapped in PublicService._target + try: + access_token = getattr(sp, 'accessToken', None) + if access_token is None: + target = getattr(sp, '_target', None) # Only legacy hub has _target + if target is not None: + access_token = getattr(target, 'accessToken', None) + except AttributeError as ae: + logger.warning(f"SharePoint token extraction failed: {ae}") + return {"error": "SharePoint service not configured with access token"} + if not access_token: + return {"error": "SharePoint service not configured with access token"} + headers = { - "Authorization": f"Bearer {self.services.sharepoint._target.accessToken}", + "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" if data and method != "PUT" else "application/octet-stream" if data else "application/json" } From d1633adfb31a22966be7dbcdf271415de4da1c1f Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 11:01:14 +0100 Subject: [PATCH 7/9] fixed the route loading in app.py for route unification --- app.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/app.py b/app.py index 1fb68bf0..41873e7d 100644 --- a/app.py +++ b/app.py @@ -533,9 +533,6 @@ app.include_router(promptRouter) from modules.routes.routeDataConnections import router as connectionsRouter app.include_router(connectionsRouter) -from modules.routes.routeDataWorkflows import router as dataWorkflowsRouter -app.include_router(dataWorkflowsRouter) - from modules.routes.routeSecurityLocal import router as localRouter app.include_router(localRouter) From 7a287065a4e4c46a8b3a68839755a8fe8a6da484 Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 11:02:09 +0100 Subject: [PATCH 8/9] fix:deleted old documentation --- .../shared/RENDERING_ISSUE_ANALYSIS.md | 238 ------------------ 1 file changed, 238 deletions(-) delete mode 100644 modules/workflows/processing/shared/RENDERING_ISSUE_ANALYSIS.md diff --git a/modules/workflows/processing/shared/RENDERING_ISSUE_ANALYSIS.md b/modules/workflows/processing/shared/RENDERING_ISSUE_ANALYSIS.md deleted file mode 100644 index ee790a69..00000000 --- a/modules/workflows/processing/shared/RENDERING_ISSUE_ANALYSIS.md +++ /dev/null @@ -1,238 +0,0 @@ -# Rendering Issue Analysis -## Why HTML Documents Are Being Rendered as Text - -**Date**: 2025-12-22 -**Issue**: Documents requested as HTML are being output as text/plain - ---- - -## Root Cause Analysis - -### Issue 1: `resultType` Not Extracted from Task Objective ❌ **CRITICAL** - -**Problem**: -- Task objective clearly states: "Generate a complete, well-structured **HTML document**" -- Validation shows: `EXPECTED FORMATS: ['html']` -- But action was called with: `ai.generateDocument {}` (empty parameters) -- So `resultType` defaults to `"docx"` instead of `"html"` - -**Location**: -- `generateDocument.py` line 44: `resultType = parameters.get("resultType", "docx")` -- No parameter extraction from task objective/prompt - -**Impact**: **CRITICAL** - Wrong format is used even though task clearly requests HTML - -**Fix Needed**: -- Extract `resultType` from task objective/prompt before calling action -- Or enhance `generateDocument` to detect format from prompt if not provided - ---- - -### Issue 2: HTML Not in Action Definition Options ❌ **CRITICAL** - -**Problem**: -- Action definition in `methodAi.py` line 357 only lists: `["docx", "pdf", "txt", "md"]` -- `"html"` is **NOT** in the allowed options -- But docstring says HTML is supported: `"resultType (str, optional): Output format (docx, pdf, txt, md, html, etc.)"` - -**Location**: -- `methodAi.py` line 357: `frontendOptions=["docx", "pdf", "txt", "md"]` - -**Impact**: **CRITICAL** - Even if HTML is requested, it might be rejected or not recognized - -**Fix Needed**: -- Add `"html"` to `frontendOptions` list - ---- - -### Issue 3: Renderer Fallback to Text ❌ **CRITICAL** - -**Problem**: -- When `resultType="docx"` is used (default) -- If docx renderer fails or is not found -- System falls back to text renderer (line 403-404 of `mainServiceGeneration.py`) -- This explains why output is `text/plain` instead of HTML - -**Location**: -- `mainServiceGeneration.py` lines 393-409: `_getFormatRenderer()` method -- Line 403: `logger.warning(f"No renderer found for format {output_format}, falling back to text")` - -**Impact**: **CRITICAL** - Wrong format is rendered - -**Fix Needed**: -- Fix docx renderer if it's failing -- Or better: Extract correct format from prompt - ---- - -### Issue 4: Missing Parameter Extraction ❌ **HIGH PRIORITY** - -**Problem**: -- Task objective contains format information ("HTML document") -- But no parameter extraction step extracts `resultType` from prompt -- Action is called with empty parameters `{}` - -**Location**: -- Workflow execution - parameter extraction phase -- Should extract `resultType: "html"` from task objective - -**Impact**: **HIGH** - System can't infer format from user intent - -**Fix Needed**: -- Add parameter extraction that detects format from prompt -- Or enhance `generateDocument` to auto-detect format from prompt - ---- - -## Flow Analysis - -### Expected Flow: -``` -1. Task Objective: "Generate HTML document..." -2. Parameter Extraction: Extract resultType="html" from objective -3. Action Call: ai.generateDocument({resultType: "html", prompt: "..."}) -4. Content Generation: Generate sections with content -5. Integration: Merge sections into complete structure -6. Rendering: Call renderReport(outputFormat="html") -7. HTML Renderer: Render to HTML -8. Output: document.html (text/html) -``` - -### Actual Flow (Broken): -``` -1. Task Objective: "Generate HTML document..." -2. Parameter Extraction: ❌ MISSING - no extraction -3. Action Call: ai.generateDocument({}) ❌ Empty parameters -4. Content Generation: ✅ Generate sections with content -5. Integration: ✅ Merge sections into complete structure -6. Rendering: Call renderReport(outputFormat="docx") ❌ Wrong format -7. Docx Renderer: ❌ Fails or not found -8. Fallback: Text renderer ❌ Wrong renderer -9. Output: document.text (text/plain) ❌ Wrong format -``` - ---- - -## Fixes Required - -### Fix 1: Add HTML to Action Definition Options ✅ **EASY** - -**File**: `gateway/modules/workflows/methods/methodAi/methodAi.py` -**Line**: 357 - -**Change**: -```python -frontendOptions=["docx", "pdf", "txt", "md", "html"], # Added "html" -``` - ---- - -### Fix 2: Extract resultType from Prompt ✅ **MEDIUM** - -**Option A**: Enhance `generateDocument` to detect format from prompt - -**File**: `gateway/modules/workflows/methods/methodAi/actions/generateDocument.py` -**After line 44**: - -```python -resultType = parameters.get("resultType", "docx") - -# Auto-detect format from prompt if not provided -if resultType == "docx" and prompt: - promptLower = prompt.lower() - if "html" in promptLower or "html5" in promptLower: - resultType = "html" - elif "pdf" in promptLower: - resultType = "pdf" - elif "markdown" in promptLower or "md" in promptLower: - resultType = "md" - elif "text" in promptLower or "txt" in promptLower: - resultType = "txt" -``` - -**Option B**: Extract in parameter planning phase (better, but requires workflow changes) - ---- - -### Fix 3: Improve Renderer Error Handling ✅ **MEDIUM** - -**File**: `gateway/modules/services/serviceGeneration/mainServiceGeneration.py` -**Lines**: 393-409 - -**Enhance**: Better error messages and logging when renderer not found - -```python -def _getFormatRenderer(self, output_format: str): - """Get the appropriate renderer for the specified format using auto-discovery.""" - try: - from .renderers.registry import getRenderer - renderer = getRenderer(output_format, services=self.services) - - if renderer: - return renderer - - # Log available formats for debugging - from .renderers.registry import getSupportedFormats - availableFormats = getSupportedFormats() - logger.error( - f"No renderer found for format '{output_format}'. " - f"Available formats: {availableFormats}" - ) - - # Fallback to text renderer if no specific renderer found - logger.warning(f"Falling back to text renderer for format {output_format}") - fallbackRenderer = getRenderer('text', services=self.services) - if fallbackRenderer: - return fallbackRenderer - - logger.error("Even text renderer fallback failed") - return None - - except Exception as e: - logger.error(f"Error getting renderer for {output_format}: {str(e)}") - return None -``` - ---- - -## Verification Steps - -After fixes: - -1. **Test HTML Generation**: - - Task: "Generate HTML document about AI" - - Expected: `resultType="html"` extracted or detected - - Expected: HTML renderer used - - Expected: Output is `document.html` with `text/html` MIME type - -2. **Test Format Detection**: - - Task: "Generate PDF report" - - Expected: `resultType="pdf"` detected - - Expected: PDF renderer used - -3. **Test Explicit Parameter**: - - Action: `ai.generateDocument({resultType: "html", prompt: "..."})` - - Expected: HTML renderer used (no fallback) - ---- - -## Summary - -**Root Causes**: -1. ❌ `resultType` not extracted from task objective -2. ❌ HTML not in action definition options -3. ❌ Renderer fallback to text when docx fails -4. ❌ No format auto-detection from prompt - -**Priority**: **CRITICAL** - System cannot produce HTML documents as requested - -**Estimated Fix Time**: -- Fix 1: 5 minutes -- Fix 2: 30 minutes -- Fix 3: 15 minutes -- **Total**: ~1 hour - ---- - -**Analysis Complete**: 2025-12-22 - From 17341bfde1318be1d887a1aaad04b88119f22496 Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Mon, 9 Mar 2026 11:15:29 +0100 Subject: [PATCH 9/9] fix:removed old route fallback --- modules/features/chatplayground/mainChatplayground.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/features/chatplayground/mainChatplayground.py b/modules/features/chatplayground/mainChatplayground.py index 269f74d5..d5275a5f 100644 --- a/modules/features/chatplayground/mainChatplayground.py +++ b/modules/features/chatplayground/mainChatplayground.py @@ -39,12 +39,12 @@ RESOURCE_OBJECTS = [ { "objectKey": "resource.feature.chatplayground.stop", "label": {"en": "Stop Workflow", "de": "Workflow stoppen", "fr": "Arrêter workflow"}, - "meta": {"endpoint": "/api/chatplayground/{instanceId}/{workflowId}/stop", "method": "POST"} + "meta": {"endpoint": "/api/chatplayground/{instanceId}/workflows/{workflowId}/stop", "method": "POST"} }, { "objectKey": "resource.feature.chatplayground.chatData", "label": {"en": "Get Chat Data", "de": "Chat-Daten abrufen", "fr": "Récupérer données chat"}, - "meta": {"endpoint": "/api/chatplayground/{instanceId}/{workflowId}/chatData", "method": "GET"} + "meta": {"endpoint": "/api/chatplayground/{instanceId}/workflows/{workflowId}/chatData", "method": "GET"} }, ]