diff --git a/modules/datamodels/datamodelChat.py b/modules/datamodels/datamodelChat.py index 6160e7c8..96eb01ef 100644 --- a/modules/datamodels/datamodelChat.py +++ b/modules/datamodels/datamodelChat.py @@ -128,6 +128,21 @@ class ChatWorkflow(PowerOnModel): "fk_target": {"db": "poweron_app", "table": "FeatureInstance"}, }, ) + linkedWorkflowId: Optional[str] = Field( + None, + description=( + "Optional foreign key linking this chat to an entity outside the " + "ChatWorkflow table (e.g. an Automation2Workflow in the GraphicalEditor " + "AI editor chat). NULL for the default workspace chats. Combined with " + "featureInstanceId this gives a 1:1 relation entity ↔ chat per feature." + ), + json_schema_extra={ + "label": "Verknüpfter Workflow", + "frontend_type": "text", + "frontend_readonly": True, + "frontend_required": False, + }, + ) status: str = Field(default="running", description="Current status of the workflow", json_schema_extra={"frontend_type": "select", "frontend_readonly": False, "frontend_required": False, "frontend_options": [ {"value": "running", "label": "Running"}, {"value": "completed", "label": "Completed"}, diff --git a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py index f02364a0..7b30ec16 100644 --- a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py +++ b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py @@ -470,16 +470,63 @@ def share_template( # ------------------------------------------------------------------------- +def _editorChatQueueId(workflowId: str) -> str: + """Deterministic SSE queue id for the editor chat (one active stream per workflow). + + Mirrors the workspace pattern (``workspace-{workflowId}``) so stop/cancel can + target the running task by workflowId without needing per-request handles. + """ + return f"ge-chat-{workflowId}" + + +def _getEditorChatInterface(context: RequestContext, mandateId: str, instanceId: str): + """Build the ChatObjects interface used to persist editor-chat messages.""" + from modules.interfaces import interfaceDbChat + return interfaceDbChat.getInterface( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId, + ) + + +def _editorConversationHistoryFromPersisted(chatInterface, chatWorkflowId: str) -> List[Dict[str, Any]]: + """Load persisted ChatMessages for the editor chat and shape them as the + agent expects (``[{role, message}]``). Skips empty / system messages. + """ + try: + msgs = chatInterface.getMessages(chatWorkflowId) or [] + except Exception as e: + logger.warning("Editor chat: could not load persisted history for %s: %s", chatWorkflowId, e) + return [] + history: List[Dict[str, Any]] = [] + for m in msgs: + role = (getattr(m, "role", None) or (m.get("role") if isinstance(m, dict) else None) or "").strip() + text = (getattr(m, "message", None) or (m.get("message") if isinstance(m, dict) else None) or "").strip() + if not role or not text: + continue + if role not in ("user", "assistant", "system"): + continue + history.append({"role": role, "message": text}) + return history + + @router.post("/{instanceId}/{workflowId}/chat/stream") @limiter.limit("30/minute") async def post_editor_chat( request: Request, instanceId: str = Path(..., description="Feature instance ID"), workflowId: str = Path(..., description="Workflow ID"), - body: dict = Body(..., description="{ message, conversationHistory?, userLanguage? }"), + body: dict = Body(..., description="{ message, userLanguage? }"), context: RequestContext = Depends(getRequestContext), ): - """AI chat endpoint for the editor with SSE streaming. Uses workflow tools to mutate the graph.""" + """AI chat endpoint for the editor with SSE streaming. Uses workflow tools to mutate the graph. + + Persistence: the chat is stored in the standard ``ChatWorkflow`` table linked + to this Automation2Workflow via ``ChatWorkflow.linkedWorkflowId``. The user + message is persisted before the agent starts; the assistant message after. + Conversation history is loaded server-side from this linked ChatWorkflow — + the client does not need to maintain it. + """ mandateId = _validateInstanceAccess(instanceId, context) message = body.get("message", "") if not message: @@ -491,14 +538,35 @@ async def post_editor_chat( raise HTTPException(status_code=404, detail=routeApiMsg("Workflow not found")) userLanguage = body.get("userLanguage", "de") - conversationHistory = body.get("conversationHistory") or [] fileIds = body.get("fileIds") or [] dataSourceIds = body.get("dataSourceIds") or [] featureDataSourceIds = body.get("featureDataSourceIds") or [] + chatInterface = _getEditorChatInterface(context, mandateId, instanceId) + wfLabel = wf.get("label") if isinstance(wf, dict) else getattr(wf, "label", None) + chatWorkflow = chatInterface.getOrCreateLinkedWorkflow( + featureInstanceId=instanceId, + linkedWorkflowId=workflowId, + name=wfLabel or f"Editor Chat ({workflowId})", + ) + chatWorkflowId = chatWorkflow.id if hasattr(chatWorkflow, "id") else chatWorkflow.get("id") + + conversationHistory = _editorConversationHistoryFromPersisted(chatInterface, chatWorkflowId) + + try: + chatInterface.createMessage({ + "workflowId": chatWorkflowId, + "role": "user", + "message": message, + "status": "first" if not conversationHistory else "step", + }) + except Exception as e: + logger.error("Editor chat: failed to persist user message: %s", e) + from modules.serviceCenter.core.serviceStreaming import get_event_manager sseEventManager = get_event_manager() - queueId = f"ge-chat-{workflowId}-{id(request)}" + queueId = _editorChatQueueId(workflowId) + await sseEventManager.cancel_agent(queueId) sseEventManager.create_queue(queueId) agentTask = asyncio.ensure_future( @@ -515,6 +583,8 @@ async def post_editor_chat( fileIds=fileIds, dataSourceIds=dataSourceIds, featureDataSourceIds=featureDataSourceIds, + chatInterface=chatInterface, + chatWorkflowId=chatWorkflowId, ) ) sseEventManager.register_agent_task(queueId, agentTask) @@ -549,6 +619,80 @@ async def post_editor_chat( ) +@router.get("/{instanceId}/{workflowId}/chat/messages") +@limiter.limit("120/minute") +def get_editor_chat_messages( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID (Automation2Workflow)"), + context: RequestContext = Depends(getRequestContext), +): + """Return persisted editor-chat messages for an Automation2Workflow. + + The chat is stored in ``ChatWorkflow`` with ``linkedWorkflowId == workflowId``; + if no chat has been started yet for this workflow we return an empty list (we + do NOT eagerly create one — the row is created on the first POST /chat/stream). + """ + mandateId = _validateInstanceAccess(instanceId, context) + chatInterface = _getEditorChatInterface(context, mandateId, instanceId) + chatWorkflow = chatInterface.getWorkflowByLink( + featureInstanceId=instanceId, + linkedWorkflowId=workflowId, + ) + if not chatWorkflow: + return JSONResponse({ + "chatWorkflowId": None, + "messages": [], + }) + + chatWorkflowId = chatWorkflow.id if hasattr(chatWorkflow, "id") else chatWorkflow.get("id") + rawMessages = chatInterface.getMessages(chatWorkflowId) or [] + + items: List[Dict[str, Any]] = [] + for m in rawMessages: + getter = (lambda key, default=None: getattr(m, key, default)) if not isinstance(m, dict) else (lambda key, default=None: m.get(key, default)) + role = (getter("role") or "").strip() + content = (getter("message") or "").strip() + if not role or not content: + continue + items.append({ + "id": getter("id"), + "role": role, + "content": content, + "timestamp": getter("publishedAt") or 0, + "sequenceNr": getter("sequenceNr") or 0, + }) + + items.sort(key=lambda x: (float(x.get("timestamp") or 0), int(x.get("sequenceNr") or 0))) + + return JSONResponse({ + "chatWorkflowId": chatWorkflowId, + "messages": items, + }) + + +@router.post("/{instanceId}/{workflowId}/chat/stop") +@limiter.limit("120/minute") +async def post_editor_chat_stop( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + workflowId: str = Path(..., description="Workflow ID"), + context: RequestContext = Depends(getRequestContext), +): + """Stop a running editor-chat agent for the given workflow.""" + _validateInstanceAccess(instanceId, context) + from modules.serviceCenter.core.serviceStreaming import get_event_manager + sseEventManager = get_event_manager() + queueId = _editorChatQueueId(workflowId) + cancelled = await sseEventManager.cancel_agent(queueId) + await sseEventManager.emit_event(queueId, "stopped", { + "type": "stopped", + "workflowId": workflowId, + }) + logger.info("Editor chat stop requested for workflow %s, cancelled=%s", workflowId, cancelled) + return JSONResponse({"status": "stopped", "workflowId": workflowId, "cancelled": cancelled}) + + async def _runEditorAgent( workflowId: str, queueId: str, @@ -562,12 +706,41 @@ async def _runEditorAgent( fileIds: List[str] = None, dataSourceIds: List[str] = None, featureDataSourceIds: List[str] = None, + chatInterface=None, + chatWorkflowId: Optional[str] = None, ): - """Run the serviceAgent loop with workflow toolbox and forward events to the SSE queue.""" + """Run the serviceAgent loop with workflow toolbox and forward events to the SSE queue. + + Persists the assistant response to ``ChatMessage`` (linked via ``chatWorkflowId``) + on FINAL/ERROR. On cancellation any partial accumulated text is still saved so + the editor chat history reflects what the user actually saw on screen. + """ + assistantPersisted = False + + def _persistAssistant(text: str) -> None: + nonlocal assistantPersisted + if assistantPersisted or not chatInterface or not chatWorkflowId: + return + cleaned = (text or "").strip() + if not cleaned: + return + try: + chatInterface.createMessage({ + "workflowId": chatWorkflowId, + "role": "assistant", + "message": cleaned, + "status": "last", + }) + assistantPersisted = True + except Exception as msgErr: + logger.error("Editor chat: failed to persist assistant message: %s", msgErr) + try: from modules.serviceCenter import getService from modules.serviceCenter.context import ServiceCenterContext - from modules.serviceCenter.services.serviceAgent.datamodelAgent import AgentEventTypeEnum + from modules.serviceCenter.services.serviceAgent.datamodelAgent import ( + AgentEventTypeEnum, AgentConfig, + ) ctx = ServiceCenterContext( user=user, @@ -579,11 +752,22 @@ async def _runEditorAgent( agentService = getService("agent", ctx) systemPrompt = ( - "You are a workflow editor assistant. The user describes changes to a workflow graph. " - "Use the available workflow tools (readWorkflowGraph, addNode, removeNode, connectNodes, " - "setNodeParameter, listAvailableNodeTypes, validateGraph) to modify the graph. " - "Always read the current graph first before making changes. " - "Respond concisely and confirm what you changed." + "You are a workflow EDITOR assistant for the GraphicalEditor. " + "Your ONLY job is to BUILD or MODIFY the workflow graph (nodes + connections) " + "for the user — you must NEVER execute the workflow or any of its actions. " + "Even when the user says 'create a workflow that sends an email', you build the " + "graph (e.g. add an email node, connect it) — you do NOT actually send an email. " + "Use these workflow tools to mutate the graph: " + "readWorkflowGraph, listAvailableNodeTypes, addNode, removeNode, connectNodes, " + "setNodeParameter, validateGraph. " + "Always read the current graph and list available node types first, then plan the " + "smallest set of mutations, then apply them. Respond concisely in the user's " + "language and confirm what you changed in the graph." + ) + + editorConfig = AgentConfig( + toolSet="core", + excludeActionTools=True, ) enrichedPrompt = prompt @@ -605,6 +789,7 @@ async def _runEditorAgent( async for event in agentService.runAgent( prompt=enrichedPrompt, fileIds=fileIds or [], + config=editorConfig, workflowId=workflowId, userLanguage=userLanguage, conversationHistory=conversationHistory or [], @@ -631,8 +816,13 @@ async def _runEditorAgent( await sseEventManager.emit_event(queueId, sseEvent["type"], sseEvent) if event.type in (AgentEventTypeEnum.FINAL, AgentEventTypeEnum.ERROR): + _persistAssistant(event.content or accumulatedText) break + # Fallback: any streamed content not yet stored (cancellation path, no FINAL). + if not assistantPersisted and accumulatedText.strip(): + _persistAssistant(accumulatedText) + await sseEventManager.emit_event(queueId, "complete", { "type": "complete", "workflowId": workflowId, @@ -640,6 +830,12 @@ async def _runEditorAgent( except asyncio.CancelledError: logger.info("Editor chat agent task cancelled for workflow %s", workflowId) + # Save whatever the user already saw before cancelling so the next reload + # shows the same partial answer (matches workspace behaviour). + try: + _persistAssistant(accumulatedText if "accumulatedText" in locals() else "") + except Exception: + pass await sseEventManager.emit_event(queueId, "stopped", { "type": "stopped", "workflowId": workflowId, diff --git a/modules/interfaces/interfaceDbChat.py b/modules/interfaces/interfaceDbChat.py index c8bbadf0..adeac55b 100644 --- a/modules/interfaces/interfaceDbChat.py +++ b/modules/interfaces/interfaceDbChat.py @@ -655,17 +655,27 @@ class ChatObjects: totalPages=totalPages ) - def getLastMessageTimestamp(self, workflowId: str) -> Optional[str]: - """Return the latest publishedAt/sysCreatedAt from ChatMessage for a workflow.""" + def getLastMessageTimestamp(self, workflowId: str) -> Optional[float]: + """ + Return the latest publishedAt/sysCreatedAt from ChatMessage for a workflow + as UTC seconds (float) — matches the timestamp format used across the + rest of the chat data model (lastActivity, startedAt, publishedAt). + """ messages = self._getRecordset(ChatMessage, recordFilter={"workflowId": workflowId}) if not messages: return None - latest = None + latest: Optional[float] = None for msg in messages: - ts = msg.get("publishedAt") or msg.get("sysCreatedAt") - if ts and (latest is None or str(ts) > str(latest)): + raw = msg.get("publishedAt") or msg.get("sysCreatedAt") + if raw is None: + continue + try: + ts = float(raw) + except (TypeError, ValueError): + continue + if latest is None or ts > latest: latest = ts - return str(latest) if latest else None + return latest def searchWorkflowsByContent(self, query: str, limit: int = 50) -> List[str]: """Return workflow IDs whose messages contain the query string (case-insensitive).""" @@ -712,6 +722,8 @@ class ChatObjects: return ChatWorkflow( id=workflow["id"], + featureInstanceId=workflow.get("featureInstanceId"), + linkedWorkflowId=workflow.get("linkedWorkflowId"), status=workflow.get("status", "running"), name=workflow.get("name"), currentRound=_toInt(workflow.get("currentRound")), @@ -728,6 +740,54 @@ class ChatObjects: logger.error(f"getWorkflow: data validation failed for {workflowId}: {e}") return None + def getWorkflowByLink( + self, + featureInstanceId: str, + linkedWorkflowId: str, + ) -> Optional[ChatWorkflow]: + """Return the ChatWorkflow linked to (featureInstanceId, linkedWorkflowId), if any. + + Used by editor-style features (e.g. GraphicalEditor AI editor chat) to + find the persisted chat for a specific external entity (Automation2Workflow). + Falls under the same RBAC as ``getWorkflow``. + """ + if not featureInstanceId or not linkedWorkflowId: + return None + rows = self._getRecordset( + ChatWorkflow, + recordFilter={ + "featureInstanceId": featureInstanceId, + "linkedWorkflowId": linkedWorkflowId, + }, + ) or [] + if not rows: + return None + # Return the most recently active one if multiple ever exist (defensive). + rows.sort(key=lambda r: float(r.get("lastActivity") or r.get("startedAt") or 0), reverse=True) + return self.getWorkflow(rows[0]["id"]) + + def getOrCreateLinkedWorkflow( + self, + featureInstanceId: str, + linkedWorkflowId: str, + name: Optional[str] = None, + ) -> ChatWorkflow: + """Find or create the ChatWorkflow linked to a specific external entity. + + Editor-style features call this once at the start of a chat exchange to + guarantee a 1:1 mapping between (featureInstanceId, linkedWorkflowId) + and a persisted ChatWorkflow row. + """ + existing = self.getWorkflowByLink(featureInstanceId, linkedWorkflowId) + if existing: + return existing + return self.createWorkflow({ + "featureInstanceId": featureInstanceId, + "linkedWorkflowId": linkedWorkflowId, + "status": "active", + "name": name or "", + }) + def createWorkflow(self, workflowData: Dict[str, Any]) -> ChatWorkflow: """Creates a new workflow if user has permission.""" if not self.checkRbacPermission(ChatWorkflow, "create"): @@ -775,6 +835,8 @@ class ChatObjects: # Convert to ChatWorkflow model (empty related data for new workflow) return ChatWorkflow( id=created["id"], + featureInstanceId=created.get("featureInstanceId"), + linkedWorkflowId=created.get("linkedWorkflowId"), status=created.get("status", "running"), name=created.get("name"), currentRound=created.get("currentRound", 0) or 0, diff --git a/modules/interfaces/interfaceDbManagement.py b/modules/interfaces/interfaceDbManagement.py index 1a27a8db..cca98ffa 100644 --- a/modules/interfaces/interfaceDbManagement.py +++ b/modules/interfaces/interfaceDbManagement.py @@ -1404,6 +1404,24 @@ class ComponentObjects: self._validateFolderName(newName, folder.get("parentId"), excludeFolderId=folderId) return self.db.recordModify(FileFolder, folderId, {"name": newName}) + def updateFolder(self, folderId: str, updateData: Dict[str, Any]) -> bool: + """ + Update folder metadata (e.g. ``scope``, ``neutralize``). Owner-only, + same access model as renameFolder/moveFolder. Use ``renameFolder`` for + ``name`` changes (uniqueness validation) and ``moveFolder`` for + ``parentId`` changes (cycle/uniqueness validation). + """ + if not updateData: + return True + folder = self.getFolder(folderId) + if not folder: + raise FileNotFoundError(f"Folder {folderId} not found") + forbiddenKeys = {"id", "sysCreatedBy", "sysCreatedAt", "sysUpdatedAt"} + cleaned: Dict[str, Any] = {k: v for k, v in updateData.items() if k not in forbiddenKeys} + if "name" in cleaned: + self._validateFolderName(cleaned["name"], folder.get("parentId"), excludeFolderId=folderId) + return self.db.recordModify(FileFolder, folderId, cleaned) + def moveFolder(self, folderId: str, targetParentId: Optional[str] = None) -> bool: """Move a folder to a new parent, with circular reference and unique name checks.""" folder = self.getFolder(folderId) diff --git a/modules/serviceCenter/services/serviceAgent/datamodelAgent.py b/modules/serviceCenter/services/serviceAgent/datamodelAgent.py index 053569b0..16cb1964 100644 --- a/modules/serviceCenter/services/serviceAgent/datamodelAgent.py +++ b/modules/serviceCenter/services/serviceAgent/datamodelAgent.py @@ -93,6 +93,14 @@ class AgentConfig(BaseModel): availableToolboxes: List[str] = Field(default_factory=list) temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0) operationType: Optional[OperationTypeEnum] = Field(default=None, description="Override the default AGENT operationType for model selection") + excludeActionTools: bool = Field( + default=False, + description=( + "If True, do NOT register workflow-action methods as agent tools. " + "Used by editor-style agents (e.g. GraphicalEditor) that should only " + "manipulate the workflow graph, not execute its actions." + ), + ) class AgentState(BaseModel): diff --git a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py index 9094e952..fb54199e 100644 --- a/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py +++ b/modules/serviceCenter/services/serviceAgent/mainServiceAgent.py @@ -330,16 +330,20 @@ class AgentService: except Exception as e: logger.warning("discoverMethods failed before action tools: %s", e) - try: - from modules.workflows.processing.core.actionExecutor import ActionExecutor - actionExecutor = ActionExecutor(self.services) - adapter = ActionToolAdapter(actionExecutor) - adapter.registerAll(registry) - except Exception as e: - logger.warning(f"Could not register action tools: {e}") + if not getattr(config, "excludeActionTools", False): + try: + from modules.workflows.processing.core.actionExecutor import ActionExecutor + actionExecutor = ActionExecutor(self.services) + adapter = ActionToolAdapter(actionExecutor) + adapter.registerAll(registry) + except Exception as e: + logger.warning(f"Could not register action tools: {e}") + else: + logger.info("excludeActionTools=True: skipping ActionToolAdapter registration (editor-mode agent)") self._activateToolboxes(registry, config) - self._registerRequestToolbox(registry) + if not getattr(config, "excludeActionTools", False): + self._registerRequestToolbox(registry) return registry diff --git a/modules/serviceCenter/services/serviceAgent/workflowTools.py b/modules/serviceCenter/services/serviceAgent/workflowTools.py index a63abb65..3e1bb9ad 100644 --- a/modules/serviceCenter/services/serviceAgent/workflowTools.py +++ b/modules/serviceCenter/services/serviceAgent/workflowTools.py @@ -4,11 +4,21 @@ Workflow Toolbox - AI-assisted graph manipulation tools for the GraphicalEditor. Tools: readWorkflowGraph, addNode, removeNode, connectNodes, setNodeParameter, listAvailableNodeTypes, validateGraph, listWorkflowHistory, readWorkflowMessages. + +Conventions enforced here (matches coreTools / actionToolAdapter): + - Every ``ToolResult(...)`` provides ``toolCallId`` and ``toolName`` (pydantic + requires both); ``ToolRegistry.dispatch`` overwrites ``toolCallId`` later + but the model still validates at construction. + - ``ToolResult.data`` is a ``str``; structured payloads are JSON-encoded. + - ``workflowId`` and ``instanceId`` are auto-injected from the agent + ``context`` dict (``workflowId``, ``featureInstanceId``) when the model + omits them — the editor agent always runs in exactly one workflow. """ +import json import logging import uuid -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Tuple from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult @@ -17,65 +27,124 @@ logger = logging.getLogger(__name__) TOOLBOX_ID = "workflow" +def _toData(payload: Any) -> str: + """Encode a structured payload into ToolResult.data (which is a string).""" + if isinstance(payload, str): + return payload + try: + return json.dumps(payload, default=str, ensure_ascii=False) + except Exception: + return str(payload) + + +def _err(toolName: str, message: str) -> ToolResult: + return ToolResult(toolCallId="", toolName=toolName, success=False, error=message) + + +def _ok(toolName: str, payload: Any) -> ToolResult: + return ToolResult(toolCallId="", toolName=toolName, success=True, data=_toData(payload)) + + +def _resolveIds(params: Dict[str, Any], context: Any) -> Tuple[str, str]: + """Return (workflowId, instanceId), auto-injecting from context when missing. + + The editor agent context (``agentLoop._executeToolCalls``) is a dict with + ``workflowId`` and ``featureInstanceId`` — use them as defaults so the + model doesn't have to re-state the ids on every tool call. + """ + ctx: Dict[str, Any] = context if isinstance(context, dict) else {} + workflowId = params.get("workflowId") or ctx.get("workflowId") or "" + instanceId = ( + params.get("instanceId") + or ctx.get("featureInstanceId") + or ctx.get("instanceId") + or "" + ) + return workflowId, instanceId + + +def _resolveUser(context: Any): + """Return the User object for the current agent context (lazy DB fetch).""" + if not isinstance(context, dict): + return getattr(context, "user", None) + user = context.get("user") + if user is not None: + return user + userId = context.get("userId") + if not userId: + return None + try: + from modules.interfaces.interfaceDbApp import getRootInterface + return getRootInterface().getUser(str(userId)) + except Exception as e: + logger.warning("workflowTools: could not resolve user %s: %s", userId, e) + return None + + +def _resolveMandateId(context: Any) -> str: + if not isinstance(context, dict): + return getattr(context, "mandateId", "") or "" + return context.get("mandateId") or "" + + +def _getInterface(context: Any, instanceId: str): + from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface + return getGraphicalEditorInterface(_resolveUser(context), _resolveMandateId(context), instanceId) + + async def _readWorkflowGraph(params: Dict[str, Any], context: Any) -> ToolResult: """Read the current workflow graph (nodes and connections).""" + name = "readWorkflowGraph" try: - workflowId = params.get("workflowId") - instanceId = params.get("instanceId") + workflowId, instanceId = _resolveIds(params, context) if not workflowId or not instanceId: - return ToolResult(success=False, error="workflowId and instanceId required") + return _err(name, "workflowId and instanceId required (and not present in agent context)") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) + iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: - return ToolResult(success=False, error=f"Workflow {workflowId} not found") + return _err(name, f"Workflow {workflowId} not found") - graph = wf.get("graph", {}) - nodes = graph.get("nodes", []) - connections = graph.get("connections", []) - return ToolResult( - success=True, - data={ - "workflowId": workflowId, - "label": wf.get("label", ""), - "nodeCount": len(nodes), - "connectionCount": len(connections), - "nodes": [{"id": n.get("id"), "type": n.get("type"), "title": n.get("title", "")} for n in nodes], - "connections": connections, - }, - ) + graph = wf.get("graph", {}) or {} + nodes = graph.get("nodes", []) or [] + connections = graph.get("connections", []) or [] + return _ok(name, { + "workflowId": workflowId, + "label": wf.get("label", ""), + "nodeCount": len(nodes), + "connectionCount": len(connections), + "nodes": [ + {"id": n.get("id"), "type": n.get("type"), "title": n.get("title", "")} + for n in nodes + ], + "connections": connections, + }) except Exception as e: logger.exception("readWorkflowGraph failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) async def _addNode(params: Dict[str, Any], context: Any) -> ToolResult: """Add a node to the workflow graph.""" + name = "addNode" try: - workflowId = params.get("workflowId") - instanceId = params.get("instanceId") + workflowId, instanceId = _resolveIds(params, context) nodeType = params.get("nodeType") if not workflowId or not instanceId or not nodeType: - return ToolResult(success=False, error="workflowId, instanceId, and nodeType required") + return _err(name, "workflowId, instanceId, and nodeType required") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) + iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: - return ToolResult(success=False, error=f"Workflow {workflowId} not found") + return _err(name, f"Workflow {workflowId} not found") - graph = dict(wf.get("graph", {})) - nodes = list(graph.get("nodes", [])) + graph = dict(wf.get("graph", {}) or {}) + nodes = list(graph.get("nodes", []) or []) nodeId = params.get("nodeId") or str(uuid.uuid4())[:8] title = params.get("title", "") - nodeParams = params.get("parameters", {}) - position = params.get("position", {"x": len(nodes) * 200, "y": 100}) + nodeParams = params.get("parameters", {}) or {} + position = params.get("position") or {"x": len(nodes) * 200, "y": 100} newNode = { "id": nodeId, @@ -88,68 +157,63 @@ async def _addNode(params: Dict[str, Any], context: Any) -> ToolResult: graph["nodes"] = nodes iface.updateWorkflow(workflowId, {"graph": graph}) - return ToolResult( - success=True, - data={"nodeId": nodeId, "nodeType": nodeType, "message": f"Node '{title or nodeType}' added"}, - ) + return _ok(name, { + "nodeId": nodeId, + "nodeType": nodeType, + "message": f"Node '{title or nodeType}' added", + }) except Exception as e: logger.exception("addNode failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) async def _removeNode(params: Dict[str, Any], context: Any) -> ToolResult: """Remove a node and its connections from the workflow graph.""" + name = "removeNode" try: - workflowId = params.get("workflowId") - instanceId = params.get("instanceId") + workflowId, instanceId = _resolveIds(params, context) nodeId = params.get("nodeId") if not workflowId or not instanceId or not nodeId: - return ToolResult(success=False, error="workflowId, instanceId, and nodeId required") + return _err(name, "workflowId, instanceId, and nodeId required") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) + iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: - return ToolResult(success=False, error=f"Workflow {workflowId} not found") + return _err(name, f"Workflow {workflowId} not found") - graph = dict(wf.get("graph", {})) - nodes = [n for n in graph.get("nodes", []) if n.get("id") != nodeId] + graph = dict(wf.get("graph", {}) or {}) + nodes = [n for n in (graph.get("nodes", []) or []) if n.get("id") != nodeId] connections = [ - c for c in graph.get("connections", []) + c for c in (graph.get("connections", []) or []) if c.get("source") != nodeId and c.get("target") != nodeId ] graph["nodes"] = nodes graph["connections"] = connections iface.updateWorkflow(workflowId, {"graph": graph}) - return ToolResult(success=True, data={"nodeId": nodeId, "message": f"Node {nodeId} removed"}) + return _ok(name, {"nodeId": nodeId, "message": f"Node {nodeId} removed"}) except Exception as e: logger.exception("removeNode failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) async def _connectNodes(params: Dict[str, Any], context: Any) -> ToolResult: """Connect two nodes in the workflow graph.""" + name = "connectNodes" try: - workflowId = params.get("workflowId") - instanceId = params.get("instanceId") + workflowId, instanceId = _resolveIds(params, context) sourceId = params.get("sourceId") targetId = params.get("targetId") if not workflowId or not instanceId or not sourceId or not targetId: - return ToolResult(success=False, error="workflowId, instanceId, sourceId, and targetId required") + return _err(name, "workflowId, instanceId, sourceId, and targetId required") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) + iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: - return ToolResult(success=False, error=f"Workflow {workflowId} not found") + return _err(name, f"Workflow {workflowId} not found") - graph = dict(wf.get("graph", {})) - connections = list(graph.get("connections", [])) + graph = dict(wf.get("graph", {}) or {}) + connections = list(graph.get("connections", []) or []) newConn = { "source": sourceId, "target": targetId, @@ -160,93 +224,113 @@ async def _connectNodes(params: Dict[str, Any], context: Any) -> ToolResult: graph["connections"] = connections iface.updateWorkflow(workflowId, {"graph": graph}) - return ToolResult(success=True, data={"connection": newConn, "message": f"Connected {sourceId} -> {targetId}"}) + return _ok(name, {"connection": newConn, "message": f"Connected {sourceId} -> {targetId}"}) except Exception as e: logger.exception("connectNodes failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) async def _setNodeParameter(params: Dict[str, Any], context: Any) -> ToolResult: """Set a parameter on a node.""" + name = "setNodeParameter" try: - workflowId = params.get("workflowId") - instanceId = params.get("instanceId") + workflowId, instanceId = _resolveIds(params, context) nodeId = params.get("nodeId") paramName = params.get("parameterName") paramValue = params.get("parameterValue") if not workflowId or not instanceId or not nodeId or not paramName: - return ToolResult(success=False, error="workflowId, instanceId, nodeId, and parameterName required") + return _err(name, "workflowId, instanceId, nodeId, and parameterName required") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) + iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: - return ToolResult(success=False, error=f"Workflow {workflowId} not found") + return _err(name, f"Workflow {workflowId} not found") - graph = dict(wf.get("graph", {})) - nodes = list(graph.get("nodes", [])) + graph = dict(wf.get("graph", {}) or {}) + nodes = list(graph.get("nodes", []) or []) found = False for n in nodes: if n.get("id") == nodeId: - nodeParams = dict(n.get("parameters", {})) + nodeParams = dict(n.get("parameters", {}) or {}) nodeParams[paramName] = paramValue n["parameters"] = nodeParams found = True break if not found: - return ToolResult(success=False, error=f"Node {nodeId} not found in graph") + return _err(name, f"Node {nodeId} not found in graph") graph["nodes"] = nodes iface.updateWorkflow(workflowId, {"graph": graph}) - return ToolResult(success=True, data={"nodeId": nodeId, "parameter": paramName, "message": f"Parameter '{paramName}' set"}) + return _ok(name, { + "nodeId": nodeId, + "parameter": paramName, + "message": f"Parameter '{paramName}' set", + }) except Exception as e: logger.exception("setNodeParameter failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) + + +def _coerceLabel(rawLabel: Any, fallback: str) -> str: + """Normalize a node label which may be a string, dict {locale: str}, or other.""" + if isinstance(rawLabel, str): + return rawLabel + if isinstance(rawLabel, dict): + for key in ("en", "de", "fr"): + value = rawLabel.get(key) + if isinstance(value, str) and value: + return value + for value in rawLabel.values(): + if isinstance(value, str) and value: + return value + return fallback async def _listAvailableNodeTypes(params: Dict[str, Any], context: Any) -> ToolResult: """List all available node types for the flow builder.""" + name = "listAvailableNodeTypes" try: from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES - nodeTypes = [ - {"id": n.get("id"), "category": n.get("category"), "label": n.get("label", {}).get("en", n.get("id"))} - for n in STATIC_NODE_TYPES - ] - return ToolResult(success=True, data={"nodeTypes": nodeTypes, "count": len(nodeTypes)}) + nodeTypes = [] + for n in STATIC_NODE_TYPES: + if not isinstance(n, dict): + continue + nodeId = n.get("id") or "" + nodeTypes.append({ + "id": nodeId, + "category": n.get("category"), + "label": _coerceLabel(n.get("label"), nodeId), + }) + return _ok(name, {"nodeTypes": nodeTypes, "count": len(nodeTypes)}) except Exception as e: logger.exception("listAvailableNodeTypes failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) async def _validateGraph(params: Dict[str, Any], context: Any) -> ToolResult: """Validate a workflow graph for common issues.""" + name = "validateGraph" try: - workflowId = params.get("workflowId") - instanceId = params.get("instanceId") + workflowId, instanceId = _resolveIds(params, context) if not workflowId or not instanceId: - return ToolResult(success=False, error="workflowId and instanceId required") + return _err(name, "workflowId and instanceId required") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) + iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: - return ToolResult(success=False, error=f"Workflow {workflowId} not found") + return _err(name, f"Workflow {workflowId} not found") - graph = wf.get("graph", {}) - nodes = graph.get("nodes", []) - connections = graph.get("connections", []) + graph = wf.get("graph", {}) or {} + nodes = graph.get("nodes", []) or [] + connections = graph.get("connections", []) or [] issues: List[str] = [] nodeIds = {n.get("id") for n in nodes} if not nodes: issues.append("Graph has no nodes") - hasTrigger = any(n.get("type", "").startswith("trigger.") for n in nodes) + hasTrigger = any((n.get("type") or "").startswith("trigger.") for n in nodes) if not hasTrigger: issues.append("No trigger node found") @@ -260,64 +344,59 @@ async def _validateGraph(params: Dict[str, Any], context: Any) -> ToolResult: for c in connections: connectedNodes.add(c.get("source")) connectedNodes.add(c.get("target")) - orphans = [n.get("id") for n in nodes if n.get("id") not in connectedNodes and not n.get("type", "").startswith("trigger.")] + orphans = [ + n.get("id") for n in nodes + if n.get("id") not in connectedNodes and not (n.get("type") or "").startswith("trigger.") + ] if orphans: issues.append(f"Orphan nodes (not connected): {', '.join(orphans)}") - return ToolResult( - success=True, - data={ - "valid": len(issues) == 0, - "issues": issues, - "nodeCount": len(nodes), - "connectionCount": len(connections), - }, - ) + return _ok(name, { + "valid": len(issues) == 0, + "issues": issues, + "nodeCount": len(nodes), + "connectionCount": len(connections), + }) except Exception as e: logger.exception("validateGraph failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) async def _listWorkflowHistory(params: Dict[str, Any], context: Any) -> ToolResult: """List versions (history) for a workflow.""" + name = "listWorkflowHistory" try: - workflowId = params.get("workflowId", "") - instanceId = params.get("instanceId", "") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) - versions = iface.getVersions(workflowId) - return ToolResult( - success=True, - data={ - "workflowId": workflowId, - "versions": [ - { - "id": v.get("id"), - "versionNumber": v.get("versionNumber"), - "status": v.get("status"), - "publishedAt": v.get("publishedAt"), - "publishedBy": v.get("publishedBy"), - } - for v in versions - ], - }, - ) + workflowId, instanceId = _resolveIds(params, context) + if not workflowId or not instanceId: + return _err(name, "workflowId and instanceId required") + iface = _getInterface(context, instanceId) + versions = iface.getVersions(workflowId) or [] + return _ok(name, { + "workflowId": workflowId, + "versions": [ + { + "id": v.get("id"), + "versionNumber": v.get("versionNumber"), + "status": v.get("status"), + "publishedAt": v.get("publishedAt"), + "publishedBy": v.get("publishedBy"), + } + for v in versions + ], + }) except Exception as e: logger.exception("listWorkflowHistory failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) async def _readWorkflowMessages(params: Dict[str, Any], context: Any) -> ToolResult: """Read recent run logs/messages for a workflow.""" + name = "readWorkflowMessages" try: - workflowId = params.get("workflowId", "") - instanceId = params.get("instanceId", "") - from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface - user = getattr(context, "user", None) - mandateId = getattr(context, "mandateId", "") or "" - iface = getGraphicalEditorInterface(user, mandateId, instanceId) + workflowId, instanceId = _resolveIds(params, context) + if not workflowId or not instanceId: + return _err(name, "workflowId and instanceId required") + iface = _getInterface(context, instanceId) from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoRun runs = iface.db.getRecordset(AutoRun, recordFilter={"workflowId": workflowId}) or [] runSummaries = [] @@ -329,104 +408,106 @@ async def _readWorkflowMessages(params: Dict[str, Any], context: Any) -> ToolRes "completedAt": r.get("completedAt"), "error": r.get("error"), }) - return ToolResult( - success=True, - data={"workflowId": workflowId, "recentRuns": runSummaries}, - ) + return _ok(name, {"workflowId": workflowId, "recentRuns": runSummaries}) except Exception as e: logger.exception("readWorkflowMessages failed: %s", e) - return ToolResult(success=False, error=str(e)) + return _err(name, str(e)) def getWorkflowToolDefinitions() -> List[Dict[str, Any]]: - """Return tool definitions for registration in the ToolRegistry.""" + """Return tool definitions for registration in the ToolRegistry. + + Note: ``workflowId`` and ``instanceId`` are NOT marked ``required`` — + they are auto-injected from the agent context by ``_resolveIds``. The + model may still pass them explicitly (e.g. to target a different + workflow) but doesn't have to repeat them on every call. + """ + _idFields = { + "workflowId": {"type": "string", "description": "Workflow ID (defaults to the current editor workflow)"}, + "instanceId": {"type": "string", "description": "Feature instance ID (defaults to the current editor instance)"}, + } return [ { "name": "readWorkflowGraph", "handler": _readWorkflowGraph, - "description": "Read the current workflow graph (nodes and connections)", + "description": "Read the current workflow graph (nodes and connections). Always call this first to understand the current state before making changes.", "parameters": { "type": "object", - "properties": { - "workflowId": {"type": "string", "description": "Workflow ID"}, - "instanceId": {"type": "string", "description": "Feature instance ID"}, - }, - "required": ["workflowId", "instanceId"], + "properties": {**_idFields}, + "required": [], }, + "readOnly": True, "toolSet": TOOLBOX_ID, }, { "name": "addNode", "handler": _addNode, - "description": "Add a node to the workflow graph", + "description": "Add a node to the workflow graph.", "parameters": { "type": "object", "properties": { - "workflowId": {"type": "string"}, - "instanceId": {"type": "string"}, - "nodeType": {"type": "string", "description": "Node type (e.g. ai.chat, email.send)"}, + **_idFields, + "nodeType": {"type": "string", "description": "Node type id (e.g. ai.chat, email.send) — use listAvailableNodeTypes to discover"}, "title": {"type": "string", "description": "Human-readable title"}, "parameters": {"type": "object", "description": "Node parameters"}, "position": {"type": "object", "description": "Canvas position {x, y}"}, + "nodeId": {"type": "string", "description": "Optional explicit node id"}, }, - "required": ["workflowId", "instanceId", "nodeType"], + "required": ["nodeType"], }, "toolSet": TOOLBOX_ID, }, { "name": "removeNode", "handler": _removeNode, - "description": "Remove a node and its connections from the graph", + "description": "Remove a node and its connections from the graph.", "parameters": { "type": "object", "properties": { - "workflowId": {"type": "string"}, - "instanceId": {"type": "string"}, + **_idFields, "nodeId": {"type": "string", "description": "ID of the node to remove"}, }, - "required": ["workflowId", "instanceId", "nodeId"], + "required": ["nodeId"], }, "toolSet": TOOLBOX_ID, }, { "name": "connectNodes", "handler": _connectNodes, - "description": "Connect two nodes in the graph", + "description": "Connect two nodes in the graph (source -> target).", "parameters": { "type": "object", "properties": { - "workflowId": {"type": "string"}, - "instanceId": {"type": "string"}, + **_idFields, "sourceId": {"type": "string"}, "targetId": {"type": "string"}, "sourceOutput": {"type": "integer", "default": 0}, "targetInput": {"type": "integer", "default": 0}, }, - "required": ["workflowId", "instanceId", "sourceId", "targetId"], + "required": ["sourceId", "targetId"], }, "toolSet": TOOLBOX_ID, }, { "name": "setNodeParameter", "handler": _setNodeParameter, - "description": "Set a parameter on a node", + "description": "Set a single parameter on a node.", "parameters": { "type": "object", "properties": { - "workflowId": {"type": "string"}, - "instanceId": {"type": "string"}, + **_idFields, "nodeId": {"type": "string"}, "parameterName": {"type": "string"}, "parameterValue": {"description": "Value to set (any type)"}, }, - "required": ["workflowId", "instanceId", "nodeId", "parameterName", "parameterValue"], + "required": ["nodeId", "parameterName", "parameterValue"], }, "toolSet": TOOLBOX_ID, }, { "name": "listAvailableNodeTypes", "handler": _listAvailableNodeTypes, - "description": "List all available node types for the flow builder", + "description": "List all available node types for the flow builder. Call this once to discover ids before using addNode.", "parameters": {"type": "object", "properties": {}}, "readOnly": True, "toolSet": TOOLBOX_ID, @@ -434,14 +515,11 @@ def getWorkflowToolDefinitions() -> List[Dict[str, Any]]: { "name": "validateGraph", "handler": _validateGraph, - "description": "Validate a workflow graph for common issues", + "description": "Validate a workflow graph for common issues (missing trigger, dangling connections, orphans).", "parameters": { "type": "object", - "properties": { - "workflowId": {"type": "string"}, - "instanceId": {"type": "string"}, - }, - "required": ["workflowId", "instanceId"], + "properties": {**_idFields}, + "required": [], }, "readOnly": True, "toolSet": TOOLBOX_ID, @@ -449,14 +527,11 @@ def getWorkflowToolDefinitions() -> List[Dict[str, Any]]: { "name": "listWorkflowHistory", "handler": _listWorkflowHistory, - "description": "List version history for a workflow (AutoVersion entries)", + "description": "List version history for a workflow (AutoVersion entries).", "parameters": { "type": "object", - "properties": { - "workflowId": {"type": "string"}, - "instanceId": {"type": "string"}, - }, - "required": ["workflowId", "instanceId"], + "properties": {**_idFields}, + "required": [], }, "readOnly": True, "toolSet": TOOLBOX_ID, @@ -464,14 +539,11 @@ def getWorkflowToolDefinitions() -> List[Dict[str, Any]]: { "name": "readWorkflowMessages", "handler": _readWorkflowMessages, - "description": "Read recent run logs and status for a workflow", + "description": "Read recent run logs and status for a workflow.", "parameters": { "type": "object", - "properties": { - "workflowId": {"type": "string"}, - "instanceId": {"type": "string"}, - }, - "required": ["workflowId", "instanceId"], + "properties": {**_idFields}, + "required": [], }, "readOnly": True, "toolSet": TOOLBOX_ID,