# Copyright (c) 2025 Patrick Motsch # Flow control node executor (ifElse, merge, wait, stop). import asyncio import logging from typing import Dict, Any logger = logging.getLogger(__name__) class FlowExecutor: """Execute flow control nodes.""" async def execute( self, node: Dict[str, Any], context: Dict[str, Any], ) -> Any: nodeType = node.get("type", "") nodeOutputs = context.get("nodeOutputs", {}) connectionMap = context.get("connectionMap", {}) nodeId = node.get("id", "") inputSources = context.get("inputSources", {}).get(nodeId, {}) logger.info( "FlowExecutor node %s type=%s inputSources=%s params=%s", nodeId, nodeType, inputSources, node.get("parameters"), ) if nodeType == "flow.ifElse": out = await self._ifElse(node, nodeOutputs, nodeId, inputSources) logger.info("FlowExecutor node %s ifElse -> %s", nodeId, out) return out if nodeType == "flow.merge": out = await self._merge(node, nodeOutputs, nodeId, inputSources) logger.info("FlowExecutor node %s merge -> %s", nodeId, out) return out if nodeType == "flow.wait": out = await self._wait(node, nodeOutputs, nodeId, inputSources) logger.info("FlowExecutor node %s wait -> %s", nodeId, out) return out if nodeType == "flow.stop": context["_stopped"] = True logger.info("FlowExecutor node %s -> STOP", nodeId) return {"stopped": True} if nodeType == "flow.switch": out = await self._switch(node, nodeOutputs, nodeId, inputSources) logger.info("FlowExecutor node %s switch -> %s", nodeId, out) return out if nodeType == "flow.loop": out = await self._loop(node, nodeOutputs, nodeId, inputSources) logger.info("FlowExecutor node %s loop -> %s", nodeId, out) return out logger.debug("FlowExecutor node %s unhandled type %s -> None", nodeId, nodeType) return None def _getInputData(self, nodeId: str, inputSources: Dict, nodeOutputs: Dict, outputIndex: int = 0) -> Any: """Get data from the connected source node.""" sources = inputSources.get(nodeId, {}) if 0 not in sources: return None srcId, srcOut = sources[0] return nodeOutputs.get(srcId) async def _ifElse( self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict, ) -> Any: condExpr = (node.get("parameters") or {}).get("condition", "") inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs) # Simple eval - in production use safe evaluation try: # Replace {{nodeId}} refs with actual values from modules.workflows.automation2.graphUtils import resolveParameterReferences resolved = resolveParameterReferences(condExpr, nodeOutputs) # Minimal eval for simple comparisons (e.g. "True", "1 > 0") ok = bool(eval(resolved)) if resolved else False except Exception: ok = False return {"branch": 0 if ok else 1, "conditionResult": ok, "input": inp} async def _merge(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any: mode = (node.get("parameters") or {}).get("mode", "append") sources = inputSources items = [] for inpIdx in sorted(sources.keys()): srcId, _ = sources[inpIdx] data = nodeOutputs.get(srcId) if data is not None: if isinstance(data, list): items.extend(data) else: items.append(data) if mode == "combine" and len(items) == 2: if isinstance(items[0], dict) and isinstance(items[1], dict): return {**items[0], **items[1]} return items async def _wait(self, node: Dict, nodeOutputs: Dict) -> Any: secs = (node.get("parameters") or {}).get("seconds", 0) if secs > 0: await asyncio.sleep(min(float(secs), 300)) nodeId = node.get("id") from modules.workflows.automation2.graphUtils import getInputSources # Input comes from context inp = context.get("_inputData") if "context" in dir() else None return nodeOutputs.get(nodeId, {}) async def _wait( self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict, ) -> Any: secs = (node.get("parameters") or {}).get("seconds", 0) if secs > 0: await asyncio.sleep(min(float(secs), 300)) if 0 in inputSources: srcId, _ = inputSources[0] return nodeOutputs.get(srcId) return None async def _switch(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any: valueExpr = (node.get("parameters") or {}).get("value", "") from modules.workflows.automation2.graphUtils import resolveParameterReferences value = resolveParameterReferences(valueExpr, nodeOutputs) cases = (node.get("parameters") or {}).get("cases", []) for i, c in enumerate(cases): if c == value: return {"match": i, "value": value} return {"match": -1, "value": value} async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any: itemsPath = (node.get("parameters") or {}).get("items", "[]") from modules.workflows.automation2.graphUtils import resolveParameterReferences items = resolveParameterReferences(itemsPath, nodeOutputs) if not isinstance(items, list): items = [items] if items is not None else [] return {"items": items, "count": len(items)}