gateway/modules/serviceCenter/services/serviceAgent/workflowTools.py
2026-04-19 01:31:57 +02:00

832 lines
32 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Workflow Toolbox - AI-assisted graph manipulation tools for the GraphicalEditor.
Tools: readWorkflowGraph, addNode, removeNode, connectNodes, setNodeParameter,
listAvailableNodeTypes, describeNodeType, autoLayoutWorkflow,
validateGraph, listWorkflowHistory, readWorkflowMessages.
Conventions enforced here (matches coreTools / actionToolAdapter):
- Every ``ToolResult(...)`` provides ``toolCallId`` and ``toolName`` (pydantic
requires both); ``ToolRegistry.dispatch`` overwrites ``toolCallId`` later
but the model still validates at construction.
- ``ToolResult.data`` is a ``str``; structured payloads are JSON-encoded.
- ``workflowId`` and ``instanceId`` are auto-injected from the agent
``context`` dict (``workflowId``, ``featureInstanceId``) when the model
omits them — the editor agent always runs in exactly one workflow.
"""
import json
import logging
import uuid
from typing import Dict, Any, List, Tuple
from modules.serviceCenter.services.serviceAgent.datamodelAgent import ToolResult
logger = logging.getLogger(__name__)
TOOLBOX_ID = "workflow"
def _toData(payload: Any) -> str:
"""Encode a structured payload into ToolResult.data (which is a string)."""
if isinstance(payload, str):
return payload
try:
return json.dumps(payload, default=str, ensure_ascii=False)
except Exception:
return str(payload)
def _err(toolName: str, message: str) -> ToolResult:
return ToolResult(toolCallId="", toolName=toolName, success=False, error=message)
def _ok(toolName: str, payload: Any) -> ToolResult:
return ToolResult(toolCallId="", toolName=toolName, success=True, data=_toData(payload))
def _resolveIds(params: Dict[str, Any], context: Any) -> Tuple[str, str]:
"""Return (workflowId, instanceId), auto-injecting from context when missing.
The editor agent context (``agentLoop._executeToolCalls``) is a dict with
``workflowId`` and ``featureInstanceId`` — use them as defaults so the
model doesn't have to re-state the ids on every tool call.
"""
ctx: Dict[str, Any] = context if isinstance(context, dict) else {}
workflowId = params.get("workflowId") or ctx.get("workflowId") or ""
instanceId = (
params.get("instanceId")
or ctx.get("featureInstanceId")
or ctx.get("instanceId")
or ""
)
return workflowId, instanceId
def _resolveUser(context: Any):
"""Return the User object for the current agent context (lazy DB fetch)."""
if not isinstance(context, dict):
return getattr(context, "user", None)
user = context.get("user")
if user is not None:
return user
userId = context.get("userId")
if not userId:
return None
try:
from modules.interfaces.interfaceDbApp import getRootInterface
return getRootInterface().getUser(str(userId))
except Exception as e:
logger.warning("workflowTools: could not resolve user %s: %s", userId, e)
return None
def _resolveMandateId(context: Any) -> str:
if not isinstance(context, dict):
return getattr(context, "mandateId", "") or ""
return context.get("mandateId") or ""
def _getInterface(context: Any, instanceId: str):
from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import getGraphicalEditorInterface
return getGraphicalEditorInterface(_resolveUser(context), _resolveMandateId(context), instanceId)
async def _readWorkflowGraph(params: Dict[str, Any], context: Any) -> ToolResult:
"""Read the current workflow graph (nodes and connections)."""
name = "readWorkflowGraph"
try:
workflowId, instanceId = _resolveIds(params, context)
if not workflowId or not instanceId:
return _err(name, "workflowId and instanceId required (and not present in agent context)")
iface = _getInterface(context, instanceId)
wf = iface.getWorkflow(workflowId)
if not wf:
return _err(name, f"Workflow {workflowId} not found")
graph = wf.get("graph", {}) or {}
nodes = graph.get("nodes", []) or []
connections = graph.get("connections", []) or []
return _ok(name, {
"workflowId": workflowId,
"label": wf.get("label", ""),
"nodeCount": len(nodes),
"connectionCount": len(connections),
"nodes": [
{"id": n.get("id"), "type": n.get("type"), "title": n.get("title", "")}
for n in nodes
],
"connections": connections,
})
except Exception as e:
logger.exception("readWorkflowGraph failed: %s", e)
return _err(name, str(e))
async def _addNode(params: Dict[str, Any], context: Any) -> ToolResult:
"""Add a node to the workflow graph."""
name = "addNode"
try:
workflowId, instanceId = _resolveIds(params, context)
nodeType = params.get("nodeType")
if not workflowId or not instanceId or not nodeType:
return _err(name, "workflowId, instanceId, and nodeType required")
iface = _getInterface(context, instanceId)
wf = iface.getWorkflow(workflowId)
if not wf:
return _err(name, f"Workflow {workflowId} not found")
graph = dict(wf.get("graph", {}) or {})
nodes = list(graph.get("nodes", []) or [])
nodeId = params.get("nodeId") or str(uuid.uuid4())[:8]
title = params.get("title", "")
nodeParams = params.get("parameters", {}) or {}
# Frontend stores positions as TOP-LEVEL ``x`` / ``y`` on the node
# (see ``fromApiGraph`` / ``toApiGraph``). Accept either explicit
# ``x`` / ``y`` or a ``position={x,y}`` shape from the model and
# always persist as top-level ``x`` / ``y``. Fallback puts new
# nodes in a horizontal stripe so the user sees them even before
# ``autoLayoutWorkflow`` runs.
position = params.get("position") or {}
x = params.get("x")
if x is None:
x = position.get("x") if isinstance(position, dict) else None
if x is None:
x = 40 + len(nodes) * 260
y = params.get("y")
if y is None:
y = position.get("y") if isinstance(position, dict) else None
if y is None:
y = 40
newNode = {
"id": nodeId,
"type": nodeType,
"title": title,
"parameters": nodeParams,
"x": x,
"y": y,
}
nodes.append(newNode)
graph["nodes"] = nodes
iface.updateWorkflow(workflowId, {"graph": graph})
return _ok(name, {
"nodeId": nodeId,
"nodeType": nodeType,
"message": f"Node '{title or nodeType}' added",
})
except Exception as e:
logger.exception("addNode failed: %s", e)
return _err(name, str(e))
async def _removeNode(params: Dict[str, Any], context: Any) -> ToolResult:
"""Remove a node and its connections from the workflow graph."""
name = "removeNode"
try:
workflowId, instanceId = _resolveIds(params, context)
nodeId = params.get("nodeId")
if not workflowId or not instanceId or not nodeId:
return _err(name, "workflowId, instanceId, and nodeId required")
iface = _getInterface(context, instanceId)
wf = iface.getWorkflow(workflowId)
if not wf:
return _err(name, f"Workflow {workflowId} not found")
graph = dict(wf.get("graph", {}) or {})
nodes = [n for n in (graph.get("nodes", []) or []) if n.get("id") != nodeId]
connections = [
c for c in (graph.get("connections", []) or [])
if c.get("source") != nodeId and c.get("target") != nodeId
]
graph["nodes"] = nodes
graph["connections"] = connections
iface.updateWorkflow(workflowId, {"graph": graph})
return _ok(name, {"nodeId": nodeId, "message": f"Node {nodeId} removed"})
except Exception as e:
logger.exception("removeNode failed: %s", e)
return _err(name, str(e))
async def _connectNodes(params: Dict[str, Any], context: Any) -> ToolResult:
"""Connect two nodes in the workflow graph."""
name = "connectNodes"
try:
workflowId, instanceId = _resolveIds(params, context)
sourceId = params.get("sourceId")
targetId = params.get("targetId")
if not workflowId or not instanceId or not sourceId or not targetId:
return _err(name, "workflowId, instanceId, sourceId, and targetId required")
iface = _getInterface(context, instanceId)
wf = iface.getWorkflow(workflowId)
if not wf:
return _err(name, f"Workflow {workflowId} not found")
graph = dict(wf.get("graph", {}) or {})
connections = list(graph.get("connections", []) or [])
newConn = {
"source": sourceId,
"target": targetId,
"sourceOutput": params.get("sourceOutput", 0),
"targetInput": params.get("targetInput", 0),
}
connections.append(newConn)
graph["connections"] = connections
iface.updateWorkflow(workflowId, {"graph": graph})
return _ok(name, {"connection": newConn, "message": f"Connected {sourceId} -> {targetId}"})
except Exception as e:
logger.exception("connectNodes failed: %s", e)
return _err(name, str(e))
async def _setNodeParameter(params: Dict[str, Any], context: Any) -> ToolResult:
"""Set a parameter on a node."""
name = "setNodeParameter"
try:
workflowId, instanceId = _resolveIds(params, context)
nodeId = params.get("nodeId")
paramName = params.get("parameterName")
paramValue = params.get("parameterValue")
if not workflowId or not instanceId or not nodeId or not paramName:
return _err(name, "workflowId, instanceId, nodeId, and parameterName required")
iface = _getInterface(context, instanceId)
wf = iface.getWorkflow(workflowId)
if not wf:
return _err(name, f"Workflow {workflowId} not found")
graph = dict(wf.get("graph", {}) or {})
nodes = list(graph.get("nodes", []) or [])
found = False
for n in nodes:
if n.get("id") == nodeId:
nodeParams = dict(n.get("parameters", {}) or {})
nodeParams[paramName] = paramValue
n["parameters"] = nodeParams
found = True
break
if not found:
return _err(name, f"Node {nodeId} not found in graph")
graph["nodes"] = nodes
iface.updateWorkflow(workflowId, {"graph": graph})
return _ok(name, {
"nodeId": nodeId,
"parameter": paramName,
"message": f"Parameter '{paramName}' set",
})
except Exception as e:
logger.exception("setNodeParameter failed: %s", e)
return _err(name, str(e))
def _coerceLabel(rawLabel: Any, fallback: str) -> str:
"""Normalize a node label which may be a string, dict {locale: str}, or other."""
if isinstance(rawLabel, str):
return rawLabel
if isinstance(rawLabel, dict):
for key in ("en", "de", "fr"):
value = rawLabel.get(key)
if isinstance(value, str) and value:
return value
for value in rawLabel.values():
if isinstance(value, str) and value:
return value
return fallback
def _summarizeNodeForCatalog(n: Dict[str, Any]) -> Dict[str, Any]:
"""Compact summary used in ``listAvailableNodeTypes`` — small but
informative enough that the model can pick the right type and knows
whether ``describeNodeType`` is worth a follow-up call."""
nodeId = n.get("id") or ""
paramsList = n.get("parameters") or []
requiredCount = sum(1 for p in paramsList if isinstance(p, dict) and p.get("required"))
return {
"id": nodeId,
"category": n.get("category"),
"label": _coerceLabel(n.get("label"), nodeId),
"description": _coerceLabel(n.get("description"), ""),
"paramCount": len(paramsList),
"requiredParamCount": requiredCount,
"usesAi": bool(((n.get("meta") or {}).get("usesAi"))),
}
def _summarizeParameter(p: Dict[str, Any]) -> Dict[str, Any]:
"""Reduce a node parameter spec to just what the AI needs to fill it."""
out: Dict[str, Any] = {
"name": p.get("name"),
"type": p.get("type"),
"required": bool(p.get("required")),
"frontendType": p.get("frontendType"),
"description": _coerceLabel(p.get("description"), ""),
}
if "default" in p:
out["default"] = p.get("default")
feOpts = p.get("frontendOptions")
if isinstance(feOpts, dict):
# Expose enum-style choices ("options") so the model sticks to allowed values.
if isinstance(feOpts.get("options"), list):
out["allowedValues"] = feOpts.get("options")
if p.get("frontendType") == "userConnection":
out["hint"] = (
"Call listConnections to discover available connections; pass the "
"connectionId here. Required before this node can run."
)
return out
async def _listAvailableNodeTypes(params: Dict[str, Any], context: Any) -> ToolResult:
"""List all available node types for the flow builder (compact catalog).
Returns ``id``, ``category``, ``label``, short ``description``, and the
parameter counts. To learn HOW to fill a node's parameters use
``describeNodeType(nodeType=...)`` — that returns the full schema.
"""
name = "listAvailableNodeTypes"
try:
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
nodeTypes = []
for n in STATIC_NODE_TYPES:
if not isinstance(n, dict):
continue
nodeTypes.append(_summarizeNodeForCatalog(n))
return _ok(name, {"nodeTypes": nodeTypes, "count": len(nodeTypes)})
except Exception as e:
logger.exception("listAvailableNodeTypes failed: %s", e)
return _err(name, str(e))
async def _describeNodeType(params: Dict[str, Any], context: Any) -> ToolResult:
"""Return the full schema for a single node type so the AI can fill
``addNode.parameters`` correctly (which fields are required, what types,
default values, allowed enum values, what each port expects/produces).
This is the canonical way to discover required parameters before
calling ``addNode`` — without it the model guesses ``parameters={}``
and the user gets empty configuration cards.
"""
name = "describeNodeType"
try:
nodeType = params.get("nodeType") or params.get("id")
if not nodeType:
return _err(name, "nodeType required")
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
target: Dict[str, Any] = {}
for n in STATIC_NODE_TYPES:
if isinstance(n, dict) and n.get("id") == nodeType:
target = n
break
if not target:
return _err(name, f"Unknown nodeType '{nodeType}' — call listAvailableNodeTypes first")
rawParams = target.get("parameters") or []
parameters = [
_summarizeParameter(p) for p in rawParams if isinstance(p, dict)
]
def _portList(portsDict: Any) -> List[Dict[str, Any]]:
if not isinstance(portsDict, dict):
return []
out: List[Dict[str, Any]] = []
for idx, spec in sorted(portsDict.items(), key=lambda kv: int(kv[0]) if str(kv[0]).isdigit() else 0):
if not isinstance(spec, dict):
continue
entry: Dict[str, Any] = {"index": int(idx) if str(idx).isdigit() else idx}
if "schema" in spec:
entry["schema"] = spec.get("schema")
if "accepts" in spec:
entry["accepts"] = spec.get("accepts")
out.append(entry)
return out
meta = target.get("meta") or {}
return _ok(name, {
"id": target.get("id"),
"category": target.get("category"),
"label": _coerceLabel(target.get("label"), target.get("id") or ""),
"description": _coerceLabel(target.get("description"), ""),
"usesAi": bool(meta.get("usesAi")),
"inputs": int(target.get("inputs") or 0),
"outputs": int(target.get("outputs") or 0),
"inputPorts": _portList(target.get("inputPorts")),
"outputPorts": _portList(target.get("outputPorts")),
"parameters": parameters,
"requiredParameters": [p["name"] for p in parameters if p.get("required")],
})
except Exception as e:
logger.exception("describeNodeType failed: %s", e)
return _err(name, str(e))
# Geometry constants — MUST match the frontend (FlowCanvas.tsx) so the
# server-side auto-layout produces the exact same coordinates the user
# would get by clicking "Arrange" in the UI.
_NODE_WIDTH = 200
_NODE_HEIGHT = 72
_LAYOUT_V_GAP = 80
_LAYOUT_H_GAP = 60
_LAYOUT_START_X = 40
_LAYOUT_START_Y = 40
def _computeAutoLayout(
nodes: List[Dict[str, Any]],
connections: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Topological-layer layout — port of ``computeAutoLayout`` in FlowCanvas.tsx.
Arranges nodes top-to-bottom in layers (one layer per BFS step from the
sources). Disconnected nodes are appended as extra single-node layers,
same as the frontend. Returns a NEW node list with updated top-level
``x``/``y``; legacy ``position`` keys are stripped to avoid two
competing sources of truth.
"""
if not nodes:
return nodes
nodeIds = {n.get("id") for n in nodes if n.get("id")}
inDegree: Dict[str, int] = {nid: 0 for nid in nodeIds if nid}
children: Dict[str, List[str]] = {nid: [] for nid in nodeIds if nid}
for c in connections or []:
src = c.get("source")
tgt = c.get("target")
if src in inDegree and tgt in inDegree:
inDegree[tgt] = inDegree[tgt] + 1
children[src].append(tgt)
layers: List[List[str]] = []
layerOf: Dict[str, int] = {}
queue: List[str] = [nid for nid, deg in inDegree.items() if deg == 0]
while queue:
batch = list(queue)
queue = []
layerIdx = len(layers)
layers.append(batch)
for nid in batch:
layerOf[nid] = layerIdx
for childId in children.get(nid, []):
inDegree[childId] = inDegree[childId] - 1
if inDegree[childId] == 0:
queue.append(childId)
# Cycles: append remaining nodes as their own layers (matches frontend).
for n in nodes:
nid = n.get("id")
if nid and nid not in layerOf:
layerIdx = len(layers)
layers.append([nid])
layerOf[nid] = layerIdx
out: List[Dict[str, Any]] = []
for n in nodes:
nid = n.get("id")
layer = layerOf.get(nid, 0) if nid else 0
siblings = layers[layer] if 0 <= layer < len(layers) else [nid]
idxInLayer = siblings.index(nid) if nid in siblings else 0
new = dict(n)
new["x"] = _LAYOUT_START_X + idxInLayer * (_NODE_WIDTH + _LAYOUT_H_GAP)
new["y"] = _LAYOUT_START_Y + layer * (_NODE_HEIGHT + _LAYOUT_V_GAP)
# Strip legacy ``position`` so frontend never sees two coordinates.
new.pop("position", None)
out.append(new)
return out
async def _autoLayoutWorkflow(params: Dict[str, Any], context: Any) -> ToolResult:
"""Re-arrange all nodes of the workflow into a clean top-down layered layout.
Same algorithm as the editor's "Arrange" button — call this after you
finished adding/connecting nodes so the user doesn't see an unreadable
pile of overlapping boxes.
"""
name = "autoLayoutWorkflow"
try:
workflowId, instanceId = _resolveIds(params, context)
if not workflowId or not instanceId:
return _err(name, "workflowId and instanceId required (and not present in agent context)")
iface = _getInterface(context, instanceId)
wf = iface.getWorkflow(workflowId)
if not wf:
return _err(name, f"Workflow {workflowId} not found")
graph = dict(wf.get("graph", {}) or {})
nodes = list(graph.get("nodes", []) or [])
connections = list(graph.get("connections", []) or [])
if not nodes:
return _ok(name, {"message": "No nodes to layout", "nodeCount": 0})
graph["nodes"] = _computeAutoLayout(nodes, connections)
iface.updateWorkflow(workflowId, {"graph": graph})
return _ok(name, {
"message": f"Auto-layout applied to {len(nodes)} nodes",
"nodeCount": len(nodes),
"layerCount": max((c.get("y", 0) for c in graph["nodes"]), default=_LAYOUT_START_Y) // (_NODE_HEIGHT + _LAYOUT_V_GAP) + 1,
})
except Exception as e:
logger.exception("autoLayoutWorkflow failed: %s", e)
return _err(name, str(e))
async def _validateGraph(params: Dict[str, Any], context: Any) -> ToolResult:
"""Validate a workflow graph for common issues."""
name = "validateGraph"
try:
workflowId, instanceId = _resolveIds(params, context)
if not workflowId or not instanceId:
return _err(name, "workflowId and instanceId required")
iface = _getInterface(context, instanceId)
wf = iface.getWorkflow(workflowId)
if not wf:
return _err(name, f"Workflow {workflowId} not found")
graph = wf.get("graph", {}) or {}
nodes = graph.get("nodes", []) or []
connections = graph.get("connections", []) or []
issues: List[str] = []
nodeIds = {n.get("id") for n in nodes}
if not nodes:
issues.append("Graph has no nodes")
hasTrigger = any((n.get("type") or "").startswith("trigger.") for n in nodes)
if not hasTrigger:
issues.append("No trigger node found")
for c in connections:
if c.get("source") not in nodeIds:
issues.append(f"Connection source '{c.get('source')}' not found")
if c.get("target") not in nodeIds:
issues.append(f"Connection target '{c.get('target')}' not found")
connectedNodes = set()
for c in connections:
connectedNodes.add(c.get("source"))
connectedNodes.add(c.get("target"))
orphans = [
n.get("id") for n in nodes
if n.get("id") not in connectedNodes and not (n.get("type") or "").startswith("trigger.")
]
if orphans:
issues.append(f"Orphan nodes (not connected): {', '.join(orphans)}")
return _ok(name, {
"valid": len(issues) == 0,
"issues": issues,
"nodeCount": len(nodes),
"connectionCount": len(connections),
})
except Exception as e:
logger.exception("validateGraph failed: %s", e)
return _err(name, str(e))
async def _listWorkflowHistory(params: Dict[str, Any], context: Any) -> ToolResult:
"""List versions (history) for a workflow."""
name = "listWorkflowHistory"
try:
workflowId, instanceId = _resolveIds(params, context)
if not workflowId or not instanceId:
return _err(name, "workflowId and instanceId required")
iface = _getInterface(context, instanceId)
versions = iface.getVersions(workflowId) or []
return _ok(name, {
"workflowId": workflowId,
"versions": [
{
"id": v.get("id"),
"versionNumber": v.get("versionNumber"),
"status": v.get("status"),
"publishedAt": v.get("publishedAt"),
"publishedBy": v.get("publishedBy"),
}
for v in versions
],
})
except Exception as e:
logger.exception("listWorkflowHistory failed: %s", e)
return _err(name, str(e))
async def _readWorkflowMessages(params: Dict[str, Any], context: Any) -> ToolResult:
"""Read recent run logs/messages for a workflow."""
name = "readWorkflowMessages"
try:
workflowId, instanceId = _resolveIds(params, context)
if not workflowId or not instanceId:
return _err(name, "workflowId and instanceId required")
iface = _getInterface(context, instanceId)
from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoRun
runs = iface.db.getRecordset(AutoRun, recordFilter={"workflowId": workflowId}) or []
runSummaries = []
for r in sorted(runs, key=lambda x: x.get("startedAt") or 0, reverse=True)[:10]:
runSummaries.append({
"runId": r.get("id"),
"status": r.get("status"),
"startedAt": r.get("startedAt"),
"completedAt": r.get("completedAt"),
"error": r.get("error"),
})
return _ok(name, {"workflowId": workflowId, "recentRuns": runSummaries})
except Exception as e:
logger.exception("readWorkflowMessages failed: %s", e)
return _err(name, str(e))
def getWorkflowToolDefinitions() -> List[Dict[str, Any]]:
"""Return tool definitions for registration in the ToolRegistry.
Note: ``workflowId`` and ``instanceId`` are NOT marked ``required`` —
they are auto-injected from the agent context by ``_resolveIds``. The
model may still pass them explicitly (e.g. to target a different
workflow) but doesn't have to repeat them on every call.
"""
_idFields = {
"workflowId": {"type": "string", "description": "Workflow ID (defaults to the current editor workflow)"},
"instanceId": {"type": "string", "description": "Feature instance ID (defaults to the current editor instance)"},
}
return [
{
"name": "readWorkflowGraph",
"handler": _readWorkflowGraph,
"description": "Read the current workflow graph (nodes and connections). Always call this first to understand the current state before making changes.",
"parameters": {
"type": "object",
"properties": {**_idFields},
"required": [],
},
"readOnly": True,
"toolSet": TOOLBOX_ID,
},
{
"name": "addNode",
"handler": _addNode,
"description": "Add a node to the workflow graph.",
"parameters": {
"type": "object",
"properties": {
**_idFields,
"nodeType": {"type": "string", "description": "Node type id (e.g. ai.chat, email.send) — use listAvailableNodeTypes to discover"},
"title": {"type": "string", "description": "Human-readable title"},
"parameters": {"type": "object", "description": "Node parameters"},
"position": {"type": "object", "description": "Canvas position {x, y}"},
"nodeId": {"type": "string", "description": "Optional explicit node id"},
},
"required": ["nodeType"],
},
"toolSet": TOOLBOX_ID,
},
{
"name": "removeNode",
"handler": _removeNode,
"description": "Remove a node and its connections from the graph.",
"parameters": {
"type": "object",
"properties": {
**_idFields,
"nodeId": {"type": "string", "description": "ID of the node to remove"},
},
"required": ["nodeId"],
},
"toolSet": TOOLBOX_ID,
},
{
"name": "connectNodes",
"handler": _connectNodes,
"description": "Connect two nodes in the graph (source -> target).",
"parameters": {
"type": "object",
"properties": {
**_idFields,
"sourceId": {"type": "string"},
"targetId": {"type": "string"},
"sourceOutput": {"type": "integer", "default": 0},
"targetInput": {"type": "integer", "default": 0},
},
"required": ["sourceId", "targetId"],
},
"toolSet": TOOLBOX_ID,
},
{
"name": "setNodeParameter",
"handler": _setNodeParameter,
"description": "Set a single parameter on a node.",
"parameters": {
"type": "object",
"properties": {
**_idFields,
"nodeId": {"type": "string"},
"parameterName": {"type": "string"},
"parameterValue": {"description": "Value to set (any type)"},
},
"required": ["nodeId", "parameterName", "parameterValue"],
},
"toolSet": TOOLBOX_ID,
},
{
"name": "listAvailableNodeTypes",
"handler": _listAvailableNodeTypes,
"description": (
"List all available node types (compact catalog: id, label, "
"description, paramCount, requiredParamCount, usesAi). Call this "
"once to discover ids; then call describeNodeType for each type "
"you intend to add to learn the parameter schema."
),
"parameters": {"type": "object", "properties": {}},
"readOnly": True,
"toolSet": TOOLBOX_ID,
},
{
"name": "describeNodeType",
"handler": _describeNodeType,
"description": (
"Return the FULL parameter schema for a single node type "
"(name, type, required, default, allowedValues, description) "
"plus input/output ports. ALWAYS call this before addNode for "
"any node type that has requiredParamCount > 0, and pass all "
"required parameters into addNode — otherwise the user sees an "
"empty configuration card. For parameters with "
"frontendType='userConnection' call listConnections to obtain "
"a connectionId."
),
"parameters": {
"type": "object",
"properties": {
"nodeType": {"type": "string", "description": "Node type id from listAvailableNodeTypes (e.g. 'email.checkEmail', 'ai.prompt')"},
},
"required": ["nodeType"],
},
"readOnly": True,
"toolSet": TOOLBOX_ID,
},
{
"name": "autoLayoutWorkflow",
"handler": _autoLayoutWorkflow,
"description": (
"Re-arrange ALL nodes into a clean top-down layered layout "
"(same algorithm as the editor's 'Arrange' button). Call this "
"AFTER you finished adding nodes and connections, otherwise the "
"user sees a pile of overlapping boxes. Idempotent — safe to "
"call multiple times."
),
"parameters": {
"type": "object",
"properties": {**_idFields},
"required": [],
},
"toolSet": TOOLBOX_ID,
},
{
"name": "validateGraph",
"handler": _validateGraph,
"description": "Validate a workflow graph for common issues (missing trigger, dangling connections, orphans).",
"parameters": {
"type": "object",
"properties": {**_idFields},
"required": [],
},
"readOnly": True,
"toolSet": TOOLBOX_ID,
},
{
"name": "listWorkflowHistory",
"handler": _listWorkflowHistory,
"description": "List version history for a workflow (AutoVersion entries).",
"parameters": {
"type": "object",
"properties": {**_idFields},
"required": [],
},
"readOnly": True,
"toolSet": TOOLBOX_ID,
},
{
"name": "readWorkflowMessages",
"handler": _readWorkflowMessages,
"description": "Read recent run logs and status for a workflow.",
"parameters": {
"type": "object",
"properties": {**_idFields},
"required": [],
},
"readOnly": True,
"toolSet": TOOLBOX_ID,
},
]