# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Workflow Toolbox - AI-assisted graph manipulation tools for the GraphicalEditor.

Tools: readWorkflowGraph, addNode, removeNode, connectNodes, setNodeParameter,
listAvailableNodeTypes, describeNodeType, autoLayoutWorkflow, validateGraph,
listWorkflowHistory, readWorkflowMessages.

Conventions enforced here (matches coreTools / actionToolAdapter):
- Every ``ToolResult(...)`` provides ``toolCallId`` and ``toolName`` (pydantic
  requires both); ``ToolRegistry.dispatch`` overwrites ``toolCallId`` later but
  the model still validates at construction.
- ``ToolResult.data`` is a ``str``; structured payloads are JSON-encoded.
- ``workflowId`` and ``instanceId`` are auto-injected from the agent ``context``
  dict (``workflowId``, ``featureInstanceId``) when the model omits them — the
  editor agent always runs in exactly one workflow.
"""

import json
import logging
import uuid
from typing import Dict, Any, List, Tuple

from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult

logger = logging.getLogger(__name__)

TOOLBOX_ID = "workflow"


def _toData(payload: Any) -> str:
    """Encode a structured payload into ToolResult.data (which is a string).

    Strings pass through unchanged. Everything else is JSON-encoded, with
    ``str()`` used for values JSON cannot represent; if encoding still fails
    the plain ``str(payload)`` is returned instead of raising.
    """
    if isinstance(payload, str):
        return payload
    try:
        return json.dumps(payload, default=str, ensure_ascii=False)
    except Exception:
        # Best-effort: a tool result must always be constructible.
        return str(payload)


def _err(toolName: str, message: str) -> ToolResult:
    """Build a failed ``ToolResult`` carrying ``message`` as the error text."""
    return ToolResult(toolCallId="", toolName=toolName, success=False, error=message)


def _ok(toolName: str, payload: Any) -> ToolResult:
    """Build a successful ``ToolResult`` whose ``data`` is the encoded payload."""
    return ToolResult(toolCallId="", toolName=toolName, success=True, data=_toData(payload))


def _resolveIds(params: Dict[str, Any], context: Any) -> Tuple[str, str]:
    """Return (workflowId, instanceId), auto-injecting from context when missing.

    The editor agent context (``agentLoop._executeToolCalls``) is a dict with
    ``workflowId`` and ``featureInstanceId`` — use them as defaults so the model
    doesn't have to re-state the ids on every tool call.

    Explicit values in ``params`` win over the context. A missing id is
    returned as an empty string so callers can test it with ``not``.
    """
    ctx: Dict[str, Any] = context if isinstance(context, dict) else {}
    workflowId = params.get("workflowId") or ctx.get("workflowId") or ""
    instanceId = (
        params.get("instanceId")
        or ctx.get("featureInstanceId")
        or ctx.get("instanceId")
        or ""
    )
    return workflowId, instanceId


def _resolveUser(context: Any):
    """Return the User object for the current agent context (lazy DB fetch).

    Lookup order: ``context.user`` attribute for non-dict contexts, then the
    ``user`` key, then a fetch by the ``userId`` key. Returns ``None`` when no
    user can be determined or the fetch fails (the failure is logged).
    """
    if not isinstance(context, dict):
        return getattr(context, "user", None)
    user = context.get("user")
    if user is not None:
        return user
    userId = context.get("userId")
    if not userId:
        return None
    try:
        # Imported lazily so this module can be loaded without the DB layer.
        from modules.interfaces.interfaceDbApp import getRootInterface
        return getRootInterface().getUser(str(userId))
    except Exception as e:
        logger.warning("workflowTools: could not resolve user %s: %s", userId, e)
        return None


def _resolveMandateId(context: Any) -> str:
    """Return the mandate id from the context, or ``""`` when absent."""
    if not isinstance(context, dict):
        return getattr(context, "mandateId", "") or ""
    return context.get("mandateId") or ""


def _getInterface(context: Any, instanceId: str):
    """Return the GraphicalEditor interface for the context's user and mandate."""
    from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
    return getGraphicalEditorInterface(_resolveUser(context), _resolveMandateId(context), instanceId)


async def _readWorkflowGraph(params: Dict[str, Any], context: Any) -> ToolResult:
    """Read the current workflow graph (nodes and connections).

    The result lists each node reduced to ``id`` / ``type`` / ``title`` and the
    connections as stored. Any exception is logged and returned as an error
    result rather than raised.
    """
    name = "readWorkflowGraph"
    try:
        workflowId, instanceId = _resolveIds(params, context)
        if not workflowId or not instanceId:
            return _err(name, "workflowId and instanceId required (and not present in agent context)")
        iface = _getInterface(context, instanceId)
        wf = iface.getWorkflow(workflowId)
        if not wf:
            return _err(name, f"Workflow {workflowId} not found")
        graph = wf.get("graph", {}) or {}
        nodes = graph.get("nodes", []) or []
        connections = graph.get("connections", []) or []
        return _ok(name, {
            "workflowId": workflowId,
            "label": wf.get("label", ""),
            "nodeCount": len(nodes),
            "connectionCount": len(connections),
            "nodes": [
                {"id": n.get("id"), "type": n.get("type"), "title": n.get("title", "")}
                for n in nodes
            ],
            "connections": connections,
        })
    except Exception as e:
        logger.exception("readWorkflowGraph failed: %s", e)
        return _err(name, str(e))


async def _addNode(params: Dict[str, Any], context: Any) -> ToolResult:
    """Add a node to the workflow graph.

    Reads ``nodeType`` (required) plus optional ``nodeId``, ``title``,
    ``parameters``, ``x`` / ``y`` or ``position`` from ``params``. An explicit
    ``nodeId`` that already exists in the graph is rejected; a generated id is
    re-drawn until it is unused, so node ids stay unique. Any exception is
    logged and returned as an error result rather than raised.
    """
    name = "addNode"
    try:
        workflowId, instanceId = _resolveIds(params, context)
        nodeType = params.get("nodeType")
        if not workflowId or not instanceId or not nodeType:
            return _err(name, "workflowId, instanceId, and nodeType required")
        iface = _getInterface(context, instanceId)
        wf = iface.getWorkflow(workflowId)
        if not wf:
            return _err(name, f"Workflow {workflowId} not found")
        graph = dict(wf.get("graph", {}) or {})
        nodes = list(graph.get("nodes", []) or [])

        # Connections reference nodes by id, so a duplicate id would make
        # every later removeNode / connectNodes / setNodeParameter ambiguous.
        existingIds = {n.get("id") for n in nodes if isinstance(n, dict)}
        nodeId = params.get("nodeId")
        if nodeId:
            if nodeId in existingIds:
                return _err(name, f"Node id '{nodeId}' already exists in graph")
        else:
            nodeId = str(uuid.uuid4())[:8]
            # Only 8 hex chars are kept, so guard against a collision.
            while nodeId in existingIds:
                nodeId = str(uuid.uuid4())[:8]

        title = params.get("title", "")
        nodeParams = params.get("parameters", {}) or {}
        # Frontend stores positions as TOP-LEVEL ``x`` / ``y`` on the node
        # (see ``fromApiGraph`` / ``toApiGraph``). Accept either explicit
        # ``x`` / ``y`` or a ``position={x,y}`` shape from the model and
        # always persist as top-level ``x`` / ``y``. Fallback puts new
        # nodes in a horizontal stripe so the user sees them even before
        # ``autoLayoutWorkflow`` runs.
        position = params.get("position") or {}
        x = params.get("x")
        if x is None:
            x = position.get("x") if isinstance(position, dict) else None
        if x is None:
            x = 40 + len(nodes) * 260
        y = params.get("y")
        if y is None:
            y = position.get("y") if isinstance(position, dict) else None
        if y is None:
            y = 40
        newNode = {
            "id": nodeId,
            "type": nodeType,
            "title": title,
            "parameters": nodeParams,
            "x": x,
            "y": y,
        }
        nodes.append(newNode)
        graph["nodes"] = nodes
        iface.updateWorkflow(workflowId, {"graph": graph})
        return _ok(name, {
            "nodeId": nodeId,
            "nodeType": nodeType,
            "message": f"Node '{title or nodeType}' added",
        })
    except Exception as e:
        logger.exception("addNode failed: %s", e)
        return _err(name, str(e))


async def _removeNode(params: Dict[str, Any], context: Any) -> ToolResult:
    """Remove a node and its connections from the workflow graph.

    Every connection whose ``source`` or ``target`` is the node is dropped as
    well. Returns an error (and writes nothing) when the node is not in the
    graph, matching ``setNodeParameter``. Any exception is logged and returned
    as an error result rather than raised.
    """
    name = "removeNode"
    try:
        workflowId, instanceId = _resolveIds(params, context)
        nodeId = params.get("nodeId")
        if not workflowId or not instanceId or not nodeId:
            return _err(name, "workflowId, instanceId, and nodeId required")
        iface = _getInterface(context, instanceId)
        wf = iface.getWorkflow(workflowId)
        if not wf:
            return _err(name, f"Workflow {workflowId} not found")
        graph = dict(wf.get("graph", {}) or {})
        allNodes = graph.get("nodes", []) or []
        nodes = [n for n in allNodes if n.get("id") != nodeId]
        if len(nodes) == len(allNodes):
            # Nothing matched: report it instead of claiming a removal.
            return _err(name, f"Node {nodeId} not found in graph")
        connections = [
            c for c in (graph.get("connections", []) or [])
            if c.get("source") != nodeId and c.get("target") != nodeId
        ]
        graph["nodes"] = nodes
        graph["connections"] = connections
        iface.updateWorkflow(workflowId, {"graph": graph})
        return _ok(name, {"nodeId": nodeId, "message": f"Node {nodeId} removed"})
    except Exception as e:
        logger.exception("removeNode failed: %s", e)
        return _err(name, str(e))


async def _connectNodes(params: Dict[str, Any], context: Any) -> ToolResult:
    """Connect two nodes in the workflow graph.

    Appends a ``{source, target, sourceOutput, targetInput}`` connection; the
    port indices default to 0. The endpoints are not checked against the node
    list here. Any exception is logged and returned as an error result rather
    than raised.
    """
    name = "connectNodes"
    try:
        workflowId, instanceId = _resolveIds(params, context)
        sourceId = params.get("sourceId")
        targetId = params.get("targetId")
        if not workflowId or not instanceId or not sourceId or not targetId:
            return _err(name, "workflowId, instanceId, sourceId, and targetId required")
        iface = _getInterface(context, instanceId)
        wf = iface.getWorkflow(workflowId)
        if not wf:
            return _err(name, f"Workflow {workflowId} not found")
        graph = dict(wf.get("graph", {}) or {})
        connections = list(graph.get("connections", []) or [])
        newConn = {
            "source": sourceId,
            "target": targetId,
            "sourceOutput": params.get("sourceOutput", 0),
            "targetInput": params.get("targetInput", 0),
        }
        connections.append(newConn)
        graph["connections"] = connections
        iface.updateWorkflow(workflowId, {"graph": graph})
        return _ok(name, {"connection": newConn, "message": f"Connected {sourceId} -> {targetId}"})
    except Exception as e:
        logger.exception("connectNodes failed: %s", e)
        return _err(name, str(e))


async def _setNodeParameter(params: Dict[str, Any], context: Any) -> ToolResult:
    """Set a parameter on a node.

    Sets ``parameterName`` to ``parameterValue`` (which may be ``None``) on the
    first node whose id matches ``nodeId``. Returns an error when no node
    matches. Any exception is logged and returned as an error result rather
    than raised.
    """
    name = "setNodeParameter"
    try:
        workflowId, instanceId = _resolveIds(params, context)
        nodeId = params.get("nodeId")
        paramName = params.get("parameterName")
        paramValue = params.get("parameterValue")
        if not workflowId or not instanceId or not nodeId or not paramName:
            return _err(name, "workflowId, instanceId, nodeId, and parameterName required")
        iface = _getInterface(context, instanceId)
        wf = iface.getWorkflow(workflowId)
        if not wf:
            return _err(name, f"Workflow {workflowId} not found")
        graph = dict(wf.get("graph", {}) or {})
        nodes = list(graph.get("nodes", []) or [])
        found = False
        for idx, n in enumerate(nodes):
            if n.get("id") == nodeId:
                # Replace the node with a copy so the workflow object returned
                # by ``getWorkflow`` is not mutated behind the interface's back.
                updated = dict(n)
                nodeParams = dict(updated.get("parameters", {}) or {})
                nodeParams[paramName] = paramValue
                updated["parameters"] = nodeParams
                nodes[idx] = updated
                found = True
                break
        if not found:
            return _err(name, f"Node {nodeId} not found in graph")
        graph["nodes"] = nodes
        iface.updateWorkflow(workflowId, {"graph": graph})
        return _ok(name, {
            "nodeId": nodeId,
            "parameter": paramName,
            "message": f"Parameter '{paramName}' set",
        })
    except Exception as e:
        logger.exception("setNodeParameter failed: %s", e)
        return _err(name, str(e))


def _coerceLabel(rawLabel: Any, fallback: str) -> str:
    """Normalize a node label which may be a string, dict {locale: str}, or other.

    For a dict the first non-empty string among ``en``, ``de``, ``fr`` wins,
    then any other non-empty string value. Anything else yields ``fallback``.
    """
    if isinstance(rawLabel, str):
        return rawLabel
    if isinstance(rawLabel, dict):
        for key in ("en", "de", "fr"):
            value = rawLabel.get(key)
            if isinstance(value, str) and value:
                return value
        for value in rawLabel.values():
            if isinstance(value, str) and value:
                return value
    return fallback


def _summarizeNodeForCatalog(n: Dict[str, Any]) -> Dict[str, Any]:
    """Compact summary used in ``listAvailableNodeTypes`` — small but informative
    enough that the model can pick the right type and knows whether
    ``describeNodeType`` is worth a follow-up call."""
    nodeId = n.get("id") or ""
    paramsList = n.get("parameters") or []
    requiredCount = sum(1 for p in paramsList if isinstance(p, dict) and p.get("required"))
    return {
        "id": nodeId,
        "category": n.get("category"),
        "label": _coerceLabel(n.get("label"), nodeId),
        "description": _coerceLabel(n.get("description"), ""),
        "paramCount": len(paramsList),
        "requiredParamCount": requiredCount,
        "usesAi": bool(((n.get("meta") or {}).get("usesAi"))),
    }


def _summarizeParameter(p: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a node parameter spec to just what the AI needs to fill it.

    ``default`` is included only when the spec defines one, ``allowedValues``
    only when ``frontendOptions.options`` is a list, and ``hint`` only for
    ``userConnection`` parameters.
    """
    out: Dict[str, Any] = {
        "name": p.get("name"),
        "type": p.get("type"),
        "required": bool(p.get("required")),
        "frontendType": p.get("frontendType"),
        "description": _coerceLabel(p.get("description"), ""),
    }
    if "default" in p:
        out["default"] = p.get("default")
    feOpts = p.get("frontendOptions")
    if isinstance(feOpts, dict):
        # Expose enum-style choices ("options") so the model sticks to allowed values.
        if isinstance(feOpts.get("options"), list):
            out["allowedValues"] = feOpts.get("options")
    if p.get("frontendType") == "userConnection":
        out["hint"] = (
            "Call listConnections to discover available connections; pass the "
            "connectionId here. Required before this node can run."
        )
    return out


async def _listAvailableNodeTypes(params: Dict[str, Any], context: Any) -> ToolResult:
    """List all available node types for the flow builder (compact catalog).

    Returns ``id``, ``category``, ``label``, short ``description``, and the
    parameter counts. To learn HOW to fill a node's parameters use
    ``describeNodeType(nodeType=...)`` — that returns the full schema.
    """
    name = "listAvailableNodeTypes"
    try:
        from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
        nodeTypes = [
            _summarizeNodeForCatalog(n)
            for n in STATIC_NODE_TYPES
            if isinstance(n, dict)
        ]
        return _ok(name, {"nodeTypes": nodeTypes, "count": len(nodeTypes)})
    except Exception as e:
        logger.exception("listAvailableNodeTypes failed: %s", e)
        return _err(name, str(e))


async def _describeNodeType(params: Dict[str, Any], context: Any) -> ToolResult:
    """Return the full schema for a single node type so the AI can fill
    ``addNode.parameters`` correctly (which fields are required, what types,
    default values, allowed enum values, what each port expects/produces).

    This is the canonical way to discover required parameters before
    calling ``addNode`` — without it the model guesses ``parameters={}``
    and the user gets empty configuration cards.
    """
    name = "describeNodeType"
    try:
        # ``id`` is accepted as an alias for ``nodeType``.
        nodeType = params.get("nodeType") or params.get("id")
        if not nodeType:
            return _err(name, "nodeType required")
        from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
        target: Dict[str, Any] = {}
        for n in STATIC_NODE_TYPES:
            if isinstance(n, dict) and n.get("id") == nodeType:
                target = n
                break
        if not target:
            return _err(name, f"Unknown nodeType '{nodeType}' — call listAvailableNodeTypes first")

        rawParams = target.get("parameters") or []
        parameters = [
            _summarizeParameter(p) for p in rawParams if isinstance(p, dict)
        ]

        def _portList(portsDict: Any) -> List[Dict[str, Any]]:
            """Turn an ``{index: spec}`` mapping into a list ordered by index."""
            if not isinstance(portsDict, dict):
                return []
            out: List[Dict[str, Any]] = []
            # Numeric keys sort by value; non-numeric keys all sort as 0.
            for idx, spec in sorted(portsDict.items(), key=lambda kv: int(kv[0]) if str(kv[0]).isdigit() else 0):
                if not isinstance(spec, dict):
                    continue
                entry: Dict[str, Any] = {"index": int(idx) if str(idx).isdigit() else idx}
                if "schema" in spec:
                    entry["schema"] = spec.get("schema")
                if "accepts" in spec:
                    entry["accepts"] = spec.get("accepts")
                out.append(entry)
            return out

        meta = target.get("meta") or {}
        return _ok(name, {
            "id": target.get("id"),
            "category": target.get("category"),
            "label": _coerceLabel(target.get("label"), target.get("id") or ""),
            "description": _coerceLabel(target.get("description"), ""),
            "usesAi": bool(meta.get("usesAi")),
            "inputs": int(target.get("inputs") or 0),
            "outputs": int(target.get("outputs") or 0),
            "inputPorts": _portList(target.get("inputPorts")),
            "outputPorts": _portList(target.get("outputPorts")),
            "parameters": parameters,
            "requiredParameters": [p["name"] for p in parameters if p.get("required")],
        })
    except Exception as e:
        logger.exception("describeNodeType failed: %s", e)
        return _err(name, str(e))


# Geometry constants — MUST match the frontend (FlowCanvas.tsx) so the
# server-side auto-layout produces the exact same coordinates the user
# would get by clicking "Arrange" in the UI.
_NODE_WIDTH = 200 _NODE_HEIGHT = 72 _LAYOUT_V_GAP = 80 _LAYOUT_H_GAP = 60 _LAYOUT_START_X = 40 _LAYOUT_START_Y = 40 def _computeAutoLayout( nodes: List[Dict[str, Any]], connections: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: """Topological-layer layout — port of ``computeAutoLayout`` in FlowCanvas.tsx. Arranges nodes top-to-bottom in layers (one layer per BFS step from the sources). Disconnected nodes are appended as extra single-node layers, same as the frontend. Returns a NEW node list with updated top-level ``x``/``y``; legacy ``position`` keys are stripped to avoid two competing sources of truth. """ if not nodes: return nodes nodeIds = {n.get("id") for n in nodes if n.get("id")} inDegree: Dict[str, int] = {nid: 0 for nid in nodeIds if nid} children: Dict[str, List[str]] = {nid: [] for nid in nodeIds if nid} for c in connections or []: src = c.get("source") tgt = c.get("target") if src in inDegree and tgt in inDegree: inDegree[tgt] = inDegree[tgt] + 1 children[src].append(tgt) layers: List[List[str]] = [] layerOf: Dict[str, int] = {} queue: List[str] = [nid for nid, deg in inDegree.items() if deg == 0] while queue: batch = list(queue) queue = [] layerIdx = len(layers) layers.append(batch) for nid in batch: layerOf[nid] = layerIdx for childId in children.get(nid, []): inDegree[childId] = inDegree[childId] - 1 if inDegree[childId] == 0: queue.append(childId) # Cycles: append remaining nodes as their own layers (matches frontend). 
for n in nodes: nid = n.get("id") if nid and nid not in layerOf: layerIdx = len(layers) layers.append([nid]) layerOf[nid] = layerIdx out: List[Dict[str, Any]] = [] for n in nodes: nid = n.get("id") layer = layerOf.get(nid, 0) if nid else 0 siblings = layers[layer] if 0 <= layer < len(layers) else [nid] idxInLayer = siblings.index(nid) if nid in siblings else 0 new = dict(n) new["x"] = _LAYOUT_START_X + idxInLayer * (_NODE_WIDTH + _LAYOUT_H_GAP) new["y"] = _LAYOUT_START_Y + layer * (_NODE_HEIGHT + _LAYOUT_V_GAP) # Strip legacy ``position`` so frontend never sees two coordinates. new.pop("position", None) out.append(new) return out async def _autoLayoutWorkflow(params: Dict[str, Any], context: Any) -> ToolResult: """Re-arrange all nodes of the workflow into a clean top-down layered layout. Same algorithm as the editor's "Arrange" button — call this after you finished adding/connecting nodes so the user doesn't see an unreadable pile of overlapping boxes. """ name = "autoLayoutWorkflow" try: workflowId, instanceId = _resolveIds(params, context) if not workflowId or not instanceId: return _err(name, "workflowId and instanceId required (and not present in agent context)") iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: return _err(name, f"Workflow {workflowId} not found") graph = dict(wf.get("graph", {}) or {}) nodes = list(graph.get("nodes", []) or []) connections = list(graph.get("connections", []) or []) if not nodes: return _ok(name, {"message": "No nodes to layout", "nodeCount": 0}) graph["nodes"] = _computeAutoLayout(nodes, connections) iface.updateWorkflow(workflowId, {"graph": graph}) return _ok(name, { "message": f"Auto-layout applied to {len(nodes)} nodes", "nodeCount": len(nodes), "layerCount": max((c.get("y", 0) for c in graph["nodes"]), default=_LAYOUT_START_Y) // (_NODE_HEIGHT + _LAYOUT_V_GAP) + 1, }) except Exception as e: logger.exception("autoLayoutWorkflow failed: %s", e) return _err(name, str(e)) async def 
_validateGraph(params: Dict[str, Any], context: Any) -> ToolResult: """Validate a workflow graph for common issues.""" name = "validateGraph" try: workflowId, instanceId = _resolveIds(params, context) if not workflowId or not instanceId: return _err(name, "workflowId and instanceId required") iface = _getInterface(context, instanceId) wf = iface.getWorkflow(workflowId) if not wf: return _err(name, f"Workflow {workflowId} not found") graph = wf.get("graph", {}) or {} nodes = graph.get("nodes", []) or [] connections = graph.get("connections", []) or [] issues: List[str] = [] nodeIds = {n.get("id") for n in nodes} if not nodes: issues.append("Graph has no nodes") hasTrigger = any((n.get("type") or "").startswith("trigger.") for n in nodes) if not hasTrigger: issues.append("No trigger node found") for c in connections: if c.get("source") not in nodeIds: issues.append(f"Connection source '{c.get('source')}' not found") if c.get("target") not in nodeIds: issues.append(f"Connection target '{c.get('target')}' not found") connectedNodes = set() for c in connections: connectedNodes.add(c.get("source")) connectedNodes.add(c.get("target")) orphans = [ n.get("id") for n in nodes if n.get("id") not in connectedNodes and not (n.get("type") or "").startswith("trigger.") ] if orphans: issues.append(f"Orphan nodes (not connected): {', '.join(orphans)}") return _ok(name, { "valid": len(issues) == 0, "issues": issues, "nodeCount": len(nodes), "connectionCount": len(connections), }) except Exception as e: logger.exception("validateGraph failed: %s", e) return _err(name, str(e)) async def _listWorkflowHistory(params: Dict[str, Any], context: Any) -> ToolResult: """List versions (history) for a workflow.""" name = "listWorkflowHistory" try: workflowId, instanceId = _resolveIds(params, context) if not workflowId or not instanceId: return _err(name, "workflowId and instanceId required") iface = _getInterface(context, instanceId) versions = iface.getVersions(workflowId) or [] return 
_ok(name, { "workflowId": workflowId, "versions": [ { "id": v.get("id"), "versionNumber": v.get("versionNumber"), "status": v.get("status"), "publishedAt": v.get("publishedAt"), "publishedBy": v.get("publishedBy"), } for v in versions ], }) except Exception as e: logger.exception("listWorkflowHistory failed: %s", e) return _err(name, str(e)) async def _readWorkflowMessages(params: Dict[str, Any], context: Any) -> ToolResult: """Read recent run logs/messages for a workflow.""" name = "readWorkflowMessages" try: workflowId, instanceId = _resolveIds(params, context) if not workflowId or not instanceId: return _err(name, "workflowId and instanceId required") iface = _getInterface(context, instanceId) from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoRun runs = iface.db.getRecordset(AutoRun, recordFilter={"workflowId": workflowId}) or [] runSummaries = [] for r in sorted(runs, key=lambda x: x.get("startedAt") or 0, reverse=True)[:10]: runSummaries.append({ "runId": r.get("id"), "status": r.get("status"), "startedAt": r.get("startedAt"), "completedAt": r.get("completedAt"), "error": r.get("error"), }) return _ok(name, {"workflowId": workflowId, "recentRuns": runSummaries}) except Exception as e: logger.exception("readWorkflowMessages failed: %s", e) return _err(name, str(e)) def getWorkflowToolDefinitions() -> List[Dict[str, Any]]: """Return tool definitions for registration in the ToolRegistry. Note: ``workflowId`` and ``instanceId`` are NOT marked ``required`` — they are auto-injected from the agent context by ``_resolveIds``. The model may still pass them explicitly (e.g. to target a different workflow) but doesn't have to repeat them on every call. 
""" _idFields = { "workflowId": {"type": "string", "description": "Workflow ID (defaults to the current editor workflow)"}, "instanceId": {"type": "string", "description": "Feature instance ID (defaults to the current editor instance)"}, } return [ { "name": "readWorkflowGraph", "handler": _readWorkflowGraph, "description": "Read the current workflow graph (nodes and connections). Always call this first to understand the current state before making changes.", "parameters": { "type": "object", "properties": {**_idFields}, "required": [], }, "readOnly": True, "toolSet": TOOLBOX_ID, }, { "name": "addNode", "handler": _addNode, "description": "Add a node to the workflow graph.", "parameters": { "type": "object", "properties": { **_idFields, "nodeType": {"type": "string", "description": "Node type id (e.g. ai.chat, email.send) — use listAvailableNodeTypes to discover"}, "title": {"type": "string", "description": "Human-readable title"}, "parameters": {"type": "object", "description": "Node parameters"}, "position": {"type": "object", "description": "Canvas position {x, y}"}, "nodeId": {"type": "string", "description": "Optional explicit node id"}, }, "required": ["nodeType"], }, "toolSet": TOOLBOX_ID, }, { "name": "removeNode", "handler": _removeNode, "description": "Remove a node and its connections from the graph.", "parameters": { "type": "object", "properties": { **_idFields, "nodeId": {"type": "string", "description": "ID of the node to remove"}, }, "required": ["nodeId"], }, "toolSet": TOOLBOX_ID, }, { "name": "connectNodes", "handler": _connectNodes, "description": "Connect two nodes in the graph (source -> target).", "parameters": { "type": "object", "properties": { **_idFields, "sourceId": {"type": "string"}, "targetId": {"type": "string"}, "sourceOutput": {"type": "integer", "default": 0}, "targetInput": {"type": "integer", "default": 0}, }, "required": ["sourceId", "targetId"], }, "toolSet": TOOLBOX_ID, }, { "name": "setNodeParameter", "handler": 
_setNodeParameter, "description": "Set a single parameter on a node.", "parameters": { "type": "object", "properties": { **_idFields, "nodeId": {"type": "string"}, "parameterName": {"type": "string"}, "parameterValue": {"description": "Value to set (any type)"}, }, "required": ["nodeId", "parameterName", "parameterValue"], }, "toolSet": TOOLBOX_ID, }, { "name": "listAvailableNodeTypes", "handler": _listAvailableNodeTypes, "description": ( "List all available node types (compact catalog: id, label, " "description, paramCount, requiredParamCount, usesAi). Call this " "once to discover ids; then call describeNodeType for each type " "you intend to add to learn the parameter schema." ), "parameters": {"type": "object", "properties": {}}, "readOnly": True, "toolSet": TOOLBOX_ID, }, { "name": "describeNodeType", "handler": _describeNodeType, "description": ( "Return the FULL parameter schema for a single node type " "(name, type, required, default, allowedValues, description) " "plus input/output ports. ALWAYS call this before addNode for " "any node type that has requiredParamCount > 0, and pass all " "required parameters into addNode — otherwise the user sees an " "empty configuration card. For parameters with " "frontendType='userConnection' call listConnections to obtain " "a connectionId." ), "parameters": { "type": "object", "properties": { "nodeType": {"type": "string", "description": "Node type id from listAvailableNodeTypes (e.g. 'email.checkEmail', 'ai.prompt')"}, }, "required": ["nodeType"], }, "readOnly": True, "toolSet": TOOLBOX_ID, }, { "name": "autoLayoutWorkflow", "handler": _autoLayoutWorkflow, "description": ( "Re-arrange ALL nodes into a clean top-down layered layout " "(same algorithm as the editor's 'Arrange' button). Call this " "AFTER you finished adding nodes and connections, otherwise the " "user sees a pile of overlapping boxes. Idempotent — safe to " "call multiple times." 
), "parameters": { "type": "object", "properties": {**_idFields}, "required": [], }, "toolSet": TOOLBOX_ID, }, { "name": "validateGraph", "handler": _validateGraph, "description": "Validate a workflow graph for common issues (missing trigger, dangling connections, orphans).", "parameters": { "type": "object", "properties": {**_idFields}, "required": [], }, "readOnly": True, "toolSet": TOOLBOX_ID, }, { "name": "listWorkflowHistory", "handler": _listWorkflowHistory, "description": "List version history for a workflow (AutoVersion entries).", "parameters": { "type": "object", "properties": {**_idFields}, "required": [], }, "readOnly": True, "toolSet": TOOLBOX_ID, }, { "name": "readWorkflowMessages", "handler": _readWorkflowMessages, "description": "Read recent run logs and status for a workflow.", "parameters": { "type": "object", "properties": {**_idFields}, "required": [], }, "readOnly": True, "toolSet": TOOLBOX_ID, }, ]