# Copyright (c) 2025 Patrick Motsch
# Main execution engine for automation2 graphs.

import logging
from datetime import datetime, timezone
from typing import Dict, Any, List, Set, Optional

from modules.workflows.automation2.graphUtils import (
    parseGraph,
    buildConnectionMap,
    validateGraph,
    topoSort,
    getInputSources,
    getLoopBodyNodeIds,
)
from modules.workflows.automation2.executors import (
    TriggerExecutor,
    FlowExecutor,
    ActionNodeExecutor,
    InputExecutor,
    PauseForHumanTaskError,
    PauseForEmailWaitError,
)
from modules.features.automation2.nodeDefinitions import STATIC_NODE_TYPES
from modules.workflows.automation2.runEnvelope import normalize_run_envelope

logger = logging.getLogger(__name__)

# Node-type prefixes that are dispatched to ActionNodeExecutor.
_ACTION_NODE_PREFIXES = ("ai.", "email.", "sharepoint.", "clickup.", "file.")


def _getNodeTypeIds(services: Any = None) -> Set[str]:
    """Collect all known node type IDs from static definitions.

    ``services`` is accepted for call-site compatibility but is not used.
    """
    return {n["id"] for n in STATIC_NODE_TYPES}


def _is_node_on_active_path(
    nodeId: str,
    connectionMap: Dict[str, List],
    nodeOutputs: Dict[str, Any],
) -> bool:
    """
    Return True if this node receives input only from active branches.

    - flow.ifElse: only one output (0=yes, 1=no) is active; uses "branch".
    - flow.switch: only one output (0, 1, 2, ...) is active; uses "match".

    Nodes connected to inactive outputs must be skipped. Also skip when a
    predecessor was skipped (not in nodeOutputs).

    Each entry of ``connectionMap[nodeId]`` is unpacked as
    ``(source node id, source output index, <ignored>)``.
    A node without incoming connections is always considered active.
    """
    for src, source_output, _ in connectionMap.get(nodeId, []):
        out = nodeOutputs.get(src)
        if out is None:
            # Predecessor was skipped or produced no output.
            return False
        if not isinstance(out, dict):
            # Non-dict outputs carry no branch selection.
            continue
        branch = out.get("branch")
        match = out.get("match")
        active_output = None
        if branch is not None:
            active_output = branch
        elif match is not None:
            if match < 0:
                return False  # switch: no case matched, skip all downstream
            active_output = match
        if active_output is not None and source_output != active_output:
            return False
    return True


def _getExecutor(
    nodeType: str,
    services: Any,
    automation2_interface: Optional[Any] = None,
) -> Any:
    """Dispatch to correct executor based on node type.

    Returns None when no executor handles ``nodeType``; this includes
    "input." nodes when no ``automation2_interface`` is supplied.
    """
    if nodeType.startswith("trigger."):
        return TriggerExecutor()
    if nodeType.startswith("flow."):
        return FlowExecutor()
    if nodeType.startswith(_ACTION_NODE_PREFIXES):
        return ActionNodeExecutor(services)
    if nodeType.startswith("input.") and automation2_interface:
        return InputExecutor(automation2_interface)
    return None


async def _executeLoopBody(
    body_ordered: List[Dict[str, Any]],
    loop_node_id: str,
    index: int,
    items: List[Any],
    context: Dict[str, Any],
    connectionMap: Dict[str, List],
    nodeOutputs: Dict[str, Any],
    services: Any,
    automation2_interface: Optional[Any],
    runId: Optional[str],
    log_label: str,
) -> Optional[Dict[str, Any]]:
    """Run the loop body nodes once for iteration ``index``.

    Returns None when the body finished (or was stopped via ``_stopped``),
    otherwise the terminal result dict (paused or failed) that executeGraph
    must return to its caller.
    """
    for body_node in body_ordered:
        bnid = body_node.get("id")
        if not bnid or context.get("_stopped"):
            break
        if not _is_node_on_active_path(bnid, connectionMap, nodeOutputs):
            continue
        executor = _getExecutor(body_node.get("type", ""), services, automation2_interface)
        if not executor:
            nodeOutputs[bnid] = None
            continue
        try:
            result = await executor.execute(body_node, context)
            nodeOutputs[bnid] = result
            logger.info("executeGraph %s %s done (iter %d)", log_label, bnid, index)
        except PauseForHumanTaskError as e:
            if runId and automation2_interface:
                # Re-read the run so the persisted context is extended rather
                # than overwritten with a snapshot taken before this iteration.
                run = automation2_interface.getRun(runId) or {}
                run_ctx = dict(run.get("context") or {})
                run_ctx["_loopState"] = {
                    "loopNodeId": loop_node_id,
                    "currentIndex": index,
                    "items": items,
                }
                automation2_interface.updateRun(
                    e.runId,
                    status="paused",
                    nodeOutputs=dict(nodeOutputs),
                    currentNodeId=e.nodeId,
                    context=run_ctx,
                )
            return {
                "success": False,
                "paused": True,
                "taskId": e.taskId,
                "runId": e.runId,
                "nodeId": e.nodeId,
                "nodeOutputs": dict(nodeOutputs),
            }
        except Exception as ex:
            # NOTE(review): PauseForEmailWaitError raised by a body node is not
            # handled separately; unless it subclasses PauseForHumanTaskError
            # it ends up here and fails the run -- confirm this is intended.
            logger.exception("executeGraph loop body node %s FAILED: %s", bnid, ex)
            nodeOutputs[bnid] = {"error": str(ex), "success": False}
            if runId and automation2_interface:
                automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs)
            return {
                "success": False,
                "error": str(ex),
                "nodeOutputs": nodeOutputs,
                "failedNode": bnid,
            }
    return None


async def executeGraph(
    graph: Dict[str, Any],
    services: Any,
    workflowId: Optional[str] = None,
    instanceId: Optional[str] = None,
    userId: Optional[str] = None,
    mandateId: Optional[str] = None,
    automation2_interface: Optional[Any] = None,
    initialNodeOutputs: Optional[Dict[str, Any]] = None,
    startAfterNodeId: Optional[str] = None,
    runId: Optional[str] = None,
    run_envelope: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Execute automation2 graph. Returns { success, nodeOutputs, error?, stopped? }.

    When an input node is reached and automation2_interface is provided, creates
    a task, pauses the run, and returns
    { success: False, paused: True, taskId, runId }.

    For resume: pass initialNodeOutputs (with result for the human node) and
    startAfterNodeId.
    For fresh runs: pass run_envelope (unified start payload for the start
    node); normalized with userId into context.runEnvelope.

    Other result shapes produced here:
    - validation failure: { success: False, error, nodeOutputs: {} }
    - node failure: { success: False, error, nodeOutputs, failedNode }
    - email wait: { success: False, paused: True, waitReason: "email",
      runId, nodeId, nodeOutputs }
    """
    logger.info(
        "executeGraph start: instanceId=%s workflowId=%s userId=%s mandateId=%s resume=%s",
        instanceId,
        workflowId,
        userId,
        mandateId,
        startAfterNodeId is not None,
    )
    # Imported at call time, as in the original module.
    from modules.workflows.processing.shared.methodDiscovery import discoverMethods

    discoverMethods(services)
    nodeTypeIds = _getNodeTypeIds(services)
    logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds))
    errors = validateGraph(graph, nodeTypeIds)
    if errors:
        logger.warning("executeGraph validation failed: %s", errors)
        return {"success": False, "error": "; ".join(errors), "nodeOutputs": {}}

    nodes, connections = parseGraph(graph)[:2]
    connectionMap = buildConnectionMap(connections)
    inputSources = {
        n["id"]: getInputSources(n["id"], connectionMap) for n in nodes if n.get("id")
    }
    logger.info(
        "executeGraph parsed: nodes=%d connections=%d connectionMap_targets=%s",
        len(nodes),
        len(connections),
        list(connectionMap.keys()),
    )

    ordered = topoSort(nodes, connectionMap)
    ordered_ids = [n.get("id") for n in ordered if n.get("id")]
    logger.info("executeGraph topoSort order: %s", ordered_ids)

    nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {})
    is_resume = startAfterNodeId is not None

    # Fresh run without a caller-supplied run id: persist a new run record.
    if not runId and automation2_interface and workflowId and not is_resume:
        run_context = {
            "connectionMap": connectionMap,
            "inputSources": inputSources,
            "orderedNodeIds": ordered_ids,
        }
        if userId:
            run_context["ownerId"] = userId
        if mandateId:
            run_context["mandateId"] = mandateId
        if instanceId:
            run_context["instanceId"] = instanceId
        run = automation2_interface.createRun(
            workflowId=workflowId,
            nodeOutputs=nodeOutputs,
            context=run_context,
        )
        runId = run.get("id") if run else None
        logger.info("executeGraph created run %s", runId)

    env_for_run = normalize_run_envelope(run_envelope, user_id=userId)
    context = {
        "workflowId": workflowId,
        "instanceId": instanceId,
        "userId": userId,
        "mandateId": mandateId,
        "nodeOutputs": nodeOutputs,
        "connectionMap": connectionMap,
        "inputSources": inputSources,
        "services": services,
        "_runId": runId,
        "_orderedNodes": ordered,
        "runEnvelope": env_for_run,
    }

    skip_until_passed = bool(startAfterNodeId)
    processed_in_loop: Set[str] = set()

    # Check for loop resume: run was paused inside a loop, we're resuming for next iteration
    run = automation2_interface.getRun(runId) if (runId and automation2_interface) else None
    loop_resume_state = (run.get("context") or {}).get("_loopState") if run else None
    if loop_resume_state and startAfterNodeId:
        loop_node_id = loop_resume_state.get("loopNodeId")
        # NOTE(review): resume continues with the iteration after the paused
        # one; body nodes that follow the paused node in that iteration are not
        # executed here and are skipped by the main loop -- confirm intended.
        next_index = loop_resume_state.get("currentIndex", -1) + 1
        items = loop_resume_state.get("items") or []
        body_ids = getLoopBodyNodeIds(loop_node_id, connectionMap) if loop_node_id else set()
        body_ordered = [n for n in ordered if n.get("id") in body_ids]
        processed_in_loop = (set(body_ids) | {loop_node_id}) if loop_node_id else set()
        while next_index < len(items) and loop_node_id:
            nodeOutputs[loop_node_id] = {
                "items": items,
                "count": len(items),
                "currentItem": items[next_index],
                "currentIndex": next_index,
            }
            context["_loopState"] = {
                "loopNodeId": loop_node_id,
                "currentIndex": next_index,
                "items": items,
            }
            outcome = await _executeLoopBody(
                body_ordered,
                loop_node_id,
                next_index,
                items,
                context,
                connectionMap,
                nodeOutputs,
                services,
                automation2_interface,
                runId,
                "loop resume body node",
            )
            if outcome is not None:
                return outcome
            next_index += 1
        if loop_node_id:
            # Final loop output drops the per-iteration keys.
            nodeOutputs[loop_node_id] = {"items": items, "count": len(items)}
            processed_in_loop = set(body_ids) | {loop_node_id}

    for i, node in enumerate(ordered):
        if skip_until_passed:
            # On resume, skip everything up to and including startAfterNodeId.
            if node.get("id") == startAfterNodeId:
                skip_until_passed = False
            continue
        if node.get("id") in processed_in_loop:
            # Loop nodes and their bodies are driven by the loop handling.
            continue
        if context.get("_stopped"):
            logger.info("executeGraph stopped early at step %d", i)
            break
        nodeId = node.get("id")
        nodeType = node.get("type", "")
        if not _is_node_on_active_path(nodeId, connectionMap, nodeOutputs):
            logger.info(
                "executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)",
                i + 1,
                len(ordered),
                nodeId,
            )
            continue
        executor = _getExecutor(nodeType, services, automation2_interface)
        logger.info(
            "executeGraph step %d/%d: nodeId=%s nodeType=%s executor=%s",
            i + 1,
            len(ordered),
            nodeId,
            nodeType,
            type(executor).__name__ if executor else "None",
        )
        if not executor:
            nodeOutputs[nodeId] = None
            logger.debug("executeGraph node %s: no executor, output=None", nodeId)
            continue
        try:
            if nodeType == "flow.loop":
                result = await executor.execute(node, context)
                items = result.get("items") or []
                body_ids = getLoopBodyNodeIds(nodeId, connectionMap)
                body_ordered = [n for n in ordered if n.get("id") in body_ids]
                processed_in_loop.update(body_ids)
                processed_in_loop.add(nodeId)
                for idx, item in enumerate(items):
                    nodeOutputs[nodeId] = {
                        "items": items,
                        "count": len(items),
                        "currentItem": item,
                        "currentIndex": idx,
                    }
                    context["_loopState"] = {
                        "loopNodeId": nodeId,
                        "currentIndex": idx,
                        "items": items,
                    }
                    outcome = await _executeLoopBody(
                        body_ordered,
                        nodeId,
                        idx,
                        items,
                        context,
                        connectionMap,
                        nodeOutputs,
                        services,
                        automation2_interface,
                        runId,
                        "loop body node",
                    )
                    if outcome is not None:
                        return outcome
                nodeOutputs[nodeId] = {"items": items, "count": len(items)}
                logger.info("executeGraph flow.loop done: %d iterations", len(items))
            else:
                result = await executor.execute(node, context)
                nodeOutputs[nodeId] = result
                logger.info(
                    "executeGraph node %s done: result_type=%s result_keys=%s",
                    nodeId,
                    type(result).__name__,
                    list(result.keys()) if isinstance(result, dict) else "n/a",
                )
        except PauseForHumanTaskError as e:
            # The run record is not updated here; presumably the raising
            # executor already persisted the pause -- verify against executors.
            logger.info("executeGraph paused for human task %s", e.taskId)
            return {
                "success": False,
                "paused": True,
                "taskId": e.taskId,
                "runId": e.runId,
                "nodeId": e.nodeId,
                "nodeOutputs": dict(nodeOutputs),
            }
        except PauseForEmailWaitError as e:
            logger.info("executeGraph paused for email wait (run %s, node %s)", e.runId, e.nodeId)
            # Start email poller on-demand (only runs while workflows wait for email)
            try:
                from modules.interfaces.interfaceDbApp import getRootInterface
                from modules.features.automation2.emailPoller import ensureRunning

                root = getRootInterface()
                event_user = root.getUserByUsername("event") if root else None
                if event_user:
                    ensureRunning(event_user)
            except Exception as poll_err:
                # Best effort: a poller start failure must not fail the pause.
                logger.warning("Could not start email poller: %s", poll_err)
            paused_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
            run_ctx = {
                "connectionMap": context.get("connectionMap"),
                "inputSources": context.get("inputSources"),
                "orderedNodeIds": [
                    n.get("id") for n in context.get("_orderedNodes", []) if n.get("id")
                ],
                "waitReason": "email",
                "waitConfig": e.waitConfig,
                "pausedAt": paused_at,
                "lastCheckedAt": None,
                "ownerId": context.get("userId"),
                "mandateId": context.get("mandateId"),
                "instanceId": context.get("instanceId"),
            }
            if automation2_interface:
                automation2_interface.updateRun(
                    e.runId,
                    status="paused",
                    nodeOutputs=dict(nodeOutputs),
                    currentNodeId=e.nodeId,
                    context=run_ctx,
                )
            else:
                # Guarded like every other updateRun call; without an interface
                # the paused state cannot be persisted.
                logger.warning(
                    "executeGraph email wait for run %s not persisted: no automation2_interface",
                    e.runId,
                )
            return {
                "success": False,
                "paused": True,
                "waitReason": "email",
                "runId": e.runId,
                "nodeId": e.nodeId,
                "nodeOutputs": dict(nodeOutputs),
            }
        except Exception as e:
            logger.exception("executeGraph node %s (%s) FAILED: %s", nodeId, nodeType, e)
            nodeOutputs[nodeId] = {"error": str(e), "success": False}
            if runId and automation2_interface:
                automation2_interface.updateRun(runId, status="failed", nodeOutputs=nodeOutputs)
            return {
                "success": False,
                "error": str(e),
                "nodeOutputs": nodeOutputs,
                "failedNode": nodeId,
            }

    if runId and automation2_interface:
        automation2_interface.updateRun(runId, status="completed", nodeOutputs=nodeOutputs)
    logger.info(
        "executeGraph complete: success=True nodeOutputs_keys=%s stopped=%s",
        list(nodeOutputs.keys()),
        context.get("_stopped", False),
    )
    return {
        "success": True,
        "nodeOutputs": nodeOutputs,
        "stopped": context.get("_stopped", False),
    }