# Copyright (c) 2026 PowerOn AG # All rights reserved. # Main execution engine for automation2 graphs. import asyncio import json import logging import time import uuid from datetime import datetime, timezone from typing import Dict, Any, List, Set, Optional from modules.workflowAutomation.engine.graphUtils import ( parseGraph, buildConnectionMap, validateGraph, topoSort, getInputSources, getLoopBodyNodeIds, getLoopDoneNodeIds, getLoopPrimaryInputSource, ) from modules.workflowAutomation.engine.executors import ( TriggerExecutor, FlowExecutor, ActionNodeExecutor, InputExecutor, DataExecutor, PauseForHumanTaskError, PauseForEmailWaitError, ) from modules.nodeCatalog.portTypes import normalizeToSchema, wrapTransit, unwrapTransit from modules.nodeCatalog.nodeDefinitions import STATIC_NODE_TYPES from modules.datamodels.serviceExceptions import SubscriptionInactiveException as _SubscriptionInactiveException, BillingContextError as _BillingContextError from modules.workflowAutomation.engine.runFileLogger import ( RunFileLogger, workflowAutomationRunFileLoggingEnabled, mergeRunContextWithWaLogPrefix, ) from modules.workflowAutomation.engine.runEnvelope import normalize_run_envelope logger = logging.getLogger(__name__) _NODE_DEF_BY_ID: Dict[str, dict] = {} # Registry of currently executing runs: runId -> context dict. # Used by requestRunStop() to set context["_stopped"] = True. _activeRunContexts: Dict[str, Dict[str, Any]] = {} def requestRunStop(runId: str) -> bool: """Request a running workflow to stop at the next node boundary. Returns True if the run was found and flagged, False otherwise. """ ctx = _activeRunContexts.get(runId) if ctx is not None: ctx["_stopped"] = True logger.info("requestRunStop: flagged runId=%s for stop", runId) return True logger.warning("requestRunStop: runId=%s not found in active runs", runId) return False def getActiveRunIds() -> list: """Return list of currently executing run IDs.""" return list(_activeRunContexts.keys()) def _getNodeDef(nodeType: str) -> Optional[dict]: """Lookup static node definition by type id (cached).""" if not _NODE_DEF_BY_ID: for nd in STATIC_NODE_TYPES: _NODE_DEF_BY_ID[nd["id"]] = nd return _NODE_DEF_BY_ID.get(nodeType) def _outputSchemaForNode(nodeType: str) -> Optional[str]: """Return the output port schema name for a node type (port 0), or None.""" nd = _getNodeDef(nodeType) if not nd: return None ports = nd.get("outputPorts") if isinstance(ports, dict): p0 = ports.get(0) or ports.get("0") if isinstance(p0, dict): spec = p0.get("schema") if isinstance(spec, dict) and spec.get("kind") == "fromGraph": # Read override from the port definition — ``FormPayload`` is the # fallback for true form nodes; dynamic context nodes (e.g. # context.transformContext) declare ``fromGraphResultSchema`` to # avoid wrong normalization. return p0.get("fromGraphResultSchema") or "FormPayload" if isinstance(spec, str): return spec return None def _isBarrierNode(nodeType: str) -> bool: """Barrier nodes wait for all connected predecessors before executing. Backwards compatible: ``flow.merge`` is always a barrier. Any other node may declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry. Note: ``context.mergeContext`` is NOT a barrier — it receives its list of inputs via the ``dataSource`` DataRef parameter (typically ``loop.bodyResults``) and executes once its single upstream edge is satisfied. """ if nodeType == "flow.merge": return True for nd in STATIC_NODE_TYPES: if nd.get("id") == nodeType: return bool(nd.get("waitsForAllPredecessors")) return False def _allMergePredecessorsReady( nodeId: str, connectionMap: Dict[str, List], nodeOutputs: Dict[str, Any], ) -> bool: """For barrier nodes: check that every connected predecessor has produced output or was skipped.""" for src, _, _ in connectionMap.get(nodeId, []): if src not in nodeOutputs: return False return True def _normalizeResult(result: Any, nodeType: str) -> Any: """Apply normalizeToSchema if the node has a declared output schema.""" schema = _outputSchemaForNode(nodeType) if schema and schema != "Transit" and isinstance(result, dict): try: return normalizeToSchema(result, schema) except Exception as e: logger.warning(f"_normalizeResult failed for nodeType={nodeType}, schema={schema}: {e}") return result def _getNodeTypeIds(services: Any = None) -> Set[str]: """Collect all known node type IDs from static definitions.""" return {n["id"] for n in STATIC_NODE_TYPES} def _is_node_on_active_path( nodeId: str, connectionMap: Dict[str, List], nodeOutputs: Dict[str, Any], ) -> bool: """ Return True if this node receives input only from active branches. Transit envelopes: routing metadata is in out["_meta"] (branch/match). Legacy format: branch/match directly on out. """ for src, source_output, _ in connectionMap.get(nodeId, []): out = nodeOutputs.get(src) if out is None: return False if not isinstance(out, dict): continue # Transit envelope: metadata in _meta meta = out.get("_meta", {}) if out.get("_transit") else out branch = meta.get("branch") match = meta.get("match") matches = meta.get("matches") active_output = None if branch is not None: active_output = branch elif isinstance(matches, list) and matches: if source_output not in matches: return False continue elif match is not None: if match < 0: return False active_output = match if active_output is not None and source_output != active_output: return False return True def _getExecutor( nodeType: str, services: Any, automation2_interface: Optional[Any] = None, ) -> Any: """Dispatch to correct executor based on node type.""" if nodeType.startswith("trigger."): return TriggerExecutor() if nodeType.startswith("flow."): return FlowExecutor() if nodeType.startswith("data."): return DataExecutor() if (nodeType.startswith("ai.") or nodeType.startswith("email.") or nodeType.startswith("sharepoint.") or nodeType.startswith("clickup.") or nodeType.startswith("file.") or nodeType.startswith("trustee.") or nodeType.startswith("context.")): return ActionNodeExecutor(services) if nodeType.startswith("input.") and automation2_interface: return InputExecutor(automation2_interface) return None _stepMeta: Dict[str, Dict[str, str]] = {} def _stripBinaryValues(obj: Any, depth: int = 0) -> Any: """Recursively replace bytes values with None to keep data JSON-safe for DB storage.""" if depth > 12: return obj if isinstance(obj, bytes): return None if isinstance(obj, dict): return {k: _stripBinaryValues(v, depth + 1) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [_stripBinaryValues(v, depth + 1) for v in obj] return obj def _serializableOutputs(nodeOutputs: Dict[str, Any]) -> Dict[str, Any]: """Return a JSON-safe copy of nodeOutputs: strip _context and binary data.""" cleaned = {k: v for k, v in nodeOutputs.items() if k != "_context"} return _stripBinaryValues(cleaned) def _merge_node_parameters_into_snap( snap: Optional[Dict[str, Any]], *, node_id: Optional[str], context: Optional[Dict[str, Any]], ) -> Dict[str, Any]: """Copy wire snapshot and attach **nodeParameters** from the graph definition (by ``node_id``). Uses ``context['graphNodesById']`` populated at executeGraph start — stable even when per-step node dict references differ. Field name is ``nodeParameters`` (no leading underscore) so it survives consumers that hide ``_*`` keys.""" merged: Dict[str, Any] = dict(snap or {}) if not node_id or not isinstance(context, dict): return merged cmap = context.get("graphNodesById") if not isinstance(cmap, dict): return merged gnode = cmap.get(node_id) if not isinstance(gnode, dict): return merged merged["nodeParameters"] = dict(gnode.get("parameters") or {}) return merged def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None: """Emit a step-log SSE event to any listening client for this run.""" try: from modules.shared.eventManager import get_event_manager em = get_event_manager() queueId = f"run-trace-{runId}" if not em.has_queue(queueId): return loop = asyncio.get_event_loop() if loop.is_running(): asyncio.ensure_future(em.emit_event(queueId, "step", stepData, event_category="tracing")) except Exception as e: logger.warning(f"_emitStepEvent failed for runId={runId}: {e}") def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str = "running", inputSnapshot: Dict = None) -> Optional[str]: """Create an AutoStepLog entry. Returns the step log ID or None if interface unavailable.""" if not iface or not runId: return None try: from modules.datamodels.datamodelWorkflowAutomation import AutoStepLog stepId = str(uuid.uuid4()) startedAt = time.time() iface.db.recordCreate(AutoStepLog, { "id": stepId, "runId": runId, "nodeId": nodeId, "nodeType": nodeType, "status": status, "inputSnapshot": _stripBinaryValues(inputSnapshot) if inputSnapshot else {}, "startedAt": startedAt, }) _stepMeta[stepId] = {"runId": runId, "nodeId": nodeId, "nodeType": nodeType} _emitStepEvent(runId, { "id": stepId, "runId": runId, "nodeId": nodeId, "nodeType": nodeType, "status": status, "startedAt": startedAt, }) return stepId except Exception as e: logger.debug("Could not create AutoStepLog: %s", e) return None def _updateStepLog(iface, stepId: str, status: str, output: Dict = None, error: str = None, durationMs: int = None, tokensUsed: int = 0, retryCount: int = 0) -> None: """Update an AutoStepLog entry with results.""" if not iface or not stepId: return try: from modules.datamodels.datamodelWorkflowAutomation import AutoStepLog completedAt = time.time() updates: Dict[str, Any] = { "status": status, "completedAt": completedAt, } if output is not None: updates["output"] = _stripBinaryValues(output) if error is not None: updates["error"] = error if durationMs is not None: updates["durationMs"] = durationMs if tokensUsed: updates["tokensUsed"] = tokensUsed if retryCount: updates["retryCount"] = retryCount iface.db.recordModify(AutoStepLog, stepId, updates) meta = _stepMeta.pop(stepId, None) if meta: _emitStepEvent(meta["runId"], { "id": stepId, "runId": meta["runId"], "nodeId": meta["nodeId"], "nodeType": meta["nodeType"], "status": status, "completedAt": completedAt, "durationMs": durationMs, "error": error, "tokensUsed": tokensUsed, "retryCount": retryCount, }) except Exception as e: logger.debug("Could not update AutoStepLog %s: %s", stepId, e) def _ge_iso_timestamp() -> str: """UTC timestamp for NDJSON logs (readable, milliseconds).""" return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" async def _ge_log_node_finished( file_logger: Optional[RunFileLogger], *, run_id: Optional[str], node_outputs: Dict[str, Any], run_envelope: Optional[Dict[str, Any]], node_id: str, node_type: str, status: str, input_snap: Optional[Dict[str, Any]], output: Any = None, error: Optional[str] = None, duration_ms: Optional[int] = None, retry_count: Optional[int] = None, skip_reason: Optional[str] = None, loop_index: Optional[int] = None, loop_node_id: Optional[str] = None, loop_item: Optional[Any] = None, exec_context: Optional[Dict[str, Any]] = None, ) -> None: """Append one execution line + one workflow-context snapshot (NDJSON).""" if file_logger is None or not run_id: return ts = _ge_iso_timestamp() snap = _merge_node_parameters_into_snap(input_snap, node_id=node_id, context=exec_context) exec_rec: Dict[str, Any] = { "timestamp": ts, "runId": run_id, "nodeId": node_id, "nodeType": node_type, "status": status, "input": _stripBinaryValues(snap), } if skip_reason: exec_rec["skipReason"] = skip_reason if duration_ms is not None: exec_rec["durationMs"] = duration_ms if retry_count is not None: exec_rec["retryCount"] = retry_count if loop_index is not None: exec_rec["loopIndex"] = loop_index if loop_node_id is not None: exec_rec["loopNodeId"] = loop_node_id if loop_item is not None: exec_rec["loopItem"] = _stripBinaryValues(loop_item) if error is not None: exec_rec["error"] = error if output is not None: exec_rec["output"] = ( _stripBinaryValues(output) if isinstance(output, dict) else {"value": _stripBinaryValues(output)} ) await file_logger.appendNodeExecutionLine(exec_rec) ctx_rec: Dict[str, Any] = { "timestamp": ts, "runId": run_id, "afterNodeId": node_id, "afterNodeType": node_type, "afterStatus": status, "nodeOutputsSnapshot": _serializableOutputs(node_outputs), "runEnvelope": _stripBinaryValues(dict(run_envelope or {})), } if loop_index is not None: ctx_rec["loopIndex"] = loop_index if loop_node_id is not None: ctx_rec["loopNodeId"] = loop_node_id await file_logger.appendContextSnapshotLine(ctx_rec) async def _executeWithRetry(executor, node, context, maxRetries: int = 0, retryDelaySeconds: float = 1.0): """Execute a node with optional retry policy from node parameters.""" params = node.get("parameters") or {} retries = params.get("retryMaxAttempts", maxRetries) delay = params.get("retryDelaySeconds", retryDelaySeconds) attempt = 0 lastError = None while attempt <= retries: try: result = await executor.execute(node, context) return result, attempt except (PauseForHumanTaskError, PauseForEmailWaitError, _SubscriptionInactiveException, _BillingContextError): raise except Exception as e: lastError = e attempt += 1 if attempt <= retries: logger.warning( "Node %s failed (attempt %d/%d), retrying in %.1fs: %s", node.get("id"), attempt, retries + 1, delay, e, ) await asyncio.sleep(delay) delay = min(delay * 2, 60) else: raise lastError raise lastError def _validateFeatureInstanceMandates(graph: Dict[str, Any], mandateId: str) -> None: """Verify that all FeatureInstanceRef IDs in the graph belong to the workflow's mandate. Logs a warning for each mismatch but does NOT abort execution — the node executor will fail on its own with a more specific error if the instance is truly inaccessible. This is a defence-in-depth guard (A0.2). """ nodes = graph.get("nodes") if isinstance(graph, dict) else None if not isinstance(nodes, list): return instanceIds: set = set() for node in nodes: if not isinstance(node, dict): continue params = node.get("parameters") or {} ref = params.get("featureInstanceId") if isinstance(ref, dict) and ref.get("$type") == "FeatureInstanceRef": iid = ref.get("id") if iid: instanceIds.add(iid) elif isinstance(ref, str) and ref.strip(): instanceIds.add(ref.strip()) if not instanceIds: return try: from modules.interfaces.interfaceDbApp import getRootInterface root = getRootInterface() from modules.datamodels.datamodelFeatures import FeatureInstance for iid in instanceIds: fi = root.db.getRecord(FeatureInstance, iid) if not fi: logger.warning( "MandateValidation: FeatureInstance %s referenced in graph not found", iid, ) continue fiMandateId = fi.get("mandateId") if isinstance(fi, dict) else getattr(fi, "mandateId", None) if fiMandateId and fiMandateId != mandateId: logger.warning( "MandateValidation: FeatureInstance %s belongs to mandate %s, " "but workflow mandate is %s — cross-mandate access", iid, fiMandateId, mandateId, ) except Exception as e: logger.debug("MandateValidation: could not verify instances: %s", e) def _substituteFeatureInstancePlaceholders( graph: Dict[str, Any], targetFeatureInstanceId: str, ) -> Dict[str, Any]: """Replace ``{{featureInstanceId}}`` placeholders in the serialised graph. Works on the full JSON representation so that placeholders inside nested parameter dicts, prompt strings, etc. are all caught. Already-resolved concrete UUIDs (pre-baked by ``_copyTemplateWorkflows``) are left untouched because the placeholder literal ``{{featureInstanceId}}`` will not match. """ raw = json.dumps(graph) if "{{featureInstanceId}}" not in raw: return graph replaced = raw.replace("{{featureInstanceId}}", targetFeatureInstanceId) logger.debug( "_substituteFeatureInstancePlaceholders: resolved %d occurrence(s) -> %s", raw.count("{{featureInstanceId}}"), targetFeatureInstanceId, ) return json.loads(replaced) async def _run_post_loop_done_nodes( *, loop_node_id: str, body_ids: Set[str], items: List[Any], ordered: List[Dict], connectionMap: Dict[str, List], nodeOutputs: Dict[str, Any], context: Dict[str, Any], services: Any, automation2_interface: Optional[Any], runId: Optional[str], processed_in_loop: Set[str], waFileLogger: Optional[RunFileLogger] = None, ) -> Optional[Dict[str, Any]]: """After all loop iterations: merge upstream into loop output and run the Done (output 1) branch once.""" _prim_in = getLoopPrimaryInputSource(loop_node_id, connectionMap, body_ids) _upstream_loop = nodeOutputs.get(_prim_in[0]) if _prim_in else None _base_raw = unwrapTransit(_upstream_loop) if isinstance(_upstream_loop, dict) and _upstream_loop.get("_transit") else _upstream_loop _prev_loop_out = nodeOutputs.get(loop_node_id) # ``bodyResults`` lives on the plain iteration-state dict; after resume / edge # cases the loop slot may still be wrapped in Transit — unwrap before read. _prev_plain = _prev_loop_out if isinstance(_prev_loop_out, dict) and _prev_loop_out.get("_transit"): _prev_plain = unwrapTransit(_prev_loop_out) _body_results = ( _prev_plain.get("bodyResults") if isinstance(_prev_plain, dict) else None ) if not isinstance(_base_raw, dict): raise RuntimeError( f"flow.loop {loop_node_id}: primary upstream output must be a dict (JSON handover / node output); " f"got {type(_base_raw).__name__}" ) _merged_loop = {**_base_raw, "items": items, "count": len(items)} if _body_results is not None: _merged_loop["bodyResults"] = _body_results nodeOutputs[loop_node_id] = wrapTransit(_merged_loop, {"loopCompleted": True, "loopNodeId": loop_node_id}) _done_all = getLoopDoneNodeIds(loop_node_id, connectionMap) _done_only = _done_all - body_ids _done_ordered = [n for n in ordered if n.get("id") in _done_only] for _dn in _done_ordered: _dnid = _dn.get("id") if not _dnid or context.get("_stopped"): break if not _is_node_on_active_path(_dnid, connectionMap, nodeOutputs): _skipSnap = {"_skipReason": "inactive_branch"} for _sSrc, _, _ in connectionMap.get(_dnid, []): if _sSrc in nodeOutputs: _skipSnap[_sSrc] = nodeOutputs[_sSrc] _skipSnap = _merge_node_parameters_into_snap(_skipSnap, node_id=_dnid, context=context) _skId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), status="skipped", inputSnapshot=_skipSnap) if _skId: _updateStepLog(automation2_interface, _skId, "skipped") await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=_dnid, node_type=_dn.get("type", ""), status="skipped", input_snap=_skipSnap, skip_reason=str(_skipSnap.get("_skipReason") or "inactive_branch"), ) continue _dexec = _getExecutor(_dn.get("type", ""), services, automation2_interface) if not _dexec: nodeOutputs[_dnid] = None continue _dStart = time.time() _dIn = {} for _src, _, _ in connectionMap.get(_dnid, []): if _src in nodeOutputs: _dIn[_src] = nodeOutputs[_src] _dIn = _merge_node_parameters_into_snap(_dIn, node_id=_dnid, context=context) _dStepId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), "running", _dIn) try: _dres, _dRetry = await _executeWithRetry(_dexec, _dn, context) _dres = _normalizeResult(_dres, _dn.get("type", "")) nodeOutputs[_dnid] = _dres _dDur = int((time.time() - _dStart) * 1000) _dTok = _dres.get("tokensUsed", 0) if isinstance(_dres, dict) else 0 _updateStepLog(automation2_interface, _dStepId, "completed", output=_dres if isinstance(_dres, dict) else {"value": _dres}, durationMs=_dDur, tokensUsed=_dTok, retryCount=_dRetry) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=_dnid, node_type=_dn.get("type", ""), status="completed", input_snap=_dIn, output=_dres, duration_ms=_dDur, retry_count=_dRetry, ) except PauseForHumanTaskError: _updateStepLog(automation2_interface, _dStepId, "completed", durationMs=int((time.time() - _dStart) * 1000)) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=_dnid, node_type=_dn.get("type", ""), status="completed", input_snap=_dIn, duration_ms=int((time.time() - _dStart) * 1000), ) raise except PauseForEmailWaitError: _updateStepLog(automation2_interface, _dStepId, "completed", durationMs=int((time.time() - _dStart) * 1000)) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=_dnid, node_type=_dn.get("type", ""), status="completed", input_snap=_dIn, duration_ms=int((time.time() - _dStart) * 1000), ) raise except (_SubscriptionInactiveException, _BillingContextError): _dFailDur = int((time.time() - _dStart) * 1000) _updateStepLog(automation2_interface, _dStepId, "failed", error="Subscription/Billing error", durationMs=_dFailDur) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=_dnid, node_type=_dn.get("type", ""), status="failed", input_snap=_dIn, error="Subscription/Billing error", duration_ms=_dFailDur, ) raise except Exception as _dex: _dFailDur2 = int((time.time() - _dStart) * 1000) _updateStepLog(automation2_interface, _dStepId, "failed", error=str(_dex), durationMs=_dFailDur2) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=_dnid, node_type=_dn.get("type", ""), status="failed", input_snap=_dIn, error=str(_dex), duration_ms=_dFailDur2, ) raise processed_in_loop.update(_done_only) return None async def executeGraph( graph: Dict[str, Any], services: Any, workflowId: str = None, instanceId: str = None, userId: str = None, mandateId: str = None, automation2_interface: Optional[Any] = None, initialNodeOutputs: Optional[Dict[str, Any]] = None, startAfterNodeId: Optional[str] = None, runId: Optional[str] = None, run_envelope: Optional[Dict[str, Any]] = None, label: Optional[str] = None, targetFeatureInstanceId: Optional[str] = None, ) -> Dict[str, Any]: """ Execute automation2 graph. Returns { success, nodeOutputs, error?, stopped? }. When an input node is reached and automation2_interface is provided, creates a task, pauses the run, and returns { success: False, paused: True, taskId, runId }. For resume: pass initialNodeOutputs (with result for the human node) and startAfterNodeId. For fresh runs: pass run_envelope (unified start payload for the start node); normalized with userId into context.runEnvelope. targetFeatureInstanceId: resolves {{featureInstanceId}} placeholders in the graph JSON before execution. """ logger.info( "executeGraph start: instanceId=%s workflowId=%s userId=%s mandateId=%s resume=%s targetInstance=%s", instanceId, workflowId, userId, mandateId, startAfterNodeId is not None, targetFeatureInstanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods discoverMethods(services) from modules.workflowAutomation.engine.pickNotPushMigration import ( materializeConnectionRefs, materializePrimaryTextHandover, materializeRecommendedDataPickRef, normalizeFileCreatePresentationRefs, ) from modules.workflowAutomation.engine.featureInstanceRefMigration import ( materializeFeatureInstanceRefs, ) if targetFeatureInstanceId: graph = _substituteFeatureInstancePlaceholders(graph, targetFeatureInstanceId) # Phase-5 Schicht-4: typed-ref envelopes are materialized FIRST so the # subsequent connection-ref pass and validation see the canonical shape. graph = materializeFeatureInstanceRefs(graph) if mandateId: _validateFeatureInstanceMandates(graph, mandateId) graph = materializeConnectionRefs(graph) graph = materializePrimaryTextHandover(graph) graph = materializeRecommendedDataPickRef(graph) graph = normalizeFileCreatePresentationRefs(graph) nodeTypeIds = _getNodeTypeIds(services) logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds)) errors = validateGraph(graph, nodeTypeIds) if errors: logger.warning("executeGraph validation failed: %s", errors) return {"success": False, "error": "; ".join(errors), "nodeOutputs": {}} nodes, connections = parseGraph(graph)[:2] connectionMap = buildConnectionMap(connections) inputSources = {n["id"]: getInputSources(n["id"], connectionMap) for n in nodes if n.get("id")} logger.info( "executeGraph parsed: nodes=%d connections=%d connectionMap_targets=%s", len(nodes), len(connections), list(connectionMap.keys()), ) ordered = topoSort(nodes, connectionMap) ordered_ids = [n.get("id") for n in ordered if n.get("id")] logger.info("executeGraph topoSort order: %s", ordered_ids) # Normalize resumed human-node output BEFORE copying into nodeOutputs — otherwise # normalizeToSchema only updates initialNodeOutputs and loop/refs still see raw # e.g. input.upload {files} without coerced DocumentList.documents. is_resume = startAfterNodeId is not None if is_resume and initialNodeOutputs and startAfterNodeId: resumedNode = next((n for n in nodes if n.get("id") == startAfterNodeId), None) if resumedNode: resumedType = resumedNode.get("type", "") resumedOutput = initialNodeOutputs.get(startAfterNodeId) if isinstance(resumedOutput, dict): schema = _outputSchemaForNode(resumedType) if schema and schema != "Transit": try: initialNodeOutputs[startAfterNodeId] = normalizeToSchema(resumedOutput, schema) except Exception as valErr: logger.warning("executeGraph resume: schema validation failed for %s: %s", startAfterNodeId, valErr) waFileLogger: Optional[RunFileLogger] = None nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {}) if not runId and automation2_interface and workflowId and not is_resume: run_context = { "connectionMap": connectionMap, "inputSources": inputSources, "orderedNodeIds": ordered_ids, } if userId: run_context["ownerId"] = userId if mandateId: run_context["mandateId"] = mandateId if instanceId: run_context["instanceId"] = instanceId run_label = label if not run_label and automation2_interface and workflowId: try: wfObj = automation2_interface.getWorkflow(workflowId) if wfObj: wfDict = wfObj if isinstance(wfObj, dict) else ( wfObj.model_dump() if hasattr(wfObj, "model_dump") else {} ) run_label = wfDict.get("label") except Exception: pass if not run_label: ts = datetime.now(timezone.utc).strftime("%d.%m.%Y %H:%M") run_label = f"Manuell ({ts})" run = automation2_interface.createRun( workflowId=workflowId, nodeOutputs=nodeOutputs, context=run_context, label=run_label, ) runId = run.get("id") if run else None logger.info("executeGraph created run %s label=%s", runId, run_label) if runId and workflowAutomationRunFileLoggingEnabled(): waFileLogger = RunFileLogger.bootstrapNewRun( automation2_interface, runId, run_context, ) env_for_run = normalize_run_envelope(run_envelope, user_id=userId) graph_nodes_by_id: Dict[str, Any] = { str(n["id"]): n for n in nodes if n.get("id") } context = { "workflowId": workflowId, "instanceId": instanceId, "userId": userId, "mandateId": mandateId, "nodeOutputs": nodeOutputs, "connectionMap": connectionMap, "inputSources": inputSources, "services": services, "_runId": runId, "_orderedNodes": ordered, "runEnvelope": env_for_run, "graphNodesById": graph_nodes_by_id, } # Lets graph actions (e.g. ``context.setContext`` human-task mode) call # ``createTask`` / ``updateRun`` without threading the interface through services. if automation2_interface: context["_automation2Interface"] = automation2_interface # _context key in nodeOutputs for system variable resolution nodeOutputs["_context"] = context if runId: _activeRunContexts[runId] = context if ( workflowAutomationRunFileLoggingEnabled() and automation2_interface and runId and waFileLogger is None ): waFileLogger = RunFileLogger.ensureAttached( automation2_interface, runId, ) skip_until_passed = bool(startAfterNodeId) processed_in_loop: Set[str] = set() _aggregateAccumulators: Dict[str, list] = {} STEPLOG_BATCH_THRESHOLD = 100 AGGREGATE_FLUSH_THRESHOLD = 1000 _aggregateTempChunks: Dict[str, List[list]] = {} # Check for loop resume: run was paused inside a loop, we're resuming for next iteration run = automation2_interface.getRun(runId) if (runId and automation2_interface) else None loop_resume_state = (run.get("context") or {}).get("_loopState") if run else None if loop_resume_state and startAfterNodeId: loop_node_id = loop_resume_state.get("loopNodeId") next_index = loop_resume_state.get("currentIndex", -1) + 1 items = loop_resume_state.get("items") or [] body_ids = getLoopBodyNodeIds(loop_node_id, connectionMap) if loop_node_id else set() body_ordered = [n for n in ordered if n.get("id") in body_ids] processed_in_loop = set(body_ids) | {loop_node_id} if loop_node_id else set() _resume_feedback_body_node_id = None for _fb_src, _fb_so, _fb_ti in (connectionMap.get(loop_node_id) or []): if _fb_src in body_ids and _fb_ti == 0: _resume_feedback_body_node_id = _fb_src break if not _resume_feedback_body_node_id and body_ordered: _resume_feedback_body_node_id = body_ordered[-1].get("id") _resume_body_results: List[Any] = [] while next_index < len(items) and loop_node_id: nodeOutputs[loop_node_id] = { "items": items, "count": len(items), "currentItem": items[next_index], "currentIndex": next_index, } context["_loopState"] = {"loopNodeId": loop_node_id, "currentIndex": next_index, "items": items} for body_node in body_ordered: bnid = body_node.get("id") if not bnid or context.get("_stopped"): break if not _is_node_on_active_path(bnid, connectionMap, nodeOutputs): continue executor = _getExecutor(body_node.get("type", ""), services, automation2_interface) if not executor: nodeOutputs[bnid] = None continue _rStepStart = time.time() _rInputSnap = {"_loopItem": items[next_index], "_loopIndex": next_index} for _rSrc, _, _ in connectionMap.get(bnid, []): if _rSrc in nodeOutputs: _rInputSnap[_rSrc] = nodeOutputs[_rSrc] _rInputSnap = _merge_node_parameters_into_snap(_rInputSnap, node_id=bnid, context=context) _rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _rInputSnap) try: result, _rRetry = await _executeWithRetry(executor, body_node, context) if body_node.get("type") == "data.aggregate": if bnid not in _aggregateAccumulators: _aggregateAccumulators[bnid] = [] accItems = result.get("items", [result]) if isinstance(result, dict) else [result] _aggregateAccumulators[bnid].extend(accItems) nodeOutputs[bnid] = result _rDur = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "completed", output=result if isinstance(result, dict) else {"value": result}, durationMs=_rDur, retryCount=_rRetry) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="completed", input_snap=_rInputSnap, output=result, duration_ms=_rDur, retry_count=_rRetry, loop_index=next_index, loop_node_id=loop_node_id, loop_item=items[next_index], ) logger.info("executeGraph loop resume body node %s done (iter %d, retries=%d)", bnid, next_index, _rRetry) if _resume_feedback_body_node_id and bnid == _resume_feedback_body_node_id: _resume_body_results.append(result) except PauseForHumanTaskError as e: _rPauseDur = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "completed", durationMs=_rPauseDur) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="completed", input_snap=_rInputSnap, duration_ms=_rPauseDur, loop_index=next_index, loop_node_id=loop_node_id, loop_item=items[next_index], ) if automation2_interface: run_ctx = dict(run.get("context") or {}) run_ctx["_loopState"] = {"loopNodeId": loop_node_id, "currentIndex": next_index, "items": items} automation2_interface.updateRun(runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx) return {"success": False, "paused": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs)} except PauseForEmailWaitError as e: _rEmailDur = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "completed", durationMs=_rEmailDur) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="completed", input_snap=_rInputSnap, duration_ms=_rEmailDur, loop_index=next_index, loop_node_id=loop_node_id, loop_item=items[next_index], ) raise except (_SubscriptionInactiveException, _BillingContextError): _rFailDurSb = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "failed", error="Subscription/Billing error", durationMs=_rFailDurSb) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="failed", input_snap=_rInputSnap, error="Subscription/Billing error", duration_ms=_rFailDurSb, loop_index=next_index, loop_node_id=loop_node_id, loop_item=items[next_index], ) raise except Exception as ex: _rFailDurEx = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "failed", error=str(ex), durationMs=_rFailDurEx) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="failed", input_snap=_rInputSnap, error=str(ex), duration_ms=_rFailDurEx, loop_index=next_index, loop_node_id=loop_node_id, loop_item=items[next_index], ) logger.exception("executeGraph loop body node %s FAILED: %s", bnid, ex) nodeOutputs[bnid] = {"error": str(ex), "success": False} if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": str(ex), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": bnid, "runId": runId} next_index += 1 if loop_node_id: for aggId, accItems in _aggregateAccumulators.items(): nodeOutputs[aggId] = {"items": accItems, "count": len(accItems), "_success": True} _aggregateAccumulators.clear() if _resume_body_results: _rlo = nodeOutputs.get(loop_node_id) if isinstance(_rlo, dict): _rlo["bodyResults"] = _resume_body_results nodeOutputs[loop_node_id] = _rlo await _run_post_loop_done_nodes( loop_node_id=loop_node_id, body_ids=body_ids, items=items, ordered=ordered, connectionMap=connectionMap, nodeOutputs=nodeOutputs, context=context, services=services, automation2_interface=automation2_interface, runId=runId, processed_in_loop=processed_in_loop, waFileLogger=waFileLogger, ) for i, node in enumerate(ordered): if skip_until_passed: if node.get("id") == startAfterNodeId: skip_until_passed = False continue if node.get("id") in processed_in_loop: continue if context.get("_stopped"): logger.info("executeGraph stopped early at step %d", i) break nodeId = node.get("id") nodeType = node.get("type", "") # flow.loop: the feedback edge (body → loop input 0) hasn't run yet on the first # pass → would make _is_node_on_active_path return False. Only check the # *primary* predecessor (the one outside the loop body). if nodeType == "flow.loop": _loop_body_ids = getLoopBodyNodeIds(nodeId, connectionMap) _loop_primary = getLoopPrimaryInputSource(nodeId, connectionMap, _loop_body_ids) _loop_check_map = ( {nodeId: [(_loop_primary[0], _loop_primary[1], 0)]} if _loop_primary else connectionMap ) _loop_active = _is_node_on_active_path(nodeId, _loop_check_map, nodeOutputs) else: _loop_active = _is_node_on_active_path(nodeId, connectionMap, nodeOutputs) if not _loop_active: logger.info("executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)", i + 1, len(ordered), nodeId) _skipInputSnap = {"_skipReason": "inactive_branch"} for _sSrc, _, _ in connectionMap.get(nodeId, []): if _sSrc in nodeOutputs: _skipInputSnap[_sSrc] = nodeOutputs[_sSrc] _skipInputSnap = _merge_node_parameters_into_snap(_skipInputSnap, node_id=nodeId, context=context) _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped", inputSnapshot=_skipInputSnap) if _skipStepId: _updateStepLog(automation2_interface, _skipStepId, "skipped") await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=nodeId, node_type=nodeType, status="skipped", input_snap=_skipInputSnap, skip_reason=str(_skipInputSnap.get("_skipReason") or "inactive_branch"), ) continue executor = _getExecutor(nodeType, services, automation2_interface) logger.info( "executeGraph step %d/%d: nodeId=%s nodeType=%s executor=%s", i + 1, len(ordered), nodeId, nodeType, type(executor).__name__ if executor else "None", ) if not executor: nodeOutputs[nodeId] = None logger.debug("executeGraph node %s: no executor, output=None", nodeId) continue _stepStartMs = time.time() _stepId = None try: if nodeType == "flow.loop": _loopInputSnap = {} for _lSrc, _, _ in connectionMap.get(nodeId, []): if _lSrc in nodeOutputs: _loopInputSnap[_lSrc] = nodeOutputs[_lSrc] _loopInputSnap = _merge_node_parameters_into_snap(_loopInputSnap, node_id=nodeId, context=context) _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _loopInputSnap) result = await executor.execute(node, context) items = result.get("items") or [] body_ids = getLoopBodyNodeIds(nodeId, connectionMap) body_ordered = [n for n in ordered if n.get("id") in body_ids] processed_in_loop.update(body_ids) processed_in_loop.add(nodeId) _loopConcurrency = int((node.get("parameters") or {}).get("concurrency", 1)) _loopConcurrency = max(1, min(_loopConcurrency, 20)) _batchMode = len(items) > STEPLOG_BATCH_THRESHOLD _aggLock = asyncio.Lock() # Prefer the *last* body node wired to loop input 0 (feedback / # pipeline end) — first matching inbound edge can be a shallow node. _feedback_candidates = [ _fb_src for _fb_src, _fb_so, _fb_ti in (connectionMap.get(nodeId) or []) if _fb_src in body_ids and _fb_ti == 0 ] _feedback_body_node_id = _feedback_candidates[-1] if _feedback_candidates else None if not _feedback_body_node_id and body_ordered: _feedback_body_node_id = body_ordered[-1].get("id") _bodyResultsPerIter: List[Any] = [None] * len(items) async def _runLoopIteration(_idx: int, _item: Any) -> Optional[Dict]: """Execute all body nodes for one iteration. Returns error dict or None.""" _iterOutputs = dict(nodeOutputs) _iterOutputs[nodeId] = {"items": items, "count": len(items), "currentItem": _item, "currentIndex": _idx} _iterCtx = dict(context) _iterCtx["nodeOutputs"] = _iterOutputs if _loopConcurrency > 1 else nodeOutputs _iterCtx["_loopState"] = {"loopNodeId": nodeId, "currentIndex": _idx, "items": items} if _loopConcurrency == 1: nodeOutputs[nodeId] = _iterOutputs[nodeId] context["_loopState"] = _iterCtx["_loopState"] _activeOutputs = _iterOutputs if _loopConcurrency > 1 else nodeOutputs _activeCtx = _iterCtx if _loopConcurrency > 1 else context for body_node in body_ordered: bnid = body_node.get("id") if not bnid or context.get("_stopped"): break if not _is_node_on_active_path(bnid, connectionMap, _activeOutputs): continue bexec = _getExecutor(body_node.get("type", ""), services, automation2_interface) if not bexec: _activeOutputs[bnid] = None continue _bStepStart = time.time() _bInputSnapAlways: Dict[str, Any] = {"_loopItem": _item, "_loopIndex": _idx} for _bSnapSrc, _, _ in connectionMap.get(bnid, []): if _bSnapSrc in _activeOutputs: _bInputSnapAlways[_bSnapSrc] = _activeOutputs[_bSnapSrc] _bInputSnapAlways = _merge_node_parameters_into_snap( _bInputSnapAlways, node_id=bnid, context=context ) _bStepId = None if not _batchMode or _idx == 0 or _idx == len(items) - 1: _bStepId = _createStepLog( automation2_interface, runId, bnid, body_node.get("type", ""), "running", _bInputSnapAlways, ) try: bres, _bRetry = await _executeWithRetry(bexec, body_node, _activeCtx) if body_node.get("type") == "data.aggregate": async with _aggLock: if bnid not in _aggregateAccumulators: _aggregateAccumulators[bnid] = [] accItems = bres.get("items", [bres]) if isinstance(bres, dict) else [bres] _aggregateAccumulators[bnid].extend(accItems) if len(_aggregateAccumulators[bnid]) >= AGGREGATE_FLUSH_THRESHOLD: _aggregateTempChunks.setdefault(bnid, []).append(_aggregateAccumulators[bnid]) _aggregateAccumulators[bnid] = [] _activeOutputs[bnid] = bres _bDur = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "completed", output=bres if isinstance(bres, dict) else {"value": bres}, durationMs=_bDur, retryCount=_bRetry) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=_activeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="completed", input_snap=_bInputSnapAlways, output=bres, duration_ms=_bDur, retry_count=_bRetry, loop_index=_idx, loop_node_id=nodeId, loop_item=_item, ) if _loopConcurrency == 1: nodeOutputs[bnid] = bres except PauseForHumanTaskError as e: _bHd = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "completed", durationMs=_bHd) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=_activeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="completed", input_snap=_bInputSnapAlways, duration_ms=_bHd, loop_index=_idx, loop_node_id=nodeId, loop_item=_item, ) if runId and automation2_interface: _run = automation2_interface.getRun(runId) or {} _run_ctx = dict(_run.get("context") or {}) _run_ctx["_loopState"] = {"loopNodeId": nodeId, "currentIndex": _idx, "items": items} automation2_interface.updateRun(e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=_run_ctx) return {"_pause": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId} except PauseForEmailWaitError: _bEd = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "completed", durationMs=_bEd) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=_activeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="completed", input_snap=_bInputSnapAlways, duration_ms=_bEd, loop_index=_idx, loop_node_id=nodeId, loop_item=_item, ) raise except (_SubscriptionInactiveException, _BillingContextError): _bSb = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "failed", error="Subscription/Billing error", durationMs=_bSb) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=_activeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="failed", input_snap=_bInputSnapAlways, error="Subscription/Billing error", duration_ms=_bSb, loop_index=_idx, loop_node_id=nodeId, loop_item=_item, ) raise except Exception as ex: _bFail = int((time.time() - _bStepStart) * 1000) if _bStepId: _updateStepLog(automation2_interface, _bStepId, "failed", error=str(ex), durationMs=_bFail) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=_activeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=bnid, node_type=body_node.get("type", ""), status="failed", input_snap=_bInputSnapAlways, error=str(ex), duration_ms=_bFail, loop_index=_idx, loop_node_id=nodeId, loop_item=_item, ) logger.exception("executeGraph loop body node %s FAILED (iter %d): %s", bnid, _idx, ex) return {"_error": str(ex), "failedNode": bnid} if _feedback_body_node_id: async with _aggLock: if _idx < len(_bodyResultsPerIter): _bodyResultsPerIter[_idx] = _activeOutputs.get(_feedback_body_node_id) if _batchMode and _idx > 0 and _idx % STEPLOG_BATCH_THRESHOLD == 0 and runId: _emitStepEvent(runId, {"type": "loop_progress", "nodeId": nodeId, "iteration": _idx, "total": len(items)}) return None if _loopConcurrency <= 1: for idx, item in enumerate(items): iterErr = await _runLoopIteration(idx, item) if iterErr: if iterErr.get("_pause"): return {"success": False, "paused": True, "taskId": iterErr["taskId"], "runId": iterErr["runId"], "nodeId": iterErr["nodeId"], "nodeOutputs": _serializableOutputs(nodeOutputs)} nodeOutputs[iterErr.get("failedNode", nodeId)] = {"error": iterErr["_error"], "success": False} if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": iterErr["_error"], "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": iterErr.get("failedNode"), "runId": runId} else: _sem = asyncio.Semaphore(_loopConcurrency) async def _concurrentIter(_ci: int, _citem: Any): async with _sem: return await _runLoopIteration(_ci, _citem) _tasks = [_concurrentIter(ci, citem) for ci, citem in enumerate(items)] _results = await asyncio.gather(*_tasks, return_exceptions=True) for _ri, _rval in enumerate(_results): if isinstance(_rval, Exception): logger.exception("Loop iteration %d raised: %s", _ri, _rval) if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": str(_rval), "nodeOutputs": _serializableOutputs(nodeOutputs), "runId": runId} if isinstance(_rval, dict): if _rval.get("_pause"): return {"success": False, "paused": True, "taskId": _rval["taskId"], "runId": _rval["runId"], "nodeId": _rval["nodeId"], "nodeOutputs": _serializableOutputs(nodeOutputs)} if _rval.get("_error"): if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": _rval["_error"], "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": _rval.get("failedNode"), "runId": runId} for aggId, accItems in _aggregateAccumulators.items(): allChunks = _aggregateTempChunks.pop(aggId, []) finalItems = [] for chunk in allChunks: finalItems.extend(chunk) finalItems.extend(accItems) nodeOutputs[aggId] = {"items": finalItems, "count": len(finalItems), "_success": True} _aggregateAccumulators.clear() # Always attach ``bodyResults`` (list per iteration, possibly None # placeholders) so DataRefs to ``bodyResults`` resolve and # ``context.mergeContext`` can fall back to the wired loop output. _lo = nodeOutputs.get(nodeId) if isinstance(_lo, dict): _lo["bodyResults"] = _bodyResultsPerIter nodeOutputs[nodeId] = _lo await _run_post_loop_done_nodes( loop_node_id=nodeId, body_ids=body_ids, items=items, ordered=ordered, connectionMap=connectionMap, nodeOutputs=nodeOutputs, context=context, services=services, automation2_interface=automation2_interface, runId=runId, processed_in_loop=processed_in_loop, waFileLogger=waFileLogger, ) _loopDurMs = int((time.time() - _stepStartMs) * 1000) _loopStepOut = { "iterationCount": len(items), "items": len(items), "concurrency": _loopConcurrency, "batchMode": _batchMode, } _updateStepLog(automation2_interface, _stepId, "completed", output=_loopStepOut, durationMs=_loopDurMs) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=nodeId, node_type=nodeType, status="completed", input_snap=_loopInputSnap, output=_loopStepOut, duration_ms=_loopDurMs, ) logger.info("executeGraph flow.loop done: %d iterations (concurrency=%d, batchMode=%s)", len(items), _loopConcurrency, _batchMode) elif _isBarrierNode(nodeType): if not _allMergePredecessorsReady(nodeId, connectionMap, nodeOutputs): logger.info("executeGraph node %s (%s): waiting — not all predecessors ready, deferring", nodeId, nodeType) nodeOutputs[nodeId] = None continue _stepStartMs = time.time() _inputSnap = {} for src, _, _ in connectionMap.get(nodeId, []): if src in nodeOutputs: _inputSnap[src] = nodeOutputs[src] _inputSnap = _merge_node_parameters_into_snap(_inputSnap, node_id=nodeId, context=context) _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _inputSnap) result, retryCount = await _executeWithRetry(executor, node, context) result = _normalizeResult(result, nodeType) nodeOutputs[nodeId] = result _mergeDurMs = int((time.time() - _stepStartMs) * 1000) _mergeTok = result.get("tokensUsed", 0) if isinstance(result, dict) else 0 _updateStepLog(automation2_interface, _stepId, "completed", output=result if isinstance(result, dict) else {"value": result}, durationMs=_mergeDurMs, tokensUsed=_mergeTok, retryCount=retryCount) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=nodeId, node_type=nodeType, status="completed", input_snap=_inputSnap, output=result, duration_ms=_mergeDurMs, retry_count=retryCount, ) else: _stepStartMs = time.time() _inputSnap = {} for src, _, _ in connectionMap.get(nodeId, []): if src in nodeOutputs: _inputSnap[src] = nodeOutputs[src] _inputSnap = _merge_node_parameters_into_snap(_inputSnap, node_id=nodeId, context=context) _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _inputSnap) result, retryCount = await _executeWithRetry(executor, node, context) result = _normalizeResult(result, nodeType) nodeOutputs[nodeId] = result _durMs = int((time.time() - _stepStartMs) * 1000) _tokens = result.get("tokensUsed", 0) if isinstance(result, dict) else 0 _updateStepLog(automation2_interface, _stepId, "completed", output=result if isinstance(result, dict) else {"value": result}, durationMs=_durMs, tokensUsed=_tokens, retryCount=retryCount) await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=nodeId, node_type=nodeType, status="completed", input_snap=_inputSnap, output=result, duration_ms=_durMs, retry_count=retryCount, ) logger.info( "executeGraph node %s done: result_type=%s result_keys=%s retries=%d duration=%dms", nodeId, type(result).__name__, list(result.keys()) if isinstance(result, dict) else "n/a", retryCount, _durMs, ) except PauseForHumanTaskError as e: _huPauseMs = int((time.time() - _stepStartMs) * 1000) _updateStepLog(automation2_interface, _stepId, "completed", durationMs=_huPauseMs) _ge_in = locals().get("_inputSnap") if _ge_in is None: _ge_in = locals().get("_loopInputSnap") or {} await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=nodeId, node_type=nodeType, status="completed", input_snap=_ge_in, duration_ms=_huPauseMs, ) logger.info("executeGraph paused for human task %s", e.taskId) return { "success": False, "paused": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs), } except PauseForEmailWaitError as e: _emailPauseMs = int((time.time() - _stepStartMs) * 1000) _updateStepLog(automation2_interface, _stepId, "completed", durationMs=_emailPauseMs) _ge_email_in = locals().get("_inputSnap") if _ge_email_in is None: _ge_email_in = locals().get("_loopInputSnap") or {} await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=nodeId, node_type=nodeType, status="completed", input_snap=_ge_email_in, duration_ms=_emailPauseMs, ) logger.info("executeGraph paused for email wait (run %s, node %s)", e.runId, e.nodeId) paused_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") run_ctx = { "connectionMap": context.get("connectionMap"), "inputSources": context.get("inputSources"), "orderedNodeIds": [n.get("id") for n in context.get("_orderedNodes", []) if n.get("id")], "waitReason": "email", "waitConfig": e.waitConfig, "pausedAt": paused_at, "lastCheckedAt": None, "ownerId": context.get("userId"), "mandateId": context.get("mandateId"), "instanceId": context.get("instanceId"), } if automation2_interface and e.runId: prev_ctx = dict((automation2_interface.getRun(e.runId) or {}).get("context") or {}) run_ctx = mergeRunContextWithWaLogPrefix(prev_ctx, run_ctx) automation2_interface.updateRun( e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx, ) return { "success": False, "paused": True, "waitReason": "email", "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs), } except Exception as e: logger.exception("executeGraph node %s (%s) FAILED: %s", nodeId, nodeType, e) nodeOutputs[nodeId] = {"error": str(e), "success": False} _durMs = int((time.time() - _stepStartMs) * 1000) _updateStepLog(automation2_interface, _stepId, "failed", error=str(e), durationMs=_durMs) _ge_fail_in = locals().get("_inputSnap") if _ge_fail_in is None: _ge_fail_in = locals().get("_loopInputSnap") or {} await _ge_log_node_finished( waFileLogger, run_id=runId, node_outputs=nodeOutputs, run_envelope=context.get("runEnvelope"), exec_context=context, node_id=nodeId, node_type=nodeType, status="failed", input_snap=_ge_fail_in, error=str(e), duration_ms=_durMs, ) if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _emitStepEvent(runId, {"type": "run_failed", "runId": runId, "status": "failed", "error": str(e), "failedNode": nodeId}) try: _wfObj = automation2_interface.getWorkflow(workflowId) if automation2_interface and workflowId else None _wfDict = _wfObj if isinstance(_wfObj, dict) else ( _wfObj.model_dump() if hasattr(_wfObj, "model_dump") else {} ) if _wfObj else {} _shouldNotify = _wfDict.get("notifyOnFailure", True) if _wfDict else True if _shouldNotify: from modules.workflowAutomation.engine._runNotifications import notifyRunFailed notifyRunFailed( workflowId or "", runId or "", str(e), mandateId=mandateId, workflowLabel=_wfDict.get("label"), ) except Exception as notifyErr: logger.warning(f"executeGraph: failure notification failed for run={runId}: {notifyErr}") if runId: _activeRunContexts.pop(runId, None) return { "success": False, "error": str(e), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": nodeId, "runId": runId, } _safeOutputs = _serializableOutputs(nodeOutputs) _wasStopped = context.get("_stopped", False) _finalStatus = "stopped" if _wasStopped else "completed" if runId and automation2_interface: automation2_interface.updateRun(runId, status=_finalStatus, nodeOutputs=_safeOutputs) if runId: _emitStepEvent(runId, {"type": "run_complete", "runId": runId, "status": _finalStatus}) _activeRunContexts.pop(runId, None) logger.info( "executeGraph complete: success=True nodeOutputs_keys=%s stopped=%s", list(nodeOutputs.keys()), _wasStopped, ) return { "success": True, "nodeOutputs": _safeOutputs, "stopped": _wasStopped, "runId": runId, }