695 lines
32 KiB
Python
695 lines
32 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# Main execution engine for automation2 graphs.
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, List, Set, Optional
|
|
|
|
from modules.workflows.automation2.graphUtils import (
|
|
parseGraph,
|
|
buildConnectionMap,
|
|
validateGraph,
|
|
topoSort,
|
|
getInputSources,
|
|
getLoopBodyNodeIds,
|
|
)
|
|
|
|
from modules.workflows.automation2.executors import (
|
|
TriggerExecutor,
|
|
FlowExecutor,
|
|
ActionNodeExecutor,
|
|
InputExecutor,
|
|
DataExecutor,
|
|
PauseForHumanTaskError,
|
|
PauseForEmailWaitError,
|
|
)
|
|
from modules.features.graphicalEditor.portTypes import _normalizeToSchema
|
|
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
|
|
from modules.workflows.automation2.runEnvelope import normalize_run_envelope
|
|
|
|
# Module-level logger shared by every helper in this execution engine.
logger = logging.getLogger(__name__)
|
|
|
|
# Lookup table: node type id -> static node definition.
# Starts empty and is filled from STATIC_NODE_TYPES by _getNodeDef on first use.
_NODE_DEF_BY_ID: Dict[str, dict] = {}
|
|
|
|
|
|
def _getNodeDef(nodeType: str) -> Optional[dict]:
    """Return the static node definition registered under ``nodeType``.

    The module-level table ``_NODE_DEF_BY_ID`` is filled from
    ``STATIC_NODE_TYPES`` the first time it is found empty; later calls are
    plain dictionary lookups. Unknown type ids yield ``None``.
    """
    if len(_NODE_DEF_BY_ID) == 0:
        _NODE_DEF_BY_ID.update(
            (definition["id"], definition) for definition in STATIC_NODE_TYPES
        )
    return _NODE_DEF_BY_ID.get(nodeType)
|
|
|
|
|
|
def _outputSchemaForNode(nodeType: str) -> Optional[str]:
    """Return the schema name declared on output port 0 of ``nodeType``.

    ``None`` is returned when the node type is unknown, when its
    ``outputPorts`` entry is not a dict, or when port 0 (looked up as ``0``
    and then as ``"0"``) is missing or not a dict.
    """
    definition = _getNodeDef(nodeType)
    outputPorts = definition.get("outputPorts") if definition else None
    if not isinstance(outputPorts, dict):
        return None
    firstPort = outputPorts.get(0) or outputPorts.get("0")
    return firstPort.get("schema") if isinstance(firstPort, dict) else None
|
|
|
|
|
|
def _isMergeNode(nodeType: str) -> bool:
    """Tell whether ``nodeType`` is exactly the ``flow.merge`` node type."""
    mergeTypeId = "flow.merge"
    return nodeType == mergeTypeId
|
|
|
|
|
|
def _allMergePredecessorsReady(
    nodeId: str,
    connectionMap: Dict[str, List],
    nodeOutputs: Dict[str, Any],
) -> bool:
    """Check whether every predecessor wired into ``nodeId`` has an entry in ``nodeOutputs``.

    Only key presence matters: a predecessor whose recorded output is ``None``
    still counts as ready. A node without incoming connections is ready.
    """
    incoming = connectionMap.get(nodeId, [])
    return all(sourceId in nodeOutputs for sourceId, _, _ in incoming)
|
|
|
|
|
|
def _normalizeResult(result: Any, nodeType: str) -> Any:
    """Normalize a node result to the node type's declared output schema.

    Normalization is best-effort: it is only attempted for dict results of
    node types that declare a schema other than ``"Transit"``. If
    ``_normalizeToSchema`` raises, the failure is logged and the original
    result is returned unchanged so the run can continue.
    """
    if not isinstance(result, dict):
        return result
    schema = _outputSchemaForNode(nodeType)
    if not schema or schema == "Transit":
        return result
    try:
        return _normalizeToSchema(result, schema)
    except Exception as normErr:
        # Deliberately non-fatal, but no longer silent: a schema mismatch is
        # useful to see when debugging downstream nodes.
        logger.warning(
            "Could not normalize result of node type %s to schema %s: %s",
            nodeType, schema, normErr,
        )
        return result
|
|
|
|
|
|
def _getNodeTypeIds(services: Any = None) -> Set[str]:
    """Return the set of node type ids declared in ``STATIC_NODE_TYPES``.

    ``services`` is accepted for call-site compatibility and is not used.
    """
    knownIds: Set[str] = set()
    for definition in STATIC_NODE_TYPES:
        knownIds.add(definition["id"])
    return knownIds
|
|
|
|
|
|
def _is_node_on_active_path(
    nodeId: str,
    connectionMap: Dict[str, List],
    nodeOutputs: Dict[str, Any],
) -> bool:
    """Return True if this node receives input only from active branches.

    Every incoming connection ``(source, source_output, _)`` is inspected:

    * a source with no recorded output (missing or ``None``) makes the node inactive;
    * a non-dict source output carries no routing info and is accepted;
    * routing metadata is read from ``out["_meta"]`` for transit envelopes
      (``out["_transit"]`` truthy) and from ``out`` itself otherwise (legacy format);
    * ``branch`` takes precedence over ``match``; a negative numeric ``match``
      means nothing matched, so the node is inactive;
    * when a routing value is present, the connection must leave the source
      through exactly that output.

    A node without incoming connections is active.
    """
    for src, source_output, _ in connectionMap.get(nodeId, []):
        out = nodeOutputs.get(src)
        if out is None:
            return False
        if not isinstance(out, dict):
            continue

        # Transit envelope: metadata lives in _meta. Tolerate a missing, None
        # or malformed _meta instead of raising AttributeError.
        meta = out.get("_meta") if out.get("_transit") else out
        if not isinstance(meta, dict):
            meta = {}
        branch = meta.get("branch")
        match = meta.get("match")

        if branch is not None:
            active_output = branch
        elif match is not None:
            # Only numeric values can signal "no match"; anything else is
            # compared as-is below rather than raising TypeError on "<".
            if isinstance(match, (int, float)) and match < 0:
                return False
            active_output = match
        else:
            # No routing information on this source: every output is active.
            continue

        if source_output != active_output:
            return False
    return True
|
|
|
|
|
|
def _getExecutor(
    nodeType: str,
    services: Any,
    automation2_interface: Optional[Any] = None,
) -> Any:
    """Pick the executor instance responsible for ``nodeType``.

    Dispatch is by type-id prefix, checked in this order: ``trigger.``,
    ``flow.``, ``data.``, the action prefixes (``ai.``, ``email.``,
    ``sharepoint.``, ``clickup.``, ``file.``, ``trustee.``) and finally
    ``input.``, which is only served when ``automation2_interface`` is truthy.
    Returns ``None`` when no executor applies.
    """
    actionPrefixes = ("ai.", "email.", "sharepoint.", "clickup.", "file.", "trustee.")

    if nodeType.startswith("trigger."):
        chosen = TriggerExecutor()
    elif nodeType.startswith("flow."):
        chosen = FlowExecutor()
    elif nodeType.startswith("data."):
        chosen = DataExecutor()
    elif nodeType.startswith(actionPrefixes):
        chosen = ActionNodeExecutor(services)
    elif automation2_interface and nodeType.startswith("input."):
        chosen = InputExecutor(automation2_interface)
    else:
        chosen = None
    return chosen
|
|
|
|
|
|
# stepId -> {"runId", "nodeId", "nodeType"}.
# Written by _createStepLog and popped by _updateStepLog, which uses it to
# address the completion event for the step.
_stepMeta: Dict[str, Dict[str, str]] = {}
|
|
|
|
|
|
def _serializableOutputs(nodeOutputs: Dict[str, Any]) -> Dict[str, Any]:
    """Shallow-copy ``nodeOutputs`` and drop the ``_context`` entry.

    ``_context`` refers back to the execution context (which itself holds
    ``nodeOutputs``), so it must not be persisted or returned to callers.
    The input mapping is left untouched.
    """
    snapshot = dict(nodeOutputs)
    snapshot.pop("_context", None)
    return snapshot
|
|
|
|
|
|
# Strong references to in-flight emit tasks; the event loop itself only keeps
# weak references, so an unreferenced task could be collected before it runs.
_pendingStepEventTasks: Set[Any] = set()


def _forgetStepEventTask(task: Any) -> None:
    """Done-callback: release a finished emit task and surface its error, if any."""
    _pendingStepEventTasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        logger.debug("Step event emit failed: %s", task.exception())


def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None:
    """Emit a step-log SSE event to any listening client for this run.

    Best-effort and non-blocking: the event is scheduled on the currently
    running event loop and nothing is awaited. Nothing happens when no queue
    named ``run-trace-<runId>`` exists or when the caller is not inside a
    running event loop. Failures are logged at debug level, never raised.
    """
    try:
        from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager

        em = get_event_manager()
        queueId = f"run-trace-{runId}"
        if not em.has_queue(queueId):
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Called outside of a running loop: there is nowhere to schedule the emit.
            return
        task = asyncio.ensure_future(
            em.emit_event(queueId, "step", stepData, event_category="tracing"),
            loop=loop,
        )
        _pendingStepEventTasks.add(task)
        task.add_done_callback(_forgetStepEventTask)
    except Exception as emitErr:
        logger.debug("Could not emit step event for run %s: %s", runId, emitErr)
|
|
|
|
|
|
def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str = "running", inputSnapshot: Dict = None) -> Optional[str]:
    """Persist a new AutoStepLog record for a node and announce it to listeners.

    Returns the generated step id. Returns ``None`` when ``iface`` or ``runId``
    is falsy, or when creating the record fails (the failure is logged at
    debug level). On success the step's run/node identity is remembered in
    ``_stepMeta`` and a step event is emitted for the run.
    """
    if not (iface and runId):
        return None
    try:
        from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog

        newStepId = str(uuid.uuid4())
        startTs = time.time()
        identity = {"runId": runId, "nodeId": nodeId, "nodeType": nodeType}
        record = {
            "id": newStepId,
            **identity,
            "status": status,
            "inputSnapshot": inputSnapshot or {},
            "startedAt": startTs,
        }
        iface.db.recordCreate(AutoStepLog, record)
        _stepMeta[newStepId] = identity
        _emitStepEvent(runId, {"id": newStepId, **identity, "status": status, "startedAt": startTs})
        return newStepId
    except Exception as createErr:
        logger.debug("Could not create AutoStepLog: %s", createErr)
        return None
|
|
|
|
|
|
def _updateStepLog(iface, stepId: str, status: str, output: Optional[Dict] = None, error: Optional[str] = None,
                   durationMs: Optional[int] = None, tokensUsed: int = 0, retryCount: int = 0) -> None:
    """Update an AutoStepLog entry with results and emit the matching step event.

    Does nothing when ``iface`` or ``stepId`` is falsy. ``status`` and
    ``completedAt`` are always written; ``output``, ``error`` and
    ``durationMs`` only when not ``None``; ``tokensUsed`` and ``retryCount``
    only when non-zero. Failures are logged at debug level, never raised.
    """
    if not iface or not stepId:
        return
    # Release the cached step identity first: if the DB write below fails the
    # entry would otherwise stay in _stepMeta for the lifetime of the process.
    meta = _stepMeta.pop(stepId, None)
    try:
        from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog

        completedAt = time.time()
        updates: Dict[str, Any] = {
            "status": status,
            "completedAt": completedAt,
        }
        if output is not None:
            updates["output"] = output
        if error is not None:
            updates["error"] = error
        if durationMs is not None:
            updates["durationMs"] = durationMs
        if tokensUsed:
            updates["tokensUsed"] = tokensUsed
        if retryCount:
            updates["retryCount"] = retryCount
        iface.db.recordModify(AutoStepLog, stepId, updates)
        # The event is only sent once the record was actually modified, and
        # only for steps created via _createStepLog (which registered meta).
        if meta:
            _emitStepEvent(meta["runId"], {
                "id": stepId, "runId": meta["runId"], "nodeId": meta["nodeId"],
                "nodeType": meta["nodeType"], "status": status,
                "completedAt": completedAt, "durationMs": durationMs,
                "error": error, "tokensUsed": tokensUsed, "retryCount": retryCount,
            })
    except Exception as e:
        logger.debug("Could not update AutoStepLog %s: %s", stepId, e)
|
|
|
|
|
|
async def _executeWithRetry(executor, node, context, maxRetries: int = 0, retryDelaySeconds: float = 1.0):
    """Execute a node with optional retry policy from node parameters.

    The node parameters ``retryMaxAttempts`` and ``retryDelaySeconds``
    override the ``maxRetries`` / ``retryDelaySeconds`` arguments. Values that
    are missing, ``None`` or not numeric fall back to the arguments, and
    negative values are clamped to 0, so the node is always executed at least
    once. The delay doubles after each failed attempt, capped at 60 seconds.

    Returns:
        ``(result, retryCount)`` where ``retryCount`` is the number of failed
        attempts that preceded the successful one.

    Raises:
        PauseForHumanTaskError, PauseForEmailWaitError: passed through
            immediately; pauses are never retried.
        Exception: the error of the last attempt once all retries are used up.
    """
    params = node.get("parameters") or {}
    try:
        retries = max(0, int(params.get("retryMaxAttempts", maxRetries)))
    except (TypeError, ValueError, OverflowError):
        retries = max(0, int(maxRetries or 0))
    try:
        delay = max(0.0, float(params.get("retryDelaySeconds", retryDelaySeconds)))
    except (TypeError, ValueError):
        delay = max(0.0, float(retryDelaySeconds or 0))

    attempt = 0
    while True:
        try:
            result = await executor.execute(node, context)
            return result, attempt
        except (PauseForHumanTaskError, PauseForEmailWaitError):
            raise
        except Exception as e:
            attempt += 1
            if attempt > retries:
                # Out of retries: surface the original error with its traceback.
                raise
            logger.warning(
                "Node %s failed (attempt %d/%d), retrying in %.1fs: %s",
                node.get("id"), attempt, retries + 1, delay, e,
            )
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60)
|
|
|
|
|
|
def _snapshotInputs(
    nodeId: str,
    connectionMap: Dict[str, List],
    nodeOutputs: Dict[str, Any],
    base: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Collect the already-recorded predecessor outputs of ``nodeId`` for step logging."""
    snapshot: Dict[str, Any] = dict(base or {})
    for src, _, _ in connectionMap.get(nodeId, []):
        if src in nodeOutputs:
            snapshot[src] = nodeOutputs[src]
    return snapshot


def _accumulateAggregate(accumulators: Dict[str, list], nodeId: str, result: Any) -> None:
    """Append one loop iteration's data.aggregate result to the node's accumulator."""
    accItems = result.get("items", [result]) if isinstance(result, dict) else [result]
    accumulators.setdefault(nodeId, []).extend(accItems)


def _flushAggregates(accumulators: Dict[str, list], nodeOutputs: Dict[str, Any]) -> None:
    """Publish the accumulated data.aggregate items as node outputs and reset the accumulators."""
    for aggId, accItems in accumulators.items():
        nodeOutputs[aggId] = {"items": accItems, "count": len(accItems), "_success": True}
    accumulators.clear()


async def executeGraph(
    graph: Dict[str, Any],
    services: Any,
    workflowId: str = None,
    instanceId: str = None,
    userId: str = None,
    mandateId: str = None,
    automation2_interface: Optional[Any] = None,
    initialNodeOutputs: Optional[Dict[str, Any]] = None,
    startAfterNodeId: Optional[str] = None,
    runId: Optional[str] = None,
    run_envelope: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Execute automation2 graph. Returns { success, nodeOutputs, error?, stopped? }.

    When an input node is reached and automation2_interface is provided, creates a task,
    pauses the run, and returns { success: False, paused: True, taskId, runId }.
    For resume: pass initialNodeOutputs (with result for the human node) and startAfterNodeId.
    For fresh runs: pass run_envelope (unified start payload for the start node); normalized with userId into context.runEnvelope.

    Processing order:
      1. Validate the graph, parse it and sort the nodes topologically.
      2. On resume, normalize the resumed node's output to its declared schema.
      3. On a fresh run with an interface and workflowId, create the run record.
      4. If the stored run context carries a ``_loopState``, run the remaining
         loop iterations first.
      5. Walk the ordered nodes: skip nodes on inactive branches, run
         ``flow.loop`` bodies per item, run all other nodes with the retry policy.

    Failures of a node end the run and are returned as
    { success: False, error, failedNode, runId, nodeOutputs }.
    ``initialNodeOutputs`` is never modified.
    """
    logger.info(
        "executeGraph start: instanceId=%s workflowId=%s userId=%s mandateId=%s resume=%s",
        instanceId,
        workflowId,
        userId,
        mandateId,
        startAfterNodeId is not None,
    )
    from modules.workflows.processing.shared.methodDiscovery import discoverMethods
    discoverMethods(services)
    nodeTypeIds = _getNodeTypeIds(services)
    logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds))
    errors = validateGraph(graph, nodeTypeIds)
    if errors:
        logger.warning("executeGraph validation failed: %s", errors)
        return {"success": False, "error": "; ".join(errors), "nodeOutputs": {}}

    nodes, connections = parseGraph(graph)[:2]
    connectionMap = buildConnectionMap(connections)
    inputSources = {n["id"]: getInputSources(n["id"], connectionMap) for n in nodes if n.get("id")}
    logger.info(
        "executeGraph parsed: nodes=%d connections=%d connectionMap_targets=%s",
        len(nodes),
        len(connections),
        list(connectionMap.keys()),
    )

    ordered = topoSort(nodes, connectionMap)
    ordered_ids = [n.get("id") for n in ordered if n.get("id")]
    logger.info("executeGraph topoSort order: %s", ordered_ids)

    nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {})
    is_resume = startAfterNodeId is not None

    if is_resume and initialNodeOutputs and startAfterNodeId:
        resumedNode = next((n for n in nodes if n.get("id") == startAfterNodeId), None)
        if resumedNode:
            resumedType = resumedNode.get("type", "")
            resumedOutput = initialNodeOutputs.get(startAfterNodeId)
            if isinstance(resumedOutput, dict):
                schema = _outputSchemaForNode(resumedType)
                if schema and schema != "Transit":
                    try:
                        # Store the normalized value in the working copy; writing it to
                        # initialNodeOutputs would neither reach downstream nodes nor
                        # leave the caller's dict untouched.
                        nodeOutputs[startAfterNodeId] = _normalizeToSchema(resumedOutput, schema)
                    except Exception as valErr:
                        logger.warning("executeGraph resume: schema validation failed for %s: %s", startAfterNodeId, valErr)

    if not runId and automation2_interface and workflowId and not is_resume:
        run_context = {
            "connectionMap": connectionMap,
            "inputSources": inputSources,
            "orderedNodeIds": ordered_ids,
        }
        if userId:
            run_context["ownerId"] = userId
        if mandateId:
            run_context["mandateId"] = mandateId
        if instanceId:
            run_context["instanceId"] = instanceId
        run = automation2_interface.createRun(
            workflowId=workflowId,
            nodeOutputs=nodeOutputs,
            context=run_context,
        )
        runId = run.get("id") if run else None
        logger.info("executeGraph created run %s", runId)

    env_for_run = normalize_run_envelope(run_envelope, user_id=userId)

    context = {
        "workflowId": workflowId,
        "instanceId": instanceId,
        "userId": userId,
        "mandateId": mandateId,
        "nodeOutputs": nodeOutputs,
        "connectionMap": connectionMap,
        "inputSources": inputSources,
        "services": services,
        "_runId": runId,
        "_orderedNodes": ordered,
        "runEnvelope": env_for_run,
    }
    # _context key in nodeOutputs for system variable resolution
    nodeOutputs["_context"] = context

    skip_until_passed = bool(startAfterNodeId)
    processed_in_loop: Set[str] = set()
    _aggregateAccumulators: Dict[str, list] = {}

    # Check for loop resume: run was paused inside a loop, we're resuming for next iteration
    run = automation2_interface.getRun(runId) if (runId and automation2_interface) else None
    loop_resume_state = (run.get("context") or {}).get("_loopState") if run else None
    if loop_resume_state and startAfterNodeId:
        loop_node_id = loop_resume_state.get("loopNodeId")
        next_index = loop_resume_state.get("currentIndex", -1) + 1
        items = loop_resume_state.get("items") or []
        body_ids = getLoopBodyNodeIds(loop_node_id, connectionMap) if loop_node_id else set()
        body_ordered = [n for n in ordered if n.get("id") in body_ids]
        # The loop node and its body are handled here; the main walk below skips them.
        processed_in_loop = (set(body_ids) | {loop_node_id}) if loop_node_id else set()
        # NOTE(review): iteration restarts at currentIndex + 1, so body nodes that
        # follow the paused node within the paused iteration are not executed here
        # - confirm this is intended.
        while next_index < len(items) and loop_node_id:
            nodeOutputs[loop_node_id] = {
                "items": items,
                "count": len(items),
                "currentItem": items[next_index],
                "currentIndex": next_index,
            }
            context["_loopState"] = {"loopNodeId": loop_node_id, "currentIndex": next_index, "items": items}
            for body_node in body_ordered:
                bnid = body_node.get("id")
                if not bnid or context.get("_stopped"):
                    break
                if not _is_node_on_active_path(bnid, connectionMap, nodeOutputs):
                    continue
                executor = _getExecutor(body_node.get("type", ""), services, automation2_interface)
                if not executor:
                    nodeOutputs[bnid] = None
                    continue
                _rStepStart = time.time()
                _rInputSnap = _snapshotInputs(
                    bnid, connectionMap, nodeOutputs,
                    {"_loopItem": items[next_index], "_loopIndex": next_index},
                )
                _rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _rInputSnap)
                try:
                    result, _rRetry = await _executeWithRetry(executor, body_node, context)
                    # data.aggregate: accumulate across iterations instead of overwriting
                    if body_node.get("type") == "data.aggregate":
                        _accumulateAggregate(_aggregateAccumulators, bnid, result)
                    nodeOutputs[bnid] = result
                    _rDur = int((time.time() - _rStepStart) * 1000)
                    _updateStepLog(automation2_interface, _rStepId, "completed",
                                   output=result if isinstance(result, dict) else {"value": result},
                                   durationMs=_rDur, retryCount=_rRetry)
                    logger.info("executeGraph loop resume body node %s done (iter %d, retries=%d)", bnid, next_index, _rRetry)
                except PauseForHumanTaskError as e:
                    _updateStepLog(automation2_interface, _rStepId, "completed",
                                   durationMs=int((time.time() - _rStepStart) * 1000))
                    if automation2_interface:
                        # Persist where the loop stands so the next resume continues after this item.
                        run_ctx = dict(run.get("context") or {})
                        run_ctx["_loopState"] = {"loopNodeId": loop_node_id, "currentIndex": next_index, "items": items}
                        automation2_interface.updateRun(e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx)
                    return {"success": False, "paused": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs)}
                except PauseForEmailWaitError:
                    _updateStepLog(automation2_interface, _rStepId, "completed",
                                   durationMs=int((time.time() - _rStepStart) * 1000))
                    # Not inside the main walk's try block: this propagates to the caller.
                    raise
                except Exception as ex:
                    _updateStepLog(automation2_interface, _rStepId, "failed",
                                   error=str(ex), durationMs=int((time.time() - _rStepStart) * 1000))
                    logger.exception("executeGraph loop body node %s FAILED: %s", bnid, ex)
                    nodeOutputs[bnid] = {"error": str(ex), "success": False}
                    if runId and automation2_interface:
                        automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs))
                    return {"success": False, "error": str(ex), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": bnid, "runId": runId}
            next_index += 1
        if loop_node_id:
            # Loop finished: drop the per-iteration fields from the loop node output.
            nodeOutputs[loop_node_id] = {"items": items, "count": len(items)}
        _flushAggregates(_aggregateAccumulators, nodeOutputs)

    for i, node in enumerate(ordered):
        if skip_until_passed:
            # Resume: skip everything up to and including the node we resume after.
            if node.get("id") == startAfterNodeId:
                skip_until_passed = False
            continue
        if node.get("id") in processed_in_loop:
            continue
        if context.get("_stopped"):
            logger.info("executeGraph stopped early at step %d", i)
            break
        nodeId = node.get("id")
        nodeType = node.get("type", "")
        if not _is_node_on_active_path(nodeId, connectionMap, nodeOutputs):
            logger.info("executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)", i + 1, len(ordered), nodeId)
            _skipInputSnap = _snapshotInputs(nodeId, connectionMap, nodeOutputs, {"_skipReason": "inactive_branch"})
            _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped", inputSnapshot=_skipInputSnap)
            if _skipStepId:
                _updateStepLog(automation2_interface, _skipStepId, "skipped")
            continue
        executor = _getExecutor(nodeType, services, automation2_interface)
        logger.info(
            "executeGraph step %d/%d: nodeId=%s nodeType=%s executor=%s",
            i + 1,
            len(ordered),
            nodeId,
            nodeType,
            type(executor).__name__ if executor else "None",
        )
        if not executor:
            nodeOutputs[nodeId] = None
            logger.debug("executeGraph node %s: no executor, output=None", nodeId)
            continue
        _stepStartMs = time.time()
        _stepId = None
        try:
            if nodeType == "flow.loop":
                _loopInputSnap = _snapshotInputs(nodeId, connectionMap, nodeOutputs)
                _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _loopInputSnap)
                result = await executor.execute(node, context)
                items = result.get("items") or []
                body_ids = getLoopBodyNodeIds(nodeId, connectionMap)
                body_ordered = [n for n in ordered if n.get("id") in body_ids]
                # Body nodes run here once per item; the main walk must not run them again.
                processed_in_loop.update(body_ids)
                processed_in_loop.add(nodeId)
                for idx, item in enumerate(items):
                    nodeOutputs[nodeId] = {"items": items, "count": len(items), "currentItem": item, "currentIndex": idx}
                    context["_loopState"] = {"loopNodeId": nodeId, "currentIndex": idx, "items": items}
                    for body_node in body_ordered:
                        bnid = body_node.get("id")
                        if not bnid or context.get("_stopped"):
                            break
                        if not _is_node_on_active_path(bnid, connectionMap, nodeOutputs):
                            continue
                        bexec = _getExecutor(body_node.get("type", ""), services, automation2_interface)
                        if not bexec:
                            nodeOutputs[bnid] = None
                            continue
                        _bStepStart = time.time()
                        _bInputSnap = _snapshotInputs(bnid, connectionMap, nodeOutputs, {"_loopItem": item, "_loopIndex": idx})
                        _bStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _bInputSnap)
                        try:
                            bres, _bRetry = await _executeWithRetry(bexec, body_node, context)
                            # data.aggregate: accumulate instead of overwrite
                            if body_node.get("type") == "data.aggregate":
                                _accumulateAggregate(_aggregateAccumulators, bnid, bres)
                            nodeOutputs[bnid] = bres
                            _bDur = int((time.time() - _bStepStart) * 1000)
                            _updateStepLog(automation2_interface, _bStepId, "completed",
                                           output=bres if isinstance(bres, dict) else {"value": bres},
                                           durationMs=_bDur, retryCount=_bRetry)
                            logger.info("executeGraph loop body node %s done (iter %d, retries=%d)", bnid, idx, _bRetry)
                        except PauseForHumanTaskError as e:
                            _updateStepLog(automation2_interface, _bStepId, "completed",
                                           durationMs=int((time.time() - _bStepStart) * 1000))
                            if runId and automation2_interface:
                                # Persist the loop position so a resume can continue with the next item.
                                run = automation2_interface.getRun(runId) or {}
                                run_ctx = dict(run.get("context") or {})
                                run_ctx["_loopState"] = {"loopNodeId": nodeId, "currentIndex": idx, "items": items}
                                automation2_interface.updateRun(e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx)
                            return {"success": False, "paused": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs)}
                        except PauseForEmailWaitError:
                            _updateStepLog(automation2_interface, _bStepId, "completed",
                                           durationMs=int((time.time() - _bStepStart) * 1000))
                            # Handled by the outer PauseForEmailWaitError handler below.
                            raise
                        except Exception as ex:
                            _updateStepLog(automation2_interface, _bStepId, "failed",
                                           error=str(ex), durationMs=int((time.time() - _bStepStart) * 1000))
                            # Close the enclosing loop step too, otherwise it stays "running".
                            _updateStepLog(automation2_interface, _stepId, "failed",
                                           error=str(ex), durationMs=int((time.time() - _stepStartMs) * 1000))
                            logger.exception("executeGraph loop body node %s FAILED: %s", bnid, ex)
                            nodeOutputs[bnid] = {"error": str(ex), "success": False}
                            if runId and automation2_interface:
                                automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs))
                            return {"success": False, "error": str(ex), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": bnid, "runId": runId}
                nodeOutputs[nodeId] = {"items": items, "count": len(items)}
                # Finalize aggregate accumulators after loop
                _flushAggregates(_aggregateAccumulators, nodeOutputs)
                _updateStepLog(automation2_interface, _stepId, "completed",
                               output={"iterationCount": len(items), "items": len(items)},
                               durationMs=int((time.time() - _stepStartMs) * 1000))
                logger.info("executeGraph flow.loop done: %d iterations", len(items))
            else:
                if _isMergeNode(nodeType) and not _allMergePredecessorsReady(nodeId, connectionMap, nodeOutputs):
                    # NOTE(review): the node is not revisited later in this walk, so
                    # "deferring" effectively records a None output - confirm intended.
                    logger.info("executeGraph node %s (flow.merge): waiting — not all predecessors ready, deferring", nodeId)
                    nodeOutputs[nodeId] = None
                    continue
                # Merge nodes and regular nodes share one execution path.
                _stepStartMs = time.time()
                _inputSnap = _snapshotInputs(nodeId, connectionMap, nodeOutputs)
                _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _inputSnap)
                result, retryCount = await _executeWithRetry(executor, node, context)
                result = _normalizeResult(result, nodeType)
                nodeOutputs[nodeId] = result
                _durMs = int((time.time() - _stepStartMs) * 1000)
                _tokens = result.get("tokensUsed", 0) if isinstance(result, dict) else 0
                _updateStepLog(automation2_interface, _stepId, "completed",
                               output=result if isinstance(result, dict) else {"value": result},
                               durationMs=_durMs, tokensUsed=_tokens, retryCount=retryCount)
                logger.info(
                    "executeGraph node %s done: result_type=%s result_keys=%s retries=%d duration=%dms",
                    nodeId,
                    type(result).__name__,
                    list(result.keys()) if isinstance(result, dict) else "n/a",
                    retryCount,
                    _durMs,
                )
        except PauseForHumanTaskError as e:
            _updateStepLog(automation2_interface, _stepId, "completed",
                           durationMs=int((time.time() - _stepStartMs) * 1000))
            logger.info("executeGraph paused for human task %s", e.taskId)
            return {
                "success": False,
                "paused": True,
                "taskId": e.taskId,
                "runId": e.runId,
                "nodeId": e.nodeId,
                "nodeOutputs": _serializableOutputs(nodeOutputs),
            }
        except PauseForEmailWaitError as e:
            _updateStepLog(automation2_interface, _stepId, "completed",
                           durationMs=int((time.time() - _stepStartMs) * 1000))
            logger.info("executeGraph paused for email wait (run %s, node %s)", e.runId, e.nodeId)
            try:
                from modules.interfaces.interfaceDbApp import getRootInterface
                from modules.features.graphicalEditor.emailPoller import ensureRunning
                root = getRootInterface()
                event_user = root.getUserByUsername("event") if root else None
                if event_user:
                    ensureRunning(event_user)
            except Exception as poll_err:
                logger.warning("Could not start email poller: %s", poll_err)
            paused_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            run_ctx = {
                "connectionMap": context.get("connectionMap"),
                "inputSources": context.get("inputSources"),
                "orderedNodeIds": [n.get("id") for n in context.get("_orderedNodes", []) if n.get("id")],
                "waitReason": "email",
                "waitConfig": e.waitConfig,
                "pausedAt": paused_at,
                "lastCheckedAt": None,
                "ownerId": context.get("userId"),
                "mandateId": context.get("mandateId"),
                "instanceId": context.get("instanceId"),
            }
            if automation2_interface:
                automation2_interface.updateRun(
                    e.runId,
                    status="paused",
                    nodeOutputs=_serializableOutputs(nodeOutputs),
                    currentNodeId=e.nodeId,
                    context=run_ctx,
                )
            return {
                "success": False,
                "paused": True,
                "waitReason": "email",
                "runId": e.runId,
                "nodeId": e.nodeId,
                "nodeOutputs": _serializableOutputs(nodeOutputs),
            }
        except Exception as e:
            logger.exception("executeGraph node %s (%s) FAILED: %s", nodeId, nodeType, e)
            nodeOutputs[nodeId] = {"error": str(e), "success": False}
            _durMs = int((time.time() - _stepStartMs) * 1000)
            _updateStepLog(automation2_interface, _stepId, "failed", error=str(e), durationMs=_durMs)
            if runId and automation2_interface:
                automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs))
            if runId:
                _emitStepEvent(runId, {"type": "run_failed", "runId": runId, "status": "failed", "error": str(e), "failedNode": nodeId})
            try:
                _wfObj = automation2_interface.getWorkflow(workflowId) if automation2_interface and workflowId else None
                if isinstance(_wfObj, dict):
                    _wfDict = _wfObj
                elif _wfObj and hasattr(_wfObj, "model_dump"):
                    _wfDict = _wfObj.model_dump()
                else:
                    _wfDict = {}
                # Notify unless the workflow explicitly opted out.
                _shouldNotify = _wfDict.get("notifyOnFailure", True) if _wfDict else True
                if _shouldNotify:
                    from modules.workflows.scheduler.mainScheduler import _notifyRunFailed
                    _notifyRunFailed(
                        workflowId or "", runId or "", str(e),
                        mandateId=mandateId,
                        workflowLabel=_wfDict.get("label"),
                    )
            except Exception as notifyErr:
                # Notification is best-effort and must never mask the node failure.
                logger.debug("Could not send failure notification for run %s: %s", runId, notifyErr)
            return {
                "success": False,
                "error": str(e),
                "nodeOutputs": _serializableOutputs(nodeOutputs),
                "failedNode": nodeId,
                "runId": runId,
            }

    _safeOutputs = _serializableOutputs(nodeOutputs)

    if runId and automation2_interface:
        automation2_interface.updateRun(runId, status="completed", nodeOutputs=_safeOutputs)
    if runId:
        _emitStepEvent(runId, {"type": "run_complete", "runId": runId, "status": "completed"})
    logger.info(
        "executeGraph complete: success=True nodeOutputs_keys=%s stopped=%s",
        list(nodeOutputs.keys()),
        context.get("_stopped", False),
    )
    return {
        "success": True,
        "nodeOutputs": _safeOutputs,
        "stopped": context.get("_stopped", False),
        "runId": runId,
    }
|