# gateway/modules/workflows/automation2/executors/flowExecutor.py
# 2026-03-22 16:15:11 +01:00
#
# 146 lines
# 5.8 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# Flow control node executor (ifElse, merge, wait, stop, switch, loop).
import asyncio
import logging
from typing import Dict, Any
logger = logging.getLogger(__name__)


class FlowExecutor:
    """Execute flow control nodes (ifElse, merge, wait, stop, switch, loop).

    Every handler receives the node definition, the map of outputs produced
    so far (``nodeOutputs``: source node id -> output) and the node's own
    input wiring (``inputSources``: input index -> 2-tuple whose first
    element is the source node id; the second element is presumably the
    source's output index and is not used here).
    """

    # Upper bound, in seconds, for a single ``flow.wait`` pause.
    _MAX_WAIT_SECONDS = 300

    async def execute(
        self,
        node: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Run one flow control node and return its output.

        Args:
            node: Node definition; ``type``, ``id`` and ``parameters`` are read.
            context: Execution context; ``nodeOutputs`` and ``inputSources``
                are read, and ``_stopped`` is set to ``True`` by ``flow.stop``.

        Returns:
            The handler's result, ``{"stopped": True}`` for ``flow.stop``, or
            ``None`` when the node type is not handled by this executor.
        """
        nodeType = node.get("type", "")
        nodeId = node.get("id", "")
        nodeOutputs = context.get("nodeOutputs", {})
        inputSources = context.get("inputSources", {}).get(nodeId, {})
        logger.info(
            "FlowExecutor node %s type=%s inputSources=%s params=%s",
            nodeId,
            nodeType,
            inputSources,
            node.get("parameters"),
        )

        # flow.stop is the only node that mutates the context, so it is
        # handled apart from the uniform handler dispatch below.
        if nodeType == "flow.stop":
            context["_stopped"] = True
            logger.info("FlowExecutor node %s -> STOP", nodeId)
            return {"stopped": True}

        handlers = {
            "flow.ifElse": ("ifElse", self._ifElse),
            "flow.merge": ("merge", self._merge),
            "flow.wait": ("wait", self._wait),
            "flow.switch": ("switch", self._switch),
            "flow.loop": ("loop", self._loop),
        }
        # Guard against unhashable ``type`` values so they fall through to
        # the "unhandled" path instead of raising from the dict lookup.
        entry = handlers.get(nodeType) if isinstance(nodeType, str) else None
        if entry is None:
            logger.debug(
                "FlowExecutor node %s unhandled type %s -> None", nodeId, nodeType
            )
            return None

        label, handler = entry
        out = await handler(node, nodeOutputs, nodeId, inputSources)
        logger.info("FlowExecutor node %s %s -> %s", nodeId, label, out)
        return out

    def _getInputData(
        self,
        nodeId: str,
        inputSources: Dict,
        nodeOutputs: Dict,
        outputIndex: int = 0,
    ) -> Any:
        """Return the output of the node wired to input 0 of ``nodeId``.

        Args:
            nodeId: Node whose input is looked up.
            inputSources: Map of node id -> {input index -> (source id, ...)}.
            nodeOutputs: Map of node id -> produced output.
            outputIndex: Accepted for interface compatibility; not used.

        Returns:
            The source node's output, or ``None`` when input 0 is not wired
            or the source has produced no output.
        """
        sources = inputSources.get(nodeId, {})
        if 0 not in sources:
            return None
        srcId, _ = sources[0]
        return nodeOutputs.get(srcId)

    async def _ifElse(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Evaluate the ``condition`` parameter and pick a branch.

        Returns:
            ``{"branch": 0 or 1, "conditionResult": bool, "input": data}``
            where branch 0 is taken when the condition is truthy. Any failure
            while resolving or evaluating the condition selects branch 1.
        """
        condExpr = (node.get("parameters") or {}).get("condition", "")
        inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
        try:
            # Imported lazily, as elsewhere in this class.
            from modules.workflows.automation2.graphUtils import resolveParameterReferences

            # Replace {{nodeId}} references with actual values.
            resolved = resolveParameterReferences(condExpr, nodeOutputs)
            if isinstance(resolved, str):
                # SECURITY(review): eval() executes arbitrary Python taken
                # from the workflow definition. It is kept because existing
                # conditions may depend on it (including the local names
                # visible here), but it must be replaced by a restricted
                # expression evaluator before untrusted users can author
                # workflows.
                ok = bool(eval(resolved)) if resolved else False
            else:
                # A reference may resolve to a non-string value (e.g. a
                # bool); eval() would reject it, so use its truthiness.
                ok = bool(resolved)
        except Exception:
            # Deliberate best-effort: a broken condition takes the else
            # branch, but the failure is logged instead of vanishing.
            logger.warning(
                "FlowExecutor node %s ifElse condition %r failed; using else branch",
                nodeId,
                condExpr,
                exc_info=True,
            )
            ok = False
        return {"branch": 0 if ok else 1, "conditionResult": ok, "input": inp}

    async def _merge(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Collect the outputs of all wired inputs, in input-index order.

        List outputs are flattened into the result, other outputs are
        appended, and sources without output are skipped. In ``combine``
        mode, when exactly two items were collected and both are dicts, a
        single merged dict is returned (the second wins on key clashes);
        otherwise the list of items is returned.
        """
        mode = (node.get("parameters") or {}).get("mode", "append")
        items = []
        for inpIdx in sorted(inputSources):
            srcId, _ = inputSources[inpIdx]
            data = nodeOutputs.get(srcId)
            if data is None:
                continue
            if isinstance(data, list):
                items.extend(data)
            else:
                items.append(data)

        if (
            mode == "combine"
            and len(items) == 2
            and isinstance(items[0], dict)
            and isinstance(items[1], dict)
        ):
            return {**items[0], **items[1]}
        return items

    async def _wait(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Pause for the ``seconds`` parameter, then pass input 0 through.

        The pause is capped at ``_MAX_WAIT_SECONDS``. Numeric strings are
        accepted; a missing, empty, non-positive or unparsable value means
        no pause (an unparsable value is logged).

        Returns:
            The output of the node wired to input 0, or ``None``.
        """
        rawSecs = (node.get("parameters") or {}).get("seconds", 0)
        try:
            # Convert before comparing so string values do not raise.
            secs = float(rawSecs or 0)
        except (TypeError, ValueError):
            logger.warning(
                "FlowExecutor node %s wait: invalid seconds %r, not waiting",
                nodeId,
                rawSecs,
            )
            secs = 0.0
        if secs > 0:
            await asyncio.sleep(min(secs, self._MAX_WAIT_SECONDS))
        return self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)

    async def _switch(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Match the resolved ``value`` parameter against ``cases``.

        Returns:
            ``{"match": index, "value": value}`` for the first case equal to
            the value, or ``match`` of ``-1`` when no case is equal.
        """
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        params = node.get("parameters") or {}
        value = resolveParameterReferences(params.get("value", ""), nodeOutputs)
        for i, c in enumerate(params.get("cases", [])):
            if c == value:
                return {"match": i, "value": value}
        return {"match": -1, "value": value}

    async def _loop(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Resolve the ``items`` parameter into a list to iterate over.

        A resolved non-list value is wrapped in a single-element list and
        ``None`` becomes an empty list.

        Returns:
            ``{"items": list, "count": len(list)}``.
        """
        itemsPath = (node.get("parameters") or {}).get("items")
        if itemsPath is None:
            # No items configured: an empty loop. This avoids relying on the
            # resolver to turn a literal "[]" default into a list; if it
            # returned the string unchanged, the loop would run once.
            return {"items": [], "count": 0}

        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        items = resolveParameterReferences(itemsPath, nodeOutputs)
        if not isinstance(items, list):
            items = [items] if items is not None else []
        return {"items": items, "count": len(items)}