# Copyright (c) 2025 Patrick Motsch
# Main execution engine for automation2 graphs.

import asyncio
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Set, Optional

from modules.workflows.automation2.graphUtils import (
    parseGraph,
    buildConnectionMap,
    validateGraph,
    topoSort,
    getInputSources,
    getLoopBodyNodeIds,
    getLoopDoneNodeIds,
    getLoopPrimaryInputSource,
)
from modules.workflows.automation2.executors import (
    TriggerExecutor,
    FlowExecutor,
    ActionNodeExecutor,
    InputExecutor,
    DataExecutor,
    PauseForHumanTaskError,
    PauseForEmailWaitError,
)
from modules.features.graphicalEditor.portTypes import normalizeToSchema, wrapTransit, unwrapTransit
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException as _SubscriptionInactiveException
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError as _BillingContextError
from modules.workflows.automation2.runEnvelope import normalize_run_envelope

logger = logging.getLogger(__name__)

# Lazily-built cache of STATIC_NODE_TYPES keyed by node-type id (see _getNodeDef).
_NODE_DEF_BY_ID: Dict[str, dict] = {}

# Registry of currently executing runs: runId -> context dict.
# Used by requestRunStop() to set context["_stopped"] = True.
_activeRunContexts: Dict[str, Dict[str, Any]] = {}


def requestRunStop(runId: str) -> bool:
    """Request a running workflow to stop at the next node boundary.

    The executor polls ``context["_stopped"]`` between nodes, so the stop
    takes effect at the next node boundary, not immediately.

    Returns True if the run was found and flagged, False otherwise.
    """
    ctx = _activeRunContexts.get(runId)
    if ctx is not None:
        ctx["_stopped"] = True
        logger.info("requestRunStop: flagged runId=%s for stop", runId)
        return True
    logger.warning("requestRunStop: runId=%s not found in active runs", runId)
    return False


def getActiveRunIds() -> list:
    """Return list of currently executing run IDs."""
    return list(_activeRunContexts.keys())


def _getNodeDef(nodeType: str) -> Optional[dict]:
    """Lookup static node definition by type id (cached).

    The cache is populated on first use from STATIC_NODE_TYPES; later entries
    with a duplicate id would overwrite earlier ones (ids are assumed unique).
    """
    if not _NODE_DEF_BY_ID:
        for nd in STATIC_NODE_TYPES:
            _NODE_DEF_BY_ID[nd["id"]] = nd
    return _NODE_DEF_BY_ID.get(nodeType)


def _outputSchemaForNode(nodeType: str) -> Optional[str]:
    """Return the output port schema name for a node type (port 0), or None.

    Port 0 may be keyed as int 0 or string "0" depending on how the node
    definition was serialised; both are checked. A ``fromGraph`` schema spec
    maps to the generic "FormPayload" schema.
    """
    nd = _getNodeDef(nodeType)
    if not nd:
        return None
    ports = nd.get("outputPorts")
    if isinstance(ports, dict):
        p0 = ports.get(0) or ports.get("0")
        if isinstance(p0, dict):
            spec = p0.get("schema")
            if isinstance(spec, dict) and spec.get("kind") == "fromGraph":
                return "FormPayload"
            if isinstance(spec, str):
                return spec
    return None


def _isBarrierNode(nodeType: str) -> bool:
    """Barrier nodes wait for all connected predecessors before executing.

    Backwards compatible: ``flow.merge`` is always a barrier. Any other node
    may declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES
    entry (e.g. ``context.mergeContext``).

    Fixed: uses the _getNodeDef() cache instead of re-scanning
    STATIC_NODE_TYPES linearly on every call.
    """
    if nodeType == "flow.merge":
        return True
    nd = _getNodeDef(nodeType)
    # Unknown node types are not barriers.
    return bool(nd and nd.get("waitsForAllPredecessors"))

# Legacy alias used inside this module.
_isMergeNode = _isBarrierNode def _allMergePredecessorsReady( nodeId: str, connectionMap: Dict[str, List], nodeOutputs: Dict[str, Any], ) -> bool: """For barrier nodes: check that every connected predecessor has produced output or was skipped.""" for src, _, _ in connectionMap.get(nodeId, []): if src not in nodeOutputs: return False return True def _normalizeResult(result: Any, nodeType: str) -> Any: """Apply normalizeToSchema if the node has a declared output schema.""" schema = _outputSchemaForNode(nodeType) if schema and schema != "Transit" and isinstance(result, dict): try: return normalizeToSchema(result, schema) except Exception as e: logger.warning(f"_normalizeResult failed for nodeType={nodeType}, schema={schema}: {e}") return result def _getNodeTypeIds(services: Any = None) -> Set[str]: """Collect all known node type IDs from static definitions.""" return {n["id"] for n in STATIC_NODE_TYPES} def _is_node_on_active_path( nodeId: str, connectionMap: Dict[str, List], nodeOutputs: Dict[str, Any], ) -> bool: """ Return True if this node receives input only from active branches. Transit envelopes: routing metadata is in out["_meta"] (branch/match). Legacy format: branch/match directly on out. 
""" for src, source_output, _ in connectionMap.get(nodeId, []): out = nodeOutputs.get(src) if out is None: return False if not isinstance(out, dict): continue # Transit envelope: metadata in _meta meta = out.get("_meta", {}) if out.get("_transit") else out branch = meta.get("branch") match = meta.get("match") active_output = None if branch is not None: active_output = branch elif match is not None: if match < 0: return False active_output = match if active_output is not None and source_output != active_output: return False return True def _getExecutor( nodeType: str, services: Any, automation2_interface: Optional[Any] = None, ) -> Any: """Dispatch to correct executor based on node type.""" if nodeType.startswith("trigger."): return TriggerExecutor() if nodeType.startswith("flow."): return FlowExecutor() if nodeType.startswith("data."): return DataExecutor() if (nodeType.startswith("ai.") or nodeType.startswith("email.") or nodeType.startswith("sharepoint.") or nodeType.startswith("clickup.") or nodeType.startswith("file.") or nodeType.startswith("trustee.") or nodeType.startswith("context.")): return ActionNodeExecutor(services) if nodeType.startswith("input.") and automation2_interface: return InputExecutor(automation2_interface) return None _stepMeta: Dict[str, Dict[str, str]] = {} def _stripBinaryValues(obj: Any, depth: int = 0) -> Any: """Recursively replace bytes values with None to keep data JSON-safe for DB storage.""" if depth > 12: return obj if isinstance(obj, bytes): return None if isinstance(obj, dict): return {k: _stripBinaryValues(v, depth + 1) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [_stripBinaryValues(v, depth + 1) for v in obj] return obj def _serializableOutputs(nodeOutputs: Dict[str, Any]) -> Dict[str, Any]: """Return a JSON-safe copy of nodeOutputs: strip _context and binary data.""" cleaned = {k: v for k, v in nodeOutputs.items() if k != "_context"} return _stripBinaryValues(cleaned) def _emitStepEvent(runId: str, 
stepData: Dict[str, Any]) -> None:
    """Emit a step-log SSE event to any listening client for this run.

    Best-effort: silently skips when no client queue exists for the run, and
    never lets streaming failures break graph execution.
    """
    try:
        from modules.serviceCenter.core.serviceStreaming.eventManager import get_event_manager
        em = get_event_manager()
        queueId = f"run-trace-{runId}"
        # Only emit when a client is actually listening on this run's trace queue.
        if not em.has_queue(queueId):
            return
        import asyncio
        # NOTE(review): get_event_loop() is deprecated without a running loop on
        # 3.10+ — presumably always called from within the executor's loop; confirm.
        loop = asyncio.get_event_loop()
        if loop.is_running():
            asyncio.ensure_future(em.emit_event(queueId, "step", stepData, event_category="tracing"))
    except Exception as e:
        logger.warning(f"_emitStepEvent failed for runId={runId}: {e}")


def _createStepLog(iface, runId: str, nodeId: str, nodeType: str, status: str = "running", inputSnapshot: Dict = None) -> Optional[str]:
    """Create an AutoStepLog entry. Returns the step log ID or None if interface unavailable.

    Also registers the step in _stepMeta (so the matching completion event can
    be enriched later) and emits a "running" SSE step event. Any DB failure is
    swallowed: step logging must never break graph execution.
    """
    if not iface or not runId:
        return None
    try:
        from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog
        stepId = str(uuid.uuid4())
        startedAt = time.time()
        iface.db.recordCreate(AutoStepLog, {
            "id": stepId,
            "runId": runId,
            "nodeId": nodeId,
            "nodeType": nodeType,
            "status": status,
            # Snapshot is stripped of bytes so it stays JSON-serialisable in the DB.
            "inputSnapshot": _stripBinaryValues(inputSnapshot) if inputSnapshot else {},
            "startedAt": startedAt,
        })
        _stepMeta[stepId] = {"runId": runId, "nodeId": nodeId, "nodeType": nodeType}
        _emitStepEvent(runId, {
            "id": stepId,
            "runId": runId,
            "nodeId": nodeId,
            "nodeType": nodeType,
            "status": status,
            "startedAt": startedAt,
        })
        return stepId
    except Exception as e:
        logger.debug("Could not create AutoStepLog: %s", e)
        return None


def _updateStepLog(iface, stepId: str, status: str, output: Dict = None, error: str = None,
                   durationMs: int = None, tokensUsed: int = 0, retryCount: int = 0) -> None:
    """Update an AutoStepLog entry with results.

    Pops the step's _stepMeta entry (one completion event per step) and emits
    the corresponding SSE event. Failures are logged at debug level only.
    """
    if not iface or not stepId:
        return
    try:
        from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoStepLog
        completedAt = time.time()
        updates: Dict[str, Any] = {
            "status": status,
            "completedAt": completedAt,
        }
        # Only persist fields that were actually supplied / non-zero.
        if output is not None:
            updates["output"] = _stripBinaryValues(output)
        if error is not None:
            updates["error"] = error
        if durationMs is not None:
            updates["durationMs"] = durationMs
        if tokensUsed:
            updates["tokensUsed"] = tokensUsed
        if retryCount:
            updates["retryCount"] = retryCount
        iface.db.recordModify(AutoStepLog, stepId, updates)
        meta = _stepMeta.pop(stepId, None)
        if meta:
            _emitStepEvent(meta["runId"], {
                "id": stepId,
                "runId": meta["runId"],
                "nodeId": meta["nodeId"],
                "nodeType": meta["nodeType"],
                "status": status,
                "completedAt": completedAt,
                "durationMs": durationMs,
                "error": error,
                "tokensUsed": tokensUsed,
                "retryCount": retryCount,
            })
    except Exception as e:
        logger.debug("Could not update AutoStepLog %s: %s", stepId, e)


async def _executeWithRetry(executor, node, context, maxRetries: int = 0, retryDelaySeconds: float = 1.0):
    """Execute a node with optional retry policy from node parameters.

    Node parameters ``retryMaxAttempts`` / ``retryDelaySeconds`` override the
    defaults. Returns ``(result, attempt)`` where attempt is the number of
    failed tries before success. Pause and subscription/billing exceptions are
    control flow and are re-raised immediately without retrying. The delay
    doubles per retry, capped at 60 seconds.
    """
    params = node.get("parameters") or {}
    retries = params.get("retryMaxAttempts", maxRetries)
    delay = params.get("retryDelaySeconds", retryDelaySeconds)
    attempt = 0
    lastError = None
    while attempt <= retries:
        try:
            result = await executor.execute(node, context)
            return result, attempt
        except (PauseForHumanTaskError, PauseForEmailWaitError, _SubscriptionInactiveException, _BillingContextError):
            # Control-flow exceptions: never retry, propagate to the run loop.
            raise
        except Exception as e:
            lastError = e
            attempt += 1
            if attempt <= retries:
                logger.warning(
                    "Node %s failed (attempt %d/%d), retrying in %.1fs: %s",
                    node.get("id"), attempt, retries + 1, delay, e,
                )
                await asyncio.sleep(delay)
                # Exponential backoff, capped at 60s.
                delay = min(delay * 2, 60)
            else:
                raise lastError
    # Unreachable when retries >= 0; kept as a safety net.
    raise lastError


def _substituteFeatureInstancePlaceholders(
    graph: Dict[str, Any],
    targetFeatureInstanceId: str,
) -> Dict[str, Any]:
    """Replace ``{{featureInstanceId}}`` placeholders in the serialised graph.

    Works on the full JSON representation so that placeholders inside nested
    parameter dicts, prompt strings, etc. are all caught.
    Already-resolved concrete UUIDs (pre-baked by ``_copyTemplateWorkflows``)
    are left untouched because the placeholder literal ``{{featureInstanceId}}``
    will not match.
    """
    import json as _json
    raw = _json.dumps(graph)
    # Fast path: nothing to substitute, return the original object untouched.
    if "{{featureInstanceId}}" not in raw:
        return graph
    replaced = raw.replace("{{featureInstanceId}}", targetFeatureInstanceId)
    logger.debug(
        "_substituteFeatureInstancePlaceholders: resolved %d occurrence(s) -> %s",
        raw.count("{{featureInstanceId}}"),
        targetFeatureInstanceId,
    )
    return _json.loads(replaced)


async def _run_post_loop_done_nodes(
    *,
    loop_node_id: str,
    body_ids: Set[str],
    items: List[Any],
    ordered: List[Dict],
    connectionMap: Dict[str, List],
    nodeOutputs: Dict[str, Any],
    context: Dict[str, Any],
    services: Any,
    automation2_interface: Optional[Any],
    runId: Optional[str],
    processed_in_loop: Set[str],
) -> Optional[Dict[str, Any]]:
    """After all loop iterations: merge upstream into loop output and run the Done (output 1) branch once.

    Rebuilds the loop node's output as Transit(upstream ∪ {items, count[, bodyResults]})
    tagged with loopCompleted, then executes each Done-branch node (topological
    order, loop-body nodes excluded) exactly once. Marks the Done nodes as
    processed so the main scheduler does not run them again. Raises on node
    failure / pause; otherwise returns None.
    """
    # Primary upstream = the inbound edge that is NOT the body feedback edge.
    _prim_in = getLoopPrimaryInputSource(loop_node_id, connectionMap, body_ids)
    _upstream_loop = nodeOutputs.get(_prim_in[0]) if _prim_in else None
    _base_raw = unwrapTransit(_upstream_loop) if isinstance(_upstream_loop, dict) and _upstream_loop.get("_transit") else _upstream_loop
    _prev_loop_out = nodeOutputs.get(loop_node_id)
    # ``bodyResults`` lives on the plain iteration-state dict; after resume / edge
    # cases the loop slot may still be wrapped in Transit — unwrap before read.
    _prev_plain = _prev_loop_out
    if isinstance(_prev_loop_out, dict) and _prev_loop_out.get("_transit"):
        _prev_plain = unwrapTransit(_prev_loop_out)
    _body_results = (
        _prev_plain.get("bodyResults") if isinstance(_prev_plain, dict) else None
    )
    if not isinstance(_base_raw, dict):
        raise RuntimeError(
            f"flow.loop {loop_node_id}: primary upstream output must be a dict (JSON handover / node output); "
            f"got {type(_base_raw).__name__}"
        )
    # Loop-level keys overwrite same-named upstream keys by design.
    _merged_loop = {**_base_raw, "items": items, "count": len(items)}
    if _body_results is not None:
        _merged_loop["bodyResults"] = _body_results
    nodeOutputs[loop_node_id] = wrapTransit(_merged_loop, {"loopCompleted": True, "loopNodeId": loop_node_id})
    # Done branch = nodes reachable from loop output 1, minus body nodes.
    _done_all = getLoopDoneNodeIds(loop_node_id, connectionMap)
    _done_only = _done_all - body_ids
    _done_ordered = [n for n in ordered if n.get("id") in _done_only]
    for _dn in _done_ordered:
        _dnid = _dn.get("id")
        # Honour cooperative stop requests between nodes.
        if not _dnid or context.get("_stopped"):
            break
        if not _is_node_on_active_path(_dnid, connectionMap, nodeOutputs):
            # Log the skip (with predecessor snapshot) so the trace UI shows why.
            _skipSnap = {"_skipReason": "inactive_branch"}
            for _sSrc, _, _ in connectionMap.get(_dnid, []):
                if _sSrc in nodeOutputs:
                    _skipSnap[_sSrc] = nodeOutputs[_sSrc]
            _skId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), status="skipped", inputSnapshot=_skipSnap)
            if _skId:
                _updateStepLog(automation2_interface, _skId, "skipped")
            continue
        _dexec = _getExecutor(_dn.get("type", ""), services, automation2_interface)
        if not _dexec:
            # No executor for this type: record empty output and move on.
            nodeOutputs[_dnid] = None
            continue
        _dStart = time.time()
        _dIn = {}
        for _src, _, _ in connectionMap.get(_dnid, []):
            if _src in nodeOutputs:
                _dIn[_src] = nodeOutputs[_src]
        _dStepId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), "running", _dIn)
        try:
            _dres, _dRetry = await _executeWithRetry(_dexec, _dn, context)
            _dres = _normalizeResult(_dres, _dn.get("type", ""))
            nodeOutputs[_dnid] = _dres
            _dDur = int((time.time() - _dStart) * 1000)
            _dTok = _dres.get("tokensUsed", 0) if isinstance(_dres, dict) else 0
            _updateStepLog(automation2_interface, _dStepId, "completed",
                           output=_dres if isinstance(_dres, dict) else {"value": _dres},
                           durationMs=_dDur, tokensUsed=_dTok, retryCount=_dRetry)
        except PauseForHumanTaskError:
            # Pause is not a failure: close the step as completed, let caller persist state.
            _updateStepLog(automation2_interface, _dStepId, "completed", durationMs=int((time.time() - _dStart) * 1000))
            raise
        except PauseForEmailWaitError:
            _updateStepLog(automation2_interface, _dStepId, "completed", durationMs=int((time.time() - _dStart) * 1000))
            raise
        except (_SubscriptionInactiveException, _BillingContextError):
            _updateStepLog(automation2_interface, _dStepId, "failed", error="Subscription/Billing error", durationMs=int((time.time() - _dStart) * 1000))
            raise
        except Exception as _dex:
            _updateStepLog(automation2_interface, _dStepId, "failed", error=str(_dex), durationMs=int((time.time() - _dStart) * 1000))
            raise
    # Prevent the main scheduler from re-running the Done branch.
    processed_in_loop.update(_done_only)
    return None


async def executeGraph(
    graph: Dict[str, Any],
    services: Any,
    workflowId: str = None,
    instanceId: str = None,
    userId: str = None,
    mandateId: str = None,
    automation2_interface: Optional[Any] = None,
    initialNodeOutputs: Optional[Dict[str, Any]] = None,
    startAfterNodeId: Optional[str] = None,
    runId: Optional[str] = None,
    run_envelope: Optional[Dict[str, Any]] = None,
    label: Optional[str] = None,
    targetFeatureInstanceId: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Execute automation2 graph. Returns { success, nodeOutputs, error?, stopped? }.

    When an input node is reached and automation2_interface is provided, creates a task,
    pauses the run, and returns { success: False, paused: True, taskId, runId }.
    For resume: pass initialNodeOutputs (with result for the human node) and startAfterNodeId.
    For fresh runs: pass run_envelope (unified start payload for the start node);
    normalized with userId into context.runEnvelope.
    targetFeatureInstanceId: resolves {{featureInstanceId}} placeholders in the graph JSON before execution.
""" logger.info( "executeGraph start: instanceId=%s workflowId=%s userId=%s mandateId=%s resume=%s targetInstance=%s", instanceId, workflowId, userId, mandateId, startAfterNodeId is not None, targetFeatureInstanceId, ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods discoverMethods(services) from modules.workflows.automation2.pickNotPushMigration import ( materializeConnectionRefs, materializePrimaryTextHandover, ) from modules.workflows.automation2.featureInstanceRefMigration import ( materializeFeatureInstanceRefs, ) if targetFeatureInstanceId: graph = _substituteFeatureInstancePlaceholders(graph, targetFeatureInstanceId) # Phase-5 Schicht-4: typed-ref envelopes are materialized FIRST so the # subsequent connection-ref pass and validation see the canonical shape. graph = materializeFeatureInstanceRefs(graph) graph = materializeConnectionRefs(graph) graph = materializePrimaryTextHandover(graph) nodeTypeIds = _getNodeTypeIds(services) logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds)) errors = validateGraph(graph, nodeTypeIds) if errors: logger.warning("executeGraph validation failed: %s", errors) return {"success": False, "error": "; ".join(errors), "nodeOutputs": {}} nodes, connections = parseGraph(graph)[:2] connectionMap = buildConnectionMap(connections) inputSources = {n["id"]: getInputSources(n["id"], connectionMap) for n in nodes if n.get("id")} logger.info( "executeGraph parsed: nodes=%d connections=%d connectionMap_targets=%s", len(nodes), len(connections), list(connectionMap.keys()), ) ordered = topoSort(nodes, connectionMap) ordered_ids = [n.get("id") for n in ordered if n.get("id")] logger.info("executeGraph topoSort order: %s", ordered_ids) # Normalize resumed human-node output BEFORE copying into nodeOutputs — otherwise # normalizeToSchema only updates initialNodeOutputs and loop/refs still see raw # e.g. input.upload {files} without coerced DocumentList.documents. 
is_resume = startAfterNodeId is not None if is_resume and initialNodeOutputs and startAfterNodeId: resumedNode = next((n for n in nodes if n.get("id") == startAfterNodeId), None) if resumedNode: resumedType = resumedNode.get("type", "") resumedOutput = initialNodeOutputs.get(startAfterNodeId) if isinstance(resumedOutput, dict): schema = _outputSchemaForNode(resumedType) if schema and schema != "Transit": try: initialNodeOutputs[startAfterNodeId] = normalizeToSchema(resumedOutput, schema) except Exception as valErr: logger.warning("executeGraph resume: schema validation failed for %s: %s", startAfterNodeId, valErr) nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {}) if not runId and automation2_interface and workflowId and not is_resume: run_context = { "connectionMap": connectionMap, "inputSources": inputSources, "orderedNodeIds": ordered_ids, } if userId: run_context["ownerId"] = userId if mandateId: run_context["mandateId"] = mandateId if instanceId: run_context["instanceId"] = instanceId run_label = label if not run_label and automation2_interface and workflowId: try: wfObj = automation2_interface.getWorkflow(workflowId) if wfObj: wfDict = wfObj if isinstance(wfObj, dict) else ( wfObj.model_dump() if hasattr(wfObj, "model_dump") else {} ) run_label = wfDict.get("label") except Exception: pass if not run_label: ts = datetime.now(timezone.utc).strftime("%d.%m.%Y %H:%M") run_label = f"Manuell ({ts})" run = automation2_interface.createRun( workflowId=workflowId, nodeOutputs=nodeOutputs, context=run_context, label=run_label, ) runId = run.get("id") if run else None logger.info("executeGraph created run %s label=%s", runId, run_label) env_for_run = normalize_run_envelope(run_envelope, user_id=userId) context = { "workflowId": workflowId, "instanceId": instanceId, "userId": userId, "mandateId": mandateId, "nodeOutputs": nodeOutputs, "connectionMap": connectionMap, "inputSources": inputSources, "services": services, "_runId": runId, "_orderedNodes": ordered, 
"runEnvelope": env_for_run, } # Lets graph actions (e.g. ``context.setContext`` human-task mode) call # ``createTask`` / ``updateRun`` without threading the interface through services. if automation2_interface: context["_automation2Interface"] = automation2_interface # _context key in nodeOutputs for system variable resolution nodeOutputs["_context"] = context if runId: _activeRunContexts[runId] = context skip_until_passed = bool(startAfterNodeId) processed_in_loop: Set[str] = set() _aggregateAccumulators: Dict[str, list] = {} STEPLOG_BATCH_THRESHOLD = 100 AGGREGATE_FLUSH_THRESHOLD = 1000 _aggregateTempChunks: Dict[str, List[list]] = {} # Check for loop resume: run was paused inside a loop, we're resuming for next iteration run = automation2_interface.getRun(runId) if (runId and automation2_interface) else None loop_resume_state = (run.get("context") or {}).get("_loopState") if run else None if loop_resume_state and startAfterNodeId: loop_node_id = loop_resume_state.get("loopNodeId") next_index = loop_resume_state.get("currentIndex", -1) + 1 items = loop_resume_state.get("items") or [] body_ids = getLoopBodyNodeIds(loop_node_id, connectionMap) if loop_node_id else set() body_ordered = [n for n in ordered if n.get("id") in body_ids] processed_in_loop = set(body_ids) | {loop_node_id} if loop_node_id else set() _resume_feedback_body_node_id = None for _fb_src, _fb_so, _fb_ti in (connectionMap.get(loop_node_id) or []): if _fb_src in body_ids and _fb_ti == 0: _resume_feedback_body_node_id = _fb_src break if not _resume_feedback_body_node_id and body_ordered: _resume_feedback_body_node_id = body_ordered[-1].get("id") _resume_body_results: List[Any] = [] while next_index < len(items) and loop_node_id: nodeOutputs[loop_node_id] = { "items": items, "count": len(items), "currentItem": items[next_index], "currentIndex": next_index, } context["_loopState"] = {"loopNodeId": loop_node_id, "currentIndex": next_index, "items": items} for body_node in body_ordered: bnid = 
body_node.get("id") if not bnid or context.get("_stopped"): break if not _is_node_on_active_path(bnid, connectionMap, nodeOutputs): continue executor = _getExecutor(body_node.get("type", ""), services, automation2_interface) if not executor: nodeOutputs[bnid] = None continue _rStepStart = time.time() _rInputSnap = {"_loopItem": items[next_index], "_loopIndex": next_index} for _rSrc, _, _ in connectionMap.get(bnid, []): if _rSrc in nodeOutputs: _rInputSnap[_rSrc] = nodeOutputs[_rSrc] _rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _rInputSnap) try: result, _rRetry = await _executeWithRetry(executor, body_node, context) if body_node.get("type") == "data.aggregate": if bnid not in _aggregateAccumulators: _aggregateAccumulators[bnid] = [] accItems = result.get("items", [result]) if isinstance(result, dict) else [result] _aggregateAccumulators[bnid].extend(accItems) nodeOutputs[bnid] = result _rDur = int((time.time() - _rStepStart) * 1000) _updateStepLog(automation2_interface, _rStepId, "completed", output=result if isinstance(result, dict) else {"value": result}, durationMs=_rDur, retryCount=_rRetry) logger.info("executeGraph loop resume body node %s done (iter %d, retries=%d)", bnid, next_index, _rRetry) if _resume_feedback_body_node_id and bnid == _resume_feedback_body_node_id: _resume_body_results.append(result) except PauseForHumanTaskError as e: _updateStepLog(automation2_interface, _rStepId, "completed", durationMs=int((time.time() - _rStepStart) * 1000)) if automation2_interface: run_ctx = dict(run.get("context") or {}) run_ctx["_loopState"] = {"loopNodeId": loop_node_id, "currentIndex": next_index, "items": items} automation2_interface.updateRun(e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx) return {"success": False, "paused": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs)} 
except PauseForEmailWaitError as e: _updateStepLog(automation2_interface, _rStepId, "completed", durationMs=int((time.time() - _rStepStart) * 1000)) raise except (_SubscriptionInactiveException, _BillingContextError): _updateStepLog(automation2_interface, _rStepId, "failed", error="Subscription/Billing error", durationMs=int((time.time() - _rStepStart) * 1000)) raise except Exception as ex: _updateStepLog(automation2_interface, _rStepId, "failed", error=str(ex), durationMs=int((time.time() - _rStepStart) * 1000)) logger.exception("executeGraph loop body node %s FAILED: %s", bnid, ex) nodeOutputs[bnid] = {"error": str(ex), "success": False} if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": str(ex), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": bnid, "runId": runId} next_index += 1 if loop_node_id: for aggId, accItems in _aggregateAccumulators.items(): nodeOutputs[aggId] = {"items": accItems, "count": len(accItems), "_success": True} _aggregateAccumulators.clear() if _resume_body_results: _rlo = nodeOutputs.get(loop_node_id) if isinstance(_rlo, dict): _rlo["bodyResults"] = _resume_body_results nodeOutputs[loop_node_id] = _rlo await _run_post_loop_done_nodes( loop_node_id=loop_node_id, body_ids=body_ids, items=items, ordered=ordered, connectionMap=connectionMap, nodeOutputs=nodeOutputs, context=context, services=services, automation2_interface=automation2_interface, runId=runId, processed_in_loop=processed_in_loop, ) for i, node in enumerate(ordered): if skip_until_passed: if node.get("id") == startAfterNodeId: skip_until_passed = False continue if node.get("id") in processed_in_loop: continue if context.get("_stopped"): logger.info("executeGraph stopped early at step %d", i) break nodeId = node.get("id") nodeType = node.get("type", "") # flow.loop: the feedback edge (body → loop 
input 0) hasn't run yet on the first # pass → would make _is_node_on_active_path return False. Only check the # *primary* predecessor (the one outside the loop body). if nodeType == "flow.loop": _loop_body_ids = getLoopBodyNodeIds(nodeId, connectionMap) _loop_primary = getLoopPrimaryInputSource(nodeId, connectionMap, _loop_body_ids) _loop_check_map = ( {nodeId: [(_loop_primary[0], _loop_primary[1], 0)]} if _loop_primary else connectionMap ) _loop_active = _is_node_on_active_path(nodeId, _loop_check_map, nodeOutputs) else: _loop_active = _is_node_on_active_path(nodeId, connectionMap, nodeOutputs) if not _loop_active: logger.info("executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)", i + 1, len(ordered), nodeId) _skipInputSnap = {"_skipReason": "inactive_branch"} for _sSrc, _, _ in connectionMap.get(nodeId, []): if _sSrc in nodeOutputs: _skipInputSnap[_sSrc] = nodeOutputs[_sSrc] _skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped", inputSnapshot=_skipInputSnap) if _skipStepId: _updateStepLog(automation2_interface, _skipStepId, "skipped") continue executor = _getExecutor(nodeType, services, automation2_interface) logger.info( "executeGraph step %d/%d: nodeId=%s nodeType=%s executor=%s", i + 1, len(ordered), nodeId, nodeType, type(executor).__name__ if executor else "None", ) if not executor: nodeOutputs[nodeId] = None logger.debug("executeGraph node %s: no executor, output=None", nodeId) continue _stepStartMs = time.time() _stepId = None try: if nodeType == "flow.loop": _loopInputSnap = {} for _lSrc, _, _ in connectionMap.get(nodeId, []): if _lSrc in nodeOutputs: _loopInputSnap[_lSrc] = nodeOutputs[_lSrc] _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _loopInputSnap) result = await executor.execute(node, context) items = result.get("items") or [] body_ids = getLoopBodyNodeIds(nodeId, connectionMap) body_ordered = [n for n in ordered if n.get("id") in body_ids] 
processed_in_loop.update(body_ids) processed_in_loop.add(nodeId) _loopConcurrency = int((node.get("parameters") or {}).get("concurrency", 1)) _loopConcurrency = max(1, min(_loopConcurrency, 20)) _batchMode = len(items) > STEPLOG_BATCH_THRESHOLD _aggLock = asyncio.Lock() # Prefer the *last* body node wired to loop input 0 (feedback / # pipeline end) — first matching inbound edge can be a shallow node. _feedback_candidates = [ _fb_src for _fb_src, _fb_so, _fb_ti in (connectionMap.get(nodeId) or []) if _fb_src in body_ids and _fb_ti == 0 ] _feedback_body_node_id = _feedback_candidates[-1] if _feedback_candidates else None if not _feedback_body_node_id and body_ordered: _feedback_body_node_id = body_ordered[-1].get("id") _bodyResultsPerIter: List[Any] = [None] * len(items) async def _runLoopIteration(_idx: int, _item: Any) -> Optional[Dict]: """Execute all body nodes for one iteration. Returns error dict or None.""" _iterOutputs = dict(nodeOutputs) _iterOutputs[nodeId] = {"items": items, "count": len(items), "currentItem": _item, "currentIndex": _idx} _iterCtx = dict(context) _iterCtx["nodeOutputs"] = _iterOutputs if _loopConcurrency > 1 else nodeOutputs _iterCtx["_loopState"] = {"loopNodeId": nodeId, "currentIndex": _idx, "items": items} if _loopConcurrency == 1: nodeOutputs[nodeId] = _iterOutputs[nodeId] context["_loopState"] = _iterCtx["_loopState"] _activeOutputs = _iterOutputs if _loopConcurrency > 1 else nodeOutputs _activeCtx = _iterCtx if _loopConcurrency > 1 else context for body_node in body_ordered: bnid = body_node.get("id") if not bnid or context.get("_stopped"): break if not _is_node_on_active_path(bnid, connectionMap, _activeOutputs): continue bexec = _getExecutor(body_node.get("type", ""), services, automation2_interface) if not bexec: _activeOutputs[bnid] = None continue _bStepStart = time.time() _bStepId = None if not _batchMode or _idx == 0 or _idx == len(items) - 1: _bInputSnap = {"_loopItem": _item, "_loopIndex": _idx} _bStepId = 
_createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _bInputSnap) try: bres, _bRetry = await _executeWithRetry(bexec, body_node, _activeCtx) if body_node.get("type") == "data.aggregate": async with _aggLock: if bnid not in _aggregateAccumulators: _aggregateAccumulators[bnid] = [] accItems = bres.get("items", [bres]) if isinstance(bres, dict) else [bres] _aggregateAccumulators[bnid].extend(accItems) if len(_aggregateAccumulators[bnid]) >= AGGREGATE_FLUSH_THRESHOLD: _aggregateTempChunks.setdefault(bnid, []).append(_aggregateAccumulators[bnid]) _aggregateAccumulators[bnid] = [] _activeOutputs[bnid] = bres if _bStepId: _bDur = int((time.time() - _bStepStart) * 1000) _updateStepLog(automation2_interface, _bStepId, "completed", output=bres if isinstance(bres, dict) else {"value": bres}, durationMs=_bDur, retryCount=_bRetry) if _loopConcurrency == 1: nodeOutputs[bnid] = bres except PauseForHumanTaskError as e: if _bStepId: _updateStepLog(automation2_interface, _bStepId, "completed", durationMs=int((time.time() - _bStepStart) * 1000)) if runId and automation2_interface: _run = automation2_interface.getRun(runId) or {} _run_ctx = dict(_run.get("context") or {}) _run_ctx["_loopState"] = {"loopNodeId": nodeId, "currentIndex": _idx, "items": items} automation2_interface.updateRun(e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=_run_ctx) return {"_pause": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId} except PauseForEmailWaitError: if _bStepId: _updateStepLog(automation2_interface, _bStepId, "completed", durationMs=int((time.time() - _bStepStart) * 1000)) raise except (_SubscriptionInactiveException, _BillingContextError): if _bStepId: _updateStepLog(automation2_interface, _bStepId, "failed", error="Subscription/Billing error", durationMs=int((time.time() - _bStepStart) * 1000)) raise except Exception as ex: if _bStepId: _updateStepLog(automation2_interface, _bStepId, 
"failed", error=str(ex), durationMs=int((time.time() - _bStepStart) * 1000)) logger.exception("executeGraph loop body node %s FAILED (iter %d): %s", bnid, _idx, ex) return {"_error": str(ex), "failedNode": bnid} if _feedback_body_node_id: async with _aggLock: if _idx < len(_bodyResultsPerIter): _bodyResultsPerIter[_idx] = _activeOutputs.get(_feedback_body_node_id) if _batchMode and _idx > 0 and _idx % STEPLOG_BATCH_THRESHOLD == 0 and runId: _emitStepEvent(runId, {"type": "loop_progress", "nodeId": nodeId, "iteration": _idx, "total": len(items)}) return None if _loopConcurrency <= 1: for idx, item in enumerate(items): iterErr = await _runLoopIteration(idx, item) if iterErr: if iterErr.get("_pause"): return {"success": False, "paused": True, "taskId": iterErr["taskId"], "runId": iterErr["runId"], "nodeId": iterErr["nodeId"], "nodeOutputs": _serializableOutputs(nodeOutputs)} nodeOutputs[iterErr.get("failedNode", nodeId)] = {"error": iterErr["_error"], "success": False} if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": iterErr["_error"], "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": iterErr.get("failedNode"), "runId": runId} else: _sem = asyncio.Semaphore(_loopConcurrency) async def _concurrentIter(_ci: int, _citem: Any): async with _sem: return await _runLoopIteration(_ci, _citem) _tasks = [_concurrentIter(ci, citem) for ci, citem in enumerate(items)] _results = await asyncio.gather(*_tasks, return_exceptions=True) for _ri, _rval in enumerate(_results): if isinstance(_rval, Exception): logger.exception("Loop iteration %d raised: %s", _ri, _rval) if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": str(_rval), 
"nodeOutputs": _serializableOutputs(nodeOutputs), "runId": runId} if isinstance(_rval, dict): if _rval.get("_pause"): return {"success": False, "paused": True, "taskId": _rval["taskId"], "runId": _rval["runId"], "nodeId": _rval["nodeId"], "nodeOutputs": _serializableOutputs(nodeOutputs)} if _rval.get("_error"): if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _activeRunContexts.pop(runId, None) return {"success": False, "error": _rval["_error"], "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": _rval.get("failedNode"), "runId": runId} for aggId, accItems in _aggregateAccumulators.items(): allChunks = _aggregateTempChunks.pop(aggId, []) finalItems = [] for chunk in allChunks: finalItems.extend(chunk) finalItems.extend(accItems) nodeOutputs[aggId] = {"items": finalItems, "count": len(finalItems), "_success": True} _aggregateAccumulators.clear() # Always attach ``bodyResults`` (list per iteration, possibly None # placeholders) so DataRefs to ``bodyResults`` resolve and # ``context.mergeContext`` can fall back to the wired loop output. 
_lo = nodeOutputs.get(nodeId) if isinstance(_lo, dict): _lo["bodyResults"] = _bodyResultsPerIter nodeOutputs[nodeId] = _lo await _run_post_loop_done_nodes( loop_node_id=nodeId, body_ids=body_ids, items=items, ordered=ordered, connectionMap=connectionMap, nodeOutputs=nodeOutputs, context=context, services=services, automation2_interface=automation2_interface, runId=runId, processed_in_loop=processed_in_loop, ) _updateStepLog(automation2_interface, _stepId, "completed", output={"iterationCount": len(items), "items": len(items), "concurrency": _loopConcurrency, "batchMode": _batchMode}, durationMs=int((time.time() - _stepStartMs) * 1000)) logger.info("executeGraph flow.loop done: %d iterations (concurrency=%d, batchMode=%s)", len(items), _loopConcurrency, _batchMode) elif _isBarrierNode(nodeType): if not _allMergePredecessorsReady(nodeId, connectionMap, nodeOutputs): logger.info("executeGraph node %s (%s): waiting — not all predecessors ready, deferring", nodeId, nodeType) nodeOutputs[nodeId] = None continue _stepStartMs = time.time() _inputSnap = {} for src, _, _ in connectionMap.get(nodeId, []): if src in nodeOutputs: _inputSnap[src] = nodeOutputs[src] _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _inputSnap) result, retryCount = await _executeWithRetry(executor, node, context) result = _normalizeResult(result, nodeType) nodeOutputs[nodeId] = result else: _stepStartMs = time.time() _inputSnap = {} for src, _, _ in connectionMap.get(nodeId, []): if src in nodeOutputs: _inputSnap[src] = nodeOutputs[src] _stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _inputSnap) result, retryCount = await _executeWithRetry(executor, node, context) result = _normalizeResult(result, nodeType) nodeOutputs[nodeId] = result _durMs = int((time.time() - _stepStartMs) * 1000) _tokens = result.get("tokensUsed", 0) if isinstance(result, dict) else 0 _updateStepLog(automation2_interface, _stepId, "completed", output=result 
if isinstance(result, dict) else {"value": result}, durationMs=_durMs, tokensUsed=_tokens, retryCount=retryCount) logger.info( "executeGraph node %s done: result_type=%s result_keys=%s retries=%d duration=%dms", nodeId, type(result).__name__, list(result.keys()) if isinstance(result, dict) else "n/a", retryCount, _durMs, ) except PauseForHumanTaskError as e: _updateStepLog(automation2_interface, _stepId, "completed", durationMs=int((time.time() - _stepStartMs) * 1000)) logger.info("executeGraph paused for human task %s", e.taskId) return { "success": False, "paused": True, "taskId": e.taskId, "runId": e.runId, "nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs), } except PauseForEmailWaitError as e: _updateStepLog(automation2_interface, _stepId, "completed", durationMs=int((time.time() - _stepStartMs) * 1000)) logger.info("executeGraph paused for email wait (run %s, node %s)", e.runId, e.nodeId) try: from modules.interfaces.interfaceDbApp import getRootInterface from modules.features.graphicalEditor.emailPoller import ensureRunning root = getRootInterface() event_user = root.getUserByUsername("event") if root else None if event_user: ensureRunning(event_user) except Exception as poll_err: logger.warning("Could not start email poller: %s", poll_err) paused_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") run_ctx = { "connectionMap": context.get("connectionMap"), "inputSources": context.get("inputSources"), "orderedNodeIds": [n.get("id") for n in context.get("_orderedNodes", []) if n.get("id")], "waitReason": "email", "waitConfig": e.waitConfig, "pausedAt": paused_at, "lastCheckedAt": None, "ownerId": context.get("userId"), "mandateId": context.get("mandateId"), "instanceId": context.get("instanceId"), } automation2_interface.updateRun( e.runId, status="paused", nodeOutputs=_serializableOutputs(nodeOutputs), currentNodeId=e.nodeId, context=run_ctx, ) return { "success": False, "paused": True, "waitReason": "email", "runId": e.runId, 
"nodeId": e.nodeId, "nodeOutputs": _serializableOutputs(nodeOutputs), } except Exception as e: logger.exception("executeGraph node %s (%s) FAILED: %s", nodeId, nodeType, e) nodeOutputs[nodeId] = {"error": str(e), "success": False} _durMs = int((time.time() - _stepStartMs) * 1000) _updateStepLog(automation2_interface, _stepId, "failed", error=str(e), durationMs=_durMs) if runId and automation2_interface: automation2_interface.updateRun(runId, status="failed", nodeOutputs=_serializableOutputs(nodeOutputs)) if runId: _emitStepEvent(runId, {"type": "run_failed", "runId": runId, "status": "failed", "error": str(e), "failedNode": nodeId}) try: _wfObj = automation2_interface.getWorkflow(workflowId) if automation2_interface and workflowId else None _wfDict = _wfObj if isinstance(_wfObj, dict) else ( _wfObj.model_dump() if hasattr(_wfObj, "model_dump") else {} ) if _wfObj else {} _shouldNotify = _wfDict.get("notifyOnFailure", True) if _wfDict else True if _shouldNotify: from modules.workflows.scheduler.mainScheduler import notifyRunFailed notifyRunFailed( workflowId or "", runId or "", str(e), mandateId=mandateId, workflowLabel=_wfDict.get("label"), ) except Exception as notifyErr: logger.warning(f"executeGraph: failure notification failed for run={runId}: {notifyErr}") if runId: _activeRunContexts.pop(runId, None) return { "success": False, "error": str(e), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": nodeId, "runId": runId, } _safeOutputs = _serializableOutputs(nodeOutputs) _wasStopped = context.get("_stopped", False) _finalStatus = "stopped" if _wasStopped else "completed" if runId and automation2_interface: automation2_interface.updateRun(runId, status=_finalStatus, nodeOutputs=_safeOutputs) if runId: _emitStepEvent(runId, {"type": "run_complete", "runId": runId, "status": _finalStatus}) _activeRunContexts.pop(runId, None) logger.info( "executeGraph complete: success=True nodeOutputs_keys=%s stopped=%s", list(nodeOutputs.keys()), _wasStopped, ) return 
{ "success": True, "nodeOutputs": _safeOutputs, "stopped": _wasStopped, "runId": runId, }