# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ GraphicalEditor routes - node-types, execute, workflows, runs, tasks, connections, browse. """ import asyncio import json import logging import math from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, Path, Query, Body, Request, HTTPException from fastapi.responses import JSONResponse, StreamingResponse from modules.auth import limiter, getRequestContext, RequestContext from modules.datamodels.datamodelPagination import PaginationParams, PaginationMetadata, normalize_pagination_dict from modules.routes.routeDataUsers import _applyFiltersAndSort from modules.features.graphicalEditor.mainGraphicalEditor import getGraphicalEditorServices from modules.features.graphicalEditor.nodeRegistry import getNodeTypesForApi from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface from modules.workflows.automation2.executionEngine import executeGraph from modules.workflows.automation2.runEnvelope import ( default_run_envelope, merge_run_envelope, normalize_run_envelope, ) from modules.features.graphicalEditor.entryPoints import find_invocation from modules.shared.i18nRegistry import apiRouteContext, resolveText routeApiMsg = apiRouteContext("routeFeatureGraphicalEditor") logger = logging.getLogger(__name__) def _build_execute_run_envelope( body: Dict[str, Any], workflow: Optional[Dict[str, Any]], user_id: Optional[str], requestLang: Optional[str] = None, ) -> Dict[str, Any]: """Build normalized run envelope from POST /execute body.""" if isinstance(body.get("runEnvelope"), dict): env = normalize_run_envelope(body["runEnvelope"], user_id=user_id) pl = body.get("payload") if isinstance(pl, dict): env = merge_run_envelope(env, {"payload": pl}) return env entry_point_id = body.get("entryPointId") if entry_point_id: if not workflow: raise HTTPException( status_code=400, detail=routeApiMsg("entryPointId requires a saved workflow (workflowId must refer to a stored workflow)"), ) inv = find_invocation(workflow, entry_point_id) if not inv: raise HTTPException(status_code=400, detail=routeApiMsg("entryPointId not found on workflow")) if not inv.get("enabled", True): raise HTTPException(status_code=400, detail=routeApiMsg("entry point is disabled")) kind = inv.get("kind", "manual") trig_map = { "manual": "manual", "form": "form", "schedule": "schedule", "always_on": "event", "email": "email", "webhook": "webhook", "api": "api", "event": "event", } trig = trig_map.get(kind, "manual") title = inv.get("title") or {} label = resolveText(title) base = default_run_envelope( trig, entry_point_id=inv.get("id"), entry_point_label=label or None, ) pl = body.get("payload") if isinstance(pl, dict): base = merge_run_envelope(base, {"payload": pl}) return normalize_run_envelope(base, user_id=user_id) env = normalize_run_envelope(None, user_id=user_id) pl = body.get("payload") if isinstance(pl, dict): env = merge_run_envelope(env, {"payload": pl}) return env router = APIRouter( prefix="/api/workflows", tags=["GraphicalEditor"], responses={404: {"description": "Not found"}, 403: {"description": "Forbidden"}}, ) def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str: """Validate user has access to the graphicalEditor feature instance. Returns mandateId.""" from fastapi import HTTPException from modules.interfaces.interfaceDbApp import getRootInterface rootInterface = getRootInterface() instance = rootInterface.getFeatureInstance(instanceId) if not instance: raise HTTPException(status_code=404, detail=f"Feature instance {instanceId} not found") featureAccess = rootInterface.getFeatureAccess(str(context.user.id), instanceId) if not featureAccess or not featureAccess.enabled: raise HTTPException(status_code=403, detail=routeApiMsg("Access denied to this feature instance")) return str(instance.mandateId) if instance.mandateId else "" @router.get("/{instanceId}/info") @limiter.limit("60/minute") def get_info( request: Request, instanceId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Minimal info endpoint - proves the feature works.""" _validateInstanceAccess(instanceId, context) return { "featureCode": "graphicalEditor", "instanceId": instanceId, "status": "ok", "message": "GraphicalEditor feature ready.", } @router.post("/{instanceId}/schedule-sync") @limiter.limit("10/minute") def post_schedule_sync( request: Request, instanceId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Manually trigger schedule sync (re-register cron jobs for all schedule workflows).""" _validateInstanceAccess(instanceId, context) from modules.workflows.scheduler.mainScheduler import syncNow result = syncNow() return {"success": True, **(result or {})} @router.get("/{instanceId}/node-types") @limiter.limit("60/minute") def get_node_types( request: Request, instanceId: str = Path(..., description="Feature instance ID"), language: str = Query("en", description="Localization (en, de, fr)"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Return node types for the flow builder: static + I/O from methodDiscovery.""" logger.info("graphicalEditor node-types request: instanceId=%s language=%s", instanceId, language) mandateId = _validateInstanceAccess(instanceId, context) services = getGraphicalEditorServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) result = getNodeTypesForApi(services, language=language) logger.info( "graphicalEditor node-types response: %d nodeTypes %d categories", len(result.get("nodeTypes", [])), len(result.get("categories", [])), ) return result @router.post("/{instanceId}/execute") @limiter.limit("30/minute") async def post_execute( request: Request, instanceId: str = Path(..., description="Feature instance ID"), body: dict = Body(..., description="{ workflowId?, graph: { nodes, connections } }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Execute workflow graph. Body: { workflowId?, graph: { nodes, connections } }.""" userId = str(context.user.id) if context.user else None logger.info( "graphicalEditor execute request: instanceId=%s userId=%s body_keys=%s", instanceId, userId, list(body.keys()), ) mandateId = _validateInstanceAccess(instanceId, context) services = getGraphicalEditorServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods discoverMethods(services) graph = body.get("graph") or body workflowId = body.get("workflowId") req_nodes = graph.get("nodes") or [] workflow_for_envelope: Optional[Dict[str, Any]] = None if workflowId and not str(workflowId).startswith("transient-"): iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) workflow_for_envelope = iface.getWorkflow(workflowId) if workflowId and len(req_nodes) == 0: iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) wf = iface.getWorkflow(workflowId) if wf and wf.get("graph"): graph = wf["graph"] logger.info("graphicalEditor execute: loaded graph from workflow %s", workflowId) workflow_for_envelope = wf if not workflowId: import uuid workflowId = f"transient-{uuid.uuid4().hex[:12]}" logger.info("graphicalEditor execute: using transient workflowId=%s", workflowId) nodes_count = len(graph.get("nodes") or []) connections_count = len(graph.get("connections") or []) logger.info( "graphicalEditor execute: graph nodes=%d connections=%d workflowId=%s mandateId=%s", nodes_count, connections_count, workflowId, mandateId, ) run_env = _build_execute_run_envelope( body, workflow_for_envelope, userId, getattr(context.user, "language", None) if context.user else None, ) ge_interface = getGraphicalEditorInterface(context.user, mandateId, instanceId) result = await executeGraph( graph=graph, services=services, workflowId=workflowId, instanceId=instanceId, userId=userId, mandateId=mandateId, automation2_interface=ge_interface, run_envelope=run_env, ) logger.info( "graphicalEditor execute result: success=%s error=%s nodeOutputs_keys=%s failedNode=%s paused=%s", result.get("success"), result.get("error"), list(result.get("nodeOutputs", {}).keys()) if result.get("nodeOutputs") else [], result.get("failedNode"), result.get("paused"), ) return result # ------------------------------------------------------------------------- # Run Tracing SSE Stream # ------------------------------------------------------------------------- @router.get("/{instanceId}/runs/{runId}/stream") async def get_run_stream( request: Request, instanceId: str = Path(..., description="Feature instance ID"), runId: str = Path(..., description="Run ID"), context: RequestContext = Depends(getRequestContext), ): """SSE stream for live step-log updates during a workflow run.""" _validateInstanceAccess(instanceId, context) from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager sseEventManager = get_event_manager() queueId = f"run-trace-{runId}" sseEventManager.create_queue(queueId) async def _sseGenerator(): queue = sseEventManager.get_queue(queueId) if not queue: return while True: try: event = await asyncio.wait_for(queue.get(), timeout=30) except asyncio.TimeoutError: yield "data: {\"type\": \"keepalive\"}\n\n" continue if event is None: break payload = event.get("data", event) if isinstance(event, dict) else event yield f"data: {json.dumps(payload, default=str)}\n\n" eventType = payload.get("type", "") if isinstance(payload, dict) else "" if eventType in ("run_complete", "run_failed"): break await sseEventManager.cleanup(queueId, delay=10) return StreamingResponse( _sseGenerator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) # ------------------------------------------------------------------------- # Versions (AutoVersion Lifecycle) # ------------------------------------------------------------------------- @router.get("/{instanceId}/workflows/{workflowId}/versions") @limiter.limit("60/minute") def get_versions( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """List all versions for a workflow.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) versions = iface.getVersions(workflowId) return {"versions": versions} @router.post("/{instanceId}/workflows/{workflowId}/versions/draft") @limiter.limit("30/minute") def create_draft_version( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Create a new draft version from the workflow's current graph.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) version = iface.createDraftVersion(workflowId) if not version: raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) return version @router.post("/{instanceId}/versions/{versionId}/publish") @limiter.limit("30/minute") def publish_version( request: Request, instanceId: str = Path(..., description="Feature instance ID"), versionId: str = Path(..., description="Version ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Publish a draft version. Archives the previously published version.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) userId = str(context.user.id) if context.user else None version = iface.publishVersion(versionId, userId=userId) if not version: raise HTTPException(status_code=400, detail=routeApiMsg("Version not found or not in draft status")) return version @router.post("/{instanceId}/versions/{versionId}/unpublish") @limiter.limit("30/minute") def unpublish_version( request: Request, instanceId: str = Path(..., description="Feature instance ID"), versionId: str = Path(..., description="Version ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Unpublish a version (revert to draft).""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) version = iface.unpublishVersion(versionId) if not version: raise HTTPException(status_code=400, detail=routeApiMsg("Version not found or not published")) return version @router.post("/{instanceId}/versions/{versionId}/archive") @limiter.limit("30/minute") def archive_version( request: Request, instanceId: str = Path(..., description="Feature instance ID"), versionId: str = Path(..., description="Version ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Archive a version.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) version = iface.archiveVersion(versionId) if not version: raise HTTPException(status_code=404, detail=routeApiMsg("Version not found")) return version # ------------------------------------------------------------------------- # Templates # ------------------------------------------------------------------------- @router.get("/{instanceId}/templates") @limiter.limit("60/minute") def get_templates( request: Request, instanceId: str = Path(..., description="Feature instance ID"), scope: Optional[str] = Query(None, description="Filter by scope: user, instance, mandate, system"), pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), context: RequestContext = Depends(getRequestContext), ): """List workflow templates with optional pagination.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) templates = iface.getTemplates(scope=scope) paginationParams = None if pagination: try: paginationDict = json.loads(pagination) if paginationDict: paginationDict = normalize_pagination_dict(paginationDict) paginationParams = PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError) as e: raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") if paginationParams: filtered = _applyFiltersAndSort(templates, paginationParams) totalItems = len(filtered) totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0 startIdx = (paginationParams.page - 1) * paginationParams.pageSize endIdx = startIdx + paginationParams.pageSize return { "items": filtered[startIdx:endIdx], "pagination": PaginationMetadata( currentPage=paginationParams.page, pageSize=paginationParams.pageSize, totalItems=totalItems, totalPages=totalPages, sort=paginationParams.sort, filters=paginationParams.filters, ).model_dump(), } return {"templates": templates} @router.post("/{instanceId}/templates/from-workflow") @limiter.limit("30/minute") def create_template_from_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), body: dict = Body(..., description="{ workflowId, scope? }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Create a template from an existing workflow.""" mandateId = _validateInstanceAccess(instanceId, context) workflowId = body.get("workflowId") scope = body.get("scope", "user") if not workflowId: raise HTTPException(status_code=400, detail=routeApiMsg("workflowId required")) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) template = iface.createTemplateFromWorkflow(workflowId, scope=scope) if not template: raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) return template @router.post("/{instanceId}/templates/{templateId}/copy") @limiter.limit("30/minute") def copy_template( request: Request, instanceId: str = Path(..., description="Feature instance ID"), templateId: str = Path(..., description="Template ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Copy a template to a new user-owned workflow.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) workflow = iface.copyTemplateToUser(templateId) if not workflow: raise HTTPException(status_code=404, detail=routeApiMsg("Template not found")) return workflow @router.post("/{instanceId}/templates/{templateId}/share") @limiter.limit("30/minute") def share_template( request: Request, instanceId: str = Path(..., description="Feature instance ID"), templateId: str = Path(..., description="Template ID"), body: dict = Body(..., description="{ scope }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Share a template by changing its scope.""" mandateId = _validateInstanceAccess(instanceId, context) scope = body.get("scope") if not scope or scope not in ("user", "instance", "mandate", "system"): raise HTTPException(status_code=400, detail=routeApiMsg("scope must be user, instance, mandate, or system")) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) template = iface.shareTemplate(templateId, scope=scope) if not template: raise HTTPException(status_code=404, detail=routeApiMsg("Template not found")) return template # ------------------------------------------------------------------------- # AI Chat for Editor # ------------------------------------------------------------------------- @router.post("/{instanceId}/{workflowId}/chat/stream") @limiter.limit("30/minute") async def post_editor_chat( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), body: dict = Body(..., description="{ message, conversationHistory?, userLanguage? }"), context: RequestContext = Depends(getRequestContext), ): """AI chat endpoint for the editor with SSE streaming. Uses workflow tools to mutate the graph.""" mandateId = _validateInstanceAccess(instanceId, context) message = body.get("message", "") if not message: raise HTTPException(status_code=400, detail=routeApiMsg("message required")) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) wf = iface.getWorkflow(workflowId) if not wf: raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) userLanguage = body.get("userLanguage", "de") conversationHistory = body.get("conversationHistory") or [] fileIds = body.get("fileIds") or [] dataSourceIds = body.get("dataSourceIds") or [] featureDataSourceIds = body.get("featureDataSourceIds") or [] from modules.serviceCenter.core.serviceStreaming import get_event_manager sseEventManager = get_event_manager() queueId = f"ge-chat-{workflowId}-{id(request)}" sseEventManager.create_queue(queueId) agentTask = asyncio.ensure_future( _runEditorAgent( workflowId=workflowId, queueId=queueId, prompt=message, instanceId=instanceId, user=context.user, mandateId=mandateId, sseEventManager=sseEventManager, userLanguage=userLanguage, conversationHistory=conversationHistory, fileIds=fileIds, dataSourceIds=dataSourceIds, featureDataSourceIds=featureDataSourceIds, ) ) sseEventManager.register_agent_task(queueId, agentTask) async def _sseGenerator(): queue = sseEventManager.get_queue(queueId) if not queue: return while True: try: event = await asyncio.wait_for(queue.get(), timeout=120) except asyncio.TimeoutError: yield "data: {\"type\": \"keepalive\"}\n\n" continue if event is None: break ssePayload = event.get("data", event) if isinstance(event, dict) else event yield f"data: {json.dumps(ssePayload, default=str)}\n\n" eventType = ssePayload.get("type", "") if isinstance(ssePayload, dict) else "" if eventType in ("complete", "error", "stopped"): break await sseEventManager.cleanup(queueId, delay=30) return StreamingResponse( _sseGenerator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) async def _runEditorAgent( workflowId: str, queueId: str, prompt: str, instanceId: str, user=None, mandateId: str = "", sseEventManager=None, userLanguage: str = "de", conversationHistory: List[Dict[str, Any]] = None, fileIds: List[str] = None, dataSourceIds: List[str] = None, featureDataSourceIds: List[str] = None, ): """Run the serviceAgent loop with workflow toolbox and forward events to the SSE queue.""" try: from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentEventTypeEnum ctx = ServiceCenterContext( user=user, mandate_id=mandateId, feature_instance_id=instanceId, workflow_id=workflowId, feature_code="graphicalEditor", ) agentService = getService("agent", ctx) systemPrompt = ( "You are a workflow editor assistant. The user describes changes to a workflow graph. " "Use the available workflow tools (readWorkflowGraph, addNode, removeNode, connectNodes, " "setNodeParameter, listAvailableNodeTypes, validateGraph) to modify the graph. " "Always read the current graph first before making changes. " "Respond concisely and confirm what you changed." ) enrichedPrompt = prompt if dataSourceIds: from modules.features.workspace.routeFeatureWorkspace import _buildDataSourceContext chatSvc = getService("chat", ctx) dsInfo = _buildDataSourceContext(chatSvc, dataSourceIds) if dsInfo: enrichedPrompt = f"{prompt}\n\n[Active Data Sources]\n{dsInfo}" if featureDataSourceIds: from modules.features.workspace.routeFeatureWorkspace import _buildFeatureDataSourceContext fdsInfo = _buildFeatureDataSourceContext(featureDataSourceIds) if fdsInfo: enrichedPrompt = f"{enrichedPrompt}\n\n[Attached Feature Data Sources]\n{fdsInfo}" accumulatedText = "" async for event in agentService.runAgent( prompt=enrichedPrompt, fileIds=fileIds or [], workflowId=workflowId, userLanguage=userLanguage, conversationHistory=conversationHistory or [], toolSet="core", additionalTools=None, systemPromptOverride=systemPrompt, ): if sseEventManager.is_cancelled(queueId): logger.info("Editor chat agent cancelled for workflow %s", workflowId) break if event.type == AgentEventTypeEnum.CHUNK and event.content: accumulatedText += event.content sseEvent = { "type": event.type.value if hasattr(event.type, "value") else event.type, "workflowId": workflowId, } if event.content: sseEvent["content"] = event.content if event.data: sseEvent["item"] = event.data await sseEventManager.emit_event(queueId, sseEvent["type"], sseEvent) if event.type in (AgentEventTypeEnum.FINAL, AgentEventTypeEnum.ERROR): break await sseEventManager.emit_event(queueId, "complete", { "type": "complete", "workflowId": workflowId, }) except asyncio.CancelledError: logger.info("Editor chat agent task cancelled for workflow %s", workflowId) await sseEventManager.emit_event(queueId, "stopped", { "type": "stopped", "workflowId": workflowId, }) except Exception as e: logger.error("Editor chat agent error: %s", e, exc_info=True) await sseEventManager.emit_event(queueId, "error", { "type": "error", "content": str(e), "workflowId": workflowId, }) finally: sseEventManager._unregister_agent_task(queueId) # ------------------------------------------------------------------------- # Connections and Browse (for Email/SharePoint node config) # ------------------------------------------------------------------------- def _buildResolverDbInterface(chatService): """Build a DB adapter that ConnectorResolver can use to load UserConnections.""" class _ResolverDbAdapter: def __init__(self, appInterface): self._app = appInterface def getUserConnection(self, connectionId: str): if hasattr(self._app, "getUserConnectionById"): return self._app.getUserConnectionById(connectionId) return None appIf = getattr(chatService, "interfaceDbApp", None) if appIf: return _ResolverDbAdapter(appIf) return getattr(chatService, "interfaceDbComponent", None) @router.get("/{instanceId}/connections") @limiter.limit("300/minute") def list_connections( request: Request, instanceId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Return the user's active connections (UserConnections) for Email/SharePoint node config.""" mandateId = _validateInstanceAccess(instanceId, context) from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else mandateId, feature_instance_id=instanceId, ) chatService = getService("chat", ctx) connections = chatService.getUserConnections() items = [] for c in connections or []: conn = c if isinstance(c, dict) else (c.model_dump() if hasattr(c, "model_dump") else {}) authority = conn.get("authority") if hasattr(authority, "value"): authority = authority.value status = conn.get("status") if hasattr(status, "value"): status = status.value items.append({ "id": conn.get("id"), "authority": authority, "externalUsername": conn.get("externalUsername"), "externalEmail": conn.get("externalEmail"), "status": status, }) return {"connections": items} @router.get("/{instanceId}/connections/{connectionId}/services") @limiter.limit("120/minute") async def list_connection_services( request: Request, instanceId: str = Path(..., description="Feature instance ID"), connectionId: str = Path(..., description="Connection ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Return the available services for a specific UserConnection.""" mandateId = _validateInstanceAccess(instanceId, context) try: from modules.connectors.connectorResolver import ConnectorResolver from modules.serviceCenter import getService as getSvc from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else mandateId, feature_instance_id=instanceId, ) chatService = getSvc("chat", ctx) securityService = getSvc("security", ctx) dbInterface = _buildResolverDbInterface(chatService) resolver = ConnectorResolver(securityService, dbInterface) provider = await resolver.resolve(connectionId) services = provider.getAvailableServices() _serviceLabels = { "sharepoint": "SharePoint", "clickup": "ClickUp", "outlook": "Outlook", "teams": "Teams", "onedrive": "OneDrive", "drive": "Google Drive", "gmail": "Gmail", "files": "Files (FTP)", } _serviceIcons = { "sharepoint": "sharepoint", "clickup": "folder", "outlook": "mail", "teams": "chat", "onedrive": "cloud", "drive": "cloud", "gmail": "mail", "files": "folder", } items = [ {"service": s, "label": _serviceLabels.get(s, s), "icon": _serviceIcons.get(s, "folder")} for s in services ] return {"services": items} except Exception as e: logger.error(f"Error listing services for connection {connectionId}: {e}") return JSONResponse({"services": [], "error": str(e)}, status_code=400) @router.get("/{instanceId}/connections/{connectionId}/browse") @limiter.limit("300/minute") async def browse_connection_service( request: Request, instanceId: str = Path(..., description="Feature instance ID"), connectionId: str = Path(..., description="Connection ID"), service: str = Query(..., description="Service name (e.g. sharepoint, onedrive, outlook)"), path: str = Query("/", description="Path within the service to browse"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Browse folders/items within a connection's service at a given path.""" mandateId = _validateInstanceAccess(instanceId, context) try: from modules.connectors.connectorResolver import ConnectorResolver from modules.serviceCenter import getService as getSvc from modules.serviceCenter.context import ServiceCenterContext ctx = ServiceCenterContext( user=context.user, mandate_id=str(context.mandateId) if context.mandateId else mandateId, feature_instance_id=instanceId, ) chatService = getSvc("chat", ctx) securityService = getSvc("security", ctx) dbInterface = _buildResolverDbInterface(chatService) resolver = ConnectorResolver(securityService, dbInterface) adapter = await resolver.resolveService(connectionId, service) entries = await adapter.browse(path, filter=None) items = [] for entry in (entries or []): items.append({ "name": entry.name, "path": entry.path, "isFolder": entry.isFolder, "size": entry.size, "mimeType": entry.mimeType, "metadata": entry.metadata if hasattr(entry, "metadata") else {}, }) return {"items": items, "path": path, "service": service} except Exception as e: logger.error(f"Error browsing {service} for connection {connectionId} at '{path}': {e}") return JSONResponse({"items": [], "error": str(e)}, status_code=400) # ------------------------------------------------------------------------- # Workflow CRUD # ------------------------------------------------------------------------- def _get_node_label_from_graph(graph: dict, nodeId: str) -> str: """Extract human-readable label for a node from graph.""" if not graph or not nodeId: return nodeId or "" nodes = graph.get("nodes") or [] for n in nodes: if n.get("id") == nodeId: params = n.get("parameters") or {} config = params.get("config") or {} if isinstance(config, dict): label = config.get("title") or config.get("label") else: label = None return ( n.get("title") or label or params.get("title") or params.get("label") or n.get("type", "") or nodeId ) return nodeId or "" @router.get("/{instanceId}/workflows") @limiter.limit("60/minute") def get_workflows( request: Request, instanceId: str = Path(..., description="Feature instance ID"), active: Optional[bool] = Query(None, description="Filter by active: true|false"), pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"), context: RequestContext = Depends(getRequestContext), ): """List all workflows for this feature instance.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) items = iface.getWorkflows(active=active) enriched = [] for wf in items: wf_id = wf.get("id") runs = iface.getRunsByWorkflow(wf_id) if wf_id else [] run_count = len(runs) active_run = None last_started_at = None for r in runs: ts = r.get("sysCreatedAt") if ts and (last_started_at is None or ts > last_started_at): last_started_at = ts if r.get("status") in ("running", "paused"): active_run = r stuck_at_node_id = active_run.get("currentNodeId") if active_run else None stuck_at_node_label = "" if stuck_at_node_id and wf.get("graph"): stuck_at_node_label = _get_node_label_from_graph(wf["graph"], stuck_at_node_id) enriched.append({ **wf, "runCount": run_count, "isRunning": active_run is not None, "runStatus": active_run.get("status") if active_run else None, "stuckAtNodeId": stuck_at_node_id, "stuckAtNodeLabel": stuck_at_node_label or stuck_at_node_id or "", "createdAt": wf.get("sysCreatedAt"), "lastStartedAt": last_started_at, }) paginationParams = None if pagination: try: paginationDict = json.loads(pagination) if paginationDict: paginationDict = normalize_pagination_dict(paginationDict) paginationParams = PaginationParams(**paginationDict) except (json.JSONDecodeError, ValueError) as e: raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}") if paginationParams: filtered = _applyFiltersAndSort(enriched, paginationParams) totalItems = len(filtered) totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0 startIdx = (paginationParams.page - 1) * paginationParams.pageSize endIdx = startIdx + paginationParams.pageSize return { "items": filtered[startIdx:endIdx], "pagination": PaginationMetadata( currentPage=paginationParams.page, pageSize=paginationParams.pageSize, totalItems=totalItems, totalPages=totalPages, sort=paginationParams.sort, filters=paginationParams.filters, ).model_dump(), } return {"workflows": enriched} @router.get("/{instanceId}/workflows/{workflowId}") @limiter.limit("60/minute") def get_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Get a single workflow by ID.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) wf = iface.getWorkflow(workflowId) if not wf: raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) return wf @router.post("/{instanceId}/workflows") @limiter.limit("30/minute") def create_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), body: dict = Body(..., description="{ label, graph }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Create a new workflow.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) created = iface.createWorkflow(body) return created @router.put("/{instanceId}/workflows/{workflowId}") @limiter.limit("30/minute") def update_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), body: dict = Body(..., description="{ label?, graph? }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Update a workflow.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) updated = iface.updateWorkflow(workflowId, body) if not updated: raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) return updated @router.delete("/{instanceId}/workflows/{workflowId}") @limiter.limit("30/minute") def delete_workflow( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Delete a workflow.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) if not iface.deleteWorkflow(workflowId): raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) return {"success": True} @router.post("/{instanceId}/workflows/{workflowId}/webhooks/{entryPointId}") @limiter.limit("60/minute") async def post_workflow_webhook( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), entryPointId: str = Path(..., description="Entry point ID (kind must be webhook)"), body: dict = Body(default_factory=dict), context: RequestContext = Depends(getRequestContext), ) -> dict: """Invoke a workflow via a webhook entry point.""" mandateId = _validateInstanceAccess(instanceId, context) userId = str(context.user.id) if context.user else None iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) wf = iface.getWorkflow(workflowId) if not wf or not wf.get("graph"): raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) inv = find_invocation(wf, entryPointId) if not inv: raise HTTPException(status_code=404, detail=routeApiMsg("Entry point not found")) if inv.get("kind") != "webhook": raise HTTPException(status_code=400, detail=routeApiMsg("Entry point is not a webhook")) if not inv.get("enabled", True): raise HTTPException(status_code=400, detail=routeApiMsg("Entry point is disabled")) cfg = inv.get("config") or {} secret = cfg.get("webhookSecret") if secret: hdr = request.headers.get("X-Webhook-Secret") if hdr != str(secret): raise HTTPException(status_code=403, detail=routeApiMsg("Invalid webhook secret")) services = getGraphicalEditorServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods discoverMethods(services) title = inv.get("title") or {} label = resolveText(title) pl = body if isinstance(body, dict) else {} base = default_run_envelope( "webhook", entry_point_id=inv.get("id"), entry_point_label=label or None, payload=pl, raw={"httpBody": body}, ) run_env = normalize_run_envelope(base, user_id=userId) result = await executeGraph( graph=wf["graph"], services=services, workflowId=workflowId, instanceId=instanceId, userId=userId, mandateId=mandateId, automation2_interface=iface, run_envelope=run_env, ) return result @router.post("/{instanceId}/workflows/{workflowId}/forms/{entryPointId}/submit") @limiter.limit("60/minute") async def post_workflow_form_submit( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), entryPointId: str = Path(..., description="Entry point ID (kind must be form)"), body: dict = Body(default_factory=dict), context: RequestContext = Depends(getRequestContext), ) -> dict: """Form-style submit: same as execute with trigger.type form and payload from body.""" mandateId = _validateInstanceAccess(instanceId, context) userId = str(context.user.id) if context.user else None iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) wf = iface.getWorkflow(workflowId) if not wf or not wf.get("graph"): raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) inv = find_invocation(wf, entryPointId) if not inv: raise HTTPException(status_code=404, detail=routeApiMsg("Entry point not found")) if inv.get("kind") != "form": raise HTTPException(status_code=400, detail=routeApiMsg("Entry point is not a form")) if not inv.get("enabled", True): raise HTTPException(status_code=400, detail=routeApiMsg("Entry point is disabled")) services = getGraphicalEditorServices( context.user, mandateId=mandateId, featureInstanceId=instanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods discoverMethods(services) title = inv.get("title") or {} label = resolveText(title) pl = body if isinstance(body, dict) else {} base = default_run_envelope( "form", entry_point_id=inv.get("id"), entry_point_label=label or None, payload=pl, raw={"formBody": body}, ) run_env = normalize_run_envelope(base, user_id=userId) result = await executeGraph( graph=wf["graph"], services=services, workflowId=workflowId, instanceId=instanceId, userId=userId, mandateId=mandateId, automation2_interface=iface, run_envelope=run_env, ) return result # ------------------------------------------------------------------------- # Runs and Resume # ------------------------------------------------------------------------- @router.get("/{instanceId}/runs/completed") @limiter.limit("60/minute") def get_completed_runs( request: Request, instanceId: str = Path(..., description="Feature instance ID"), limit: int = Query(20, ge=1, le=50), context: RequestContext = Depends(getRequestContext), ) -> dict: """Get recently completed runs with output.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) runs = iface.getRecentCompletedRuns(limit=limit) return {"runs": runs} @router.get("/{instanceId}/workflows/{workflowId}/runs") @limiter.limit("60/minute") def get_workflow_runs( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Get runs for a workflow.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) if not iface.getWorkflow(workflowId): raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) runs = iface.getRunsByWorkflow(workflowId) return {"runs": runs} @router.get("/{instanceId}/runs/{runId}/steps") @limiter.limit("60/minute") def get_run_steps( request: Request, instanceId: str = Path(..., description="Feature instance ID"), runId: str = Path(..., description="Run ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Get step logs for a run (AutoStepLog entries).""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog if not iface.db._ensureTableExists(AutoStepLog): return {"steps": []} records = iface.db.getRecordset(AutoStepLog, recordFilter={"runId": runId}) steps = [dict(r) for r in records] if records else [] steps.sort(key=lambda s: s.get("startedAt") or 0) return {"steps": steps} @router.post("/{instanceId}/runs/{runId}/resume") @limiter.limit("30/minute") async def resume_run( request: Request, instanceId: str = Path(..., description="Feature instance ID"), runId: str = Path(..., description="Run ID"), body: dict = Body(..., description="{ taskId, result }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Resume a paused run after task completion.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) run = iface.getRun(runId) if not run: raise HTTPException(status_code=404, detail=routeApiMsg("Run not found")) taskId = body.get("taskId") result = body.get("result") if not taskId or result is None: raise HTTPException(status_code=400, detail=routeApiMsg("taskId and result required")) task = iface.getTask(taskId) if not task or task.get("runId") != runId: raise HTTPException(status_code=404, detail=routeApiMsg("Task not found")) if task.get("status") != "pending": raise HTTPException(status_code=400, detail=routeApiMsg("Task already completed")) iface.updateTask(taskId, status="completed", result=result) nodeId = task.get("nodeId") nodeOutputs = dict(run.get("nodeOutputs") or {}) nodeOutputs[nodeId] = result workflowId = run.get("workflowId") wf = iface.getWorkflow(workflowId) if workflowId else None if not wf or not wf.get("graph"): raise HTTPException(status_code=400, detail=routeApiMsg("Workflow graph not found")) graph = wf["graph"] services = getGraphicalEditorServices(context.user, mandateId=mandateId, featureInstanceId=instanceId) resume_result = await executeGraph( graph=graph, services=services, workflowId=workflowId, instanceId=instanceId, userId=str(context.user.id) if context.user else None, mandateId=mandateId, automation2_interface=iface, initialNodeOutputs=nodeOutputs, startAfterNodeId=nodeId, runId=runId, ) return resume_result # ------------------------------------------------------------------------- # Tasks # ------------------------------------------------------------------------- @router.get("/{instanceId}/tasks") @limiter.limit("60/minute") def get_tasks( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Query(None, description="Filter by workflow ID"), status: str = Query(None, description="Filter: pending, completed, rejected"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Get tasks assigned to current user.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) assigneeId = str(context.user.id) if context.user else None items = iface.getTasks(workflowId=workflowId, status=status, assigneeId=assigneeId) workflows = {w["id"]: w for w in iface.getWorkflows()} enriched = [] for t in items: wf = workflows.get(t.get("workflowId") or "") enriched.append({ **t, "workflowLabel": wf.get("label", t.get("workflowId", "")) if wf else t.get("workflowId", ""), "createdAt": t.get("sysCreatedAt"), }) return {"tasks": enriched} @router.post("/{instanceId}/tasks/{taskId}/complete") @limiter.limit("30/minute") async def complete_task( request: Request, instanceId: str = Path(..., description="Feature instance ID"), taskId: str = Path(..., description="Task ID"), body: dict = Body(..., description="{ result }"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Complete a task and resume the run.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) task = iface.getTask(taskId) if not task: raise HTTPException(status_code=404, detail=routeApiMsg("Task not found")) runId = task.get("runId") result = body.get("result") if result is None: raise HTTPException(status_code=400, detail=routeApiMsg("result required")) run = iface.getRun(runId) if not run: raise HTTPException(status_code=404, detail=routeApiMsg("Run not found")) if task.get("status") != "pending": raise HTTPException(status_code=400, detail=routeApiMsg("Task already completed")) iface.updateTask(taskId, status="completed", result=result) nodeId = task.get("nodeId") nodeOutputs = dict(run.get("nodeOutputs") or {}) nodeOutputs[nodeId] = result workflowId = run.get("workflowId") wf = iface.getWorkflow(workflowId) if workflowId else None if not wf or not wf.get("graph"): raise HTTPException(status_code=400, detail=routeApiMsg("Workflow graph not found")) graph = wf["graph"] services = getGraphicalEditorServices(context.user, mandateId=mandateId, featureInstanceId=instanceId) return await executeGraph( graph=graph, services=services, workflowId=workflowId, instanceId=instanceId, userId=str(context.user.id) if context.user else None, mandateId=mandateId, automation2_interface=iface, initialNodeOutputs=nodeOutputs, startAfterNodeId=nodeId, runId=runId, ) # ------------------------------------------------------------------------- # Monitoring / Metrics # ------------------------------------------------------------------------- @router.get("/{instanceId}/metrics") @limiter.limit("60/minute") def get_metrics( request: Request, instanceId: str = Path(..., description="Feature instance ID"), context: RequestContext = Depends(getRequestContext), ) -> dict: """Aggregated metrics for the monitoring dashboard.""" mandateId = _validateInstanceAccess(instanceId, context) iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import ( AutoWorkflow, AutoRun, AutoStepLog, AutoTask, ) workflows = iface.db.getRecordset(AutoWorkflow, recordFilter={ "mandateId": mandateId, "featureInstanceId": instanceId, "isTemplate": False, }) or [] runs = iface.db.getRecordset(AutoRun, recordFilter={ "workflowId": {"$in": [w.get("id") for w in workflows]} if workflows else "__none__", }) or [] tasks = iface.db.getRecordset(AutoTask, recordFilter={ "workflowId": {"$in": [w.get("id") for w in workflows]} if workflows else "__none__", }) or [] runsByStatus = {} totalTokens = 0 totalCredits = 0.0 for r in runs: s = r.get("status", "unknown") runsByStatus[s] = runsByStatus.get(s, 0) + 1 totalTokens += r.get("costTokens", 0) or 0 totalCredits += r.get("costCredits", 0.0) or 0.0 tasksByStatus = {} for t in tasks: s = t.get("status", "unknown") tasksByStatus[s] = tasksByStatus.get(s, 0) + 1 return { "workflowCount": len(workflows), "activeWorkflows": sum(1 for w in workflows if w.get("active")), "totalRuns": len(runs), "runsByStatus": runsByStatus, "totalTasks": len(tasks), "tasksByStatus": tasksByStatus, "totalTokens": totalTokens, "totalCredits": round(totalCredits, 4), }