gateway/modules/workflows/automation2/executors/flowExecutor.py
2026-04-26 08:31:35 +02:00

373 lines
15 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# Flow control node executor (ifElse, switch, loop, merge).
import logging
from typing import Any, Dict
from modules.features.graphicalEditor.portTypes import wrapTransit, unwrapTransit
logger = logging.getLogger(__name__)
class FlowExecutor:
    """Execute flow control nodes (flow.ifElse, flow.switch, flow.loop, flow.merge)."""

    # Operators whose operands are coerced to float before comparing.
    _NUMERIC_OPERATORS = ("lt", "lte", "gt", "gte")

    async def execute(
        self,
        node: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Dispatch a flow-control node to the handler for its ``type``.

        Args:
            node: Node definition; ``type``, ``id`` and ``parameters`` are read.
            context: Run context; ``nodeOutputs`` and ``inputSources`` are read.
                ``inputSources`` is indexed by node id first, then by input port.

        Returns:
            The handler's output, or ``None`` when the node type is not one of
            the four flow types handled here.
        """
        nodeType = node.get("type", "")
        nodeOutputs = context.get("nodeOutputs", {})
        nodeId = node.get("id", "")
        inputSources = context.get("inputSources", {}).get(nodeId, {})
        logger.info(
            "FlowExecutor node %s type=%s inputSources=%s params=%s",
            nodeId,
            nodeType,
            inputSources,
            node.get("parameters"),
        )
        if nodeType == "flow.ifElse":
            out = await self._ifElse(node, nodeOutputs, nodeId, inputSources)
            logger.info(
                "FlowExecutor node %s ifElse -> branch=%s",
                nodeId,
                out.get("_meta", {}).get("branch"),
            )
            return out
        if nodeType == "flow.switch":
            out = await self._switch(node, nodeOutputs, nodeId, inputSources)
            logger.info(
                "FlowExecutor node %s switch -> match=%s",
                nodeId,
                out.get("_meta", {}).get("match"),
            )
            return out
        if nodeType == "flow.loop":
            out = await self._loop(node, nodeOutputs, nodeId, inputSources)
            logger.info("FlowExecutor node %s loop -> %s", nodeId, out)
            return out
        if nodeType == "flow.merge":
            out = await self._merge(node, nodeOutputs, nodeId, inputSources, context)
            logger.info(
                "FlowExecutor node %s merge -> keys=%s",
                nodeId,
                list(out.keys()) if isinstance(out, dict) else None,
            )
            return out
        logger.debug("FlowExecutor node %s unhandled type %s -> None", nodeId, nodeType)
        return None

    def _getInputData(self, nodeId: str, inputSources: Dict, nodeOutputs: Dict, outputIndex: int = 0) -> Any:
        """Return the stored output of the node connected to input port 0.

        Args:
            nodeId: Id of the node whose input is wanted.
            inputSources: Mapping ``nodeId -> {port: (sourceNodeId, sourceOutput)}``.
            nodeOutputs: Mapping ``nodeId -> output`` of already executed nodes.
            outputIndex: Accepted for interface compatibility; not used by the
                lookup, which always reads port ``0``.

        Returns:
            The source node's output, or ``None`` when port 0 is unconnected or
            the source has produced no output.
        """
        sources = inputSources.get(nodeId, {})
        if 0 not in sources:
            return None
        # The source's output index is part of the tuple but is not used here.
        srcId, _srcOut = sources[0]
        return nodeOutputs.get(srcId)

    async def _ifElse(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Evaluate the node's ``condition`` parameter and pass the input through.

        The metadata handed to ``wrapTransit`` carries ``branch`` (0 when the
        condition holds, 1 otherwise) and ``conditionResult``.
        """
        condParam = (node.get("parameters") or {}).get("condition")
        inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
        ok = self._evalConditionParam(condParam, nodeOutputs)
        return wrapTransit(
            unwrapTransit(inp) if inp else inp,
            {"branch": 0 if ok else 1, "conditionResult": ok},
        )

    def _evalConditionParam(self, condParam: Any, nodeOutputs: Dict) -> bool:
        """Evaluate condition: structured {type,ref,operator,value} or legacy string/ref."""
        if condParam is None:
            return False
        if isinstance(condParam, dict) and condParam.get("type") == "condition":
            return self._evalStructuredCondition(condParam, nodeOutputs)
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        resolved = resolveParameterReferences(condParam, nodeOutputs)
        return self._evalCondition(resolved)

    def _get_by_path(self, data: Any, path: list) -> Any:
        """Traverse ``data`` by ``path`` (string keys and int / numeric-string indices).

        Returns ``None`` as soon as a segment cannot be followed: missing dict
        key, out-of-range or negative index, non-numeric index, or a
        non-container value in the middle of the path.
        """
        current = data
        for seg in path:
            if current is None:
                return None
            if isinstance(current, dict) and isinstance(seg, str) and seg in current:
                current = current[seg]
            elif isinstance(current, (list, tuple)) and isinstance(seg, (int, str)):
                # isdecimal() rather than isdigit(): isdigit() also accepts
                # characters such as superscript digits that int() rejects.
                idx = int(seg) if isinstance(seg, str) and seg.isdecimal() else seg
                if isinstance(idx, int) and 0 <= idx < len(current):
                    current = current[idx]
                else:
                    return None
            else:
                return None
        return current

    def _evalOperator(self, operator: str, left: Any, right: Any) -> bool:
        """Apply a named comparison operator to ``left`` and ``right``.

        Shared by structured conditions and switch cases so both support the
        same operator set. Unknown operators evaluate to ``False``.
        """
        if operator == "eq":
            return left == right
        if operator == "neq":
            return left != right
        if operator in self._NUMERIC_OPERATORS:
            # None counts as 0; anything that cannot be converted to float
            # makes the comparison False instead of raising.
            try:
                lnum = float(left) if left is not None else 0
                rnum = float(right) if right is not None else 0
            except (TypeError, ValueError):
                return False
            if operator == "lt":
                return lnum < rnum
            if operator == "lte":
                return lnum <= rnum
            if operator == "gt":
                return lnum > rnum
            return lnum >= rnum
        if operator == "contains":
            return right is not None and str(right) in str(left or "")
        if operator == "not_contains":
            return right is None or str(right) not in str(left or "")
        if operator == "empty":
            return left is None or left == "" or (isinstance(left, (list, dict)) and len(left) == 0)
        if operator == "not_empty":
            return left is not None and left != "" and (not isinstance(left, (list, dict)) or len(left) > 0)
        if operator == "is_true":
            return bool(left)
        if operator == "is_false":
            return not bool(left)
        if operator == "before":
            return self._compare_dates(left, right, lambda a, b: a < b)
        if operator == "after":
            return self._compare_dates(left, right, lambda a, b: a > b)
        if operator == "exists":
            return self._file_exists(left)
        if operator == "not_exists":
            return not self._file_exists(left)
        return False

    def _evalStructuredCondition(self, cond: Dict, nodeOutputs: Dict) -> bool:
        """Evaluate structured {ref, operator, value} condition.

        The left operand is looked up in ``nodeOutputs`` via ``ref.nodeId`` and
        ``ref.path``; the right operand is ``cond["value"]``. A missing or
        malformed ``ref`` evaluates to ``False``.
        """
        ref = cond.get("ref")
        if not isinstance(ref, dict) or ref.get("type") != "ref":
            return False
        sourceId = ref.get("nodeId")
        path = ref.get("path") or []
        left = self._get_by_path(nodeOutputs.get(sourceId), list(path))
        return self._evalOperator(cond.get("operator", "eq"), left, cond.get("value"))

    def _compare_dates(self, left: Any, right: Any, op) -> bool:
        """Compare left/right as dates; op(a,b) is the comparison.

        Values that already expose ``timestamp`` are used as they are;
        everything else is stringified and parsed with a few fixed formats,
        then as ISO 8601. Returns ``False`` when either side cannot be parsed
        or the comparison itself fails.
        """
        from datetime import datetime, timezone

        def parse(v):
            if v is None:
                return None
            if hasattr(v, "timestamp"):
                return v
            s = str(v).strip()
            if not s:
                return None
            for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
                try:
                    return datetime.strptime(s, fmt)
                except ValueError:
                    continue
            try:
                # A trailing "Z" is rewritten to an explicit UTC offset before
                # handing the string to fromisoformat().
                return datetime.fromisoformat(s.replace("Z", "+00:00"))
            except ValueError:
                return None

        try:
            a, b = parse(left), parse(right)
            if a is None or b is None:
                return False
            if isinstance(a, datetime) and isinstance(b, datetime):
                # Ordering a naive against an aware datetime raises TypeError,
                # which used to turn every such comparison into False.
                # NOTE(review): the naive side is assumed to be UTC - confirm.
                aNaive = a.utcoffset() is None
                bNaive = b.utcoffset() is None
                if aNaive and not bNaive:
                    a = a.replace(tzinfo=timezone.utc)
                elif bNaive and not aNaive:
                    b = b.replace(tzinfo=timezone.utc)
            return op(a, b)
        except Exception as e:
            # Deliberate best-effort: a failed comparison counts as "no match".
            logger.warning("_compare_dates failed: left=%s, right=%s: %s", left, right, e)
            return False

    def _file_exists(self, val: Any) -> bool:
        """Check if value represents an existing file (object with url or name, or non-empty string)."""
        if val is None:
            return False
        if isinstance(val, dict):
            return bool(val.get("url") or val.get("name"))
        if isinstance(val, str):
            return len(val.strip()) > 0
        return bool(val)

    def _evalCondition(self, resolved: Any) -> bool:
        """Evaluate a legacy condition value.

        ``None`` is ``False``; bool/int/float use truthiness; a string is
        evaluated as a Python expression and falls back to its own truthiness
        when evaluation fails; any other value uses truthiness.
        """
        if resolved is None:
            return False
        if isinstance(resolved, (bool, int, float)):
            return bool(resolved)
        if isinstance(resolved, str):
            try:
                # SECURITY: eval() executes arbitrary Python. The expression
                # comes from workflow parameters / upstream node outputs, so it
                # must not be reachable with untrusted content. Behaviour is
                # kept for compatibility; replace with a restricted expression
                # evaluator.
                return bool(eval(resolved))
            except Exception as e:
                logger.warning("_evalCondition eval failed for expression: %s", e)
                return bool(resolved)
        return bool(resolved)

    async def _switch(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
        """Resolve the ``value`` parameter and report the first matching case.

        The metadata handed to ``wrapTransit`` carries ``match`` (index of the
        first matching case, ``-1`` when none matches) and the resolved ``value``.
        """
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        params = node.get("parameters") or {}
        value = resolveParameterReferences(params.get("value", ""), nodeOutputs)
        # An explicit null for "cases" is treated like an empty case list.
        cases = params.get("cases") or []
        inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
        match = -1
        for i, case in enumerate(cases):
            if self._evalSwitchCase(value, case):
                match = i
                break
        return wrapTransit(
            unwrapTransit(inp) if inp else inp,
            {"match": match, "value": value},
        )

    def _evalSwitchCase(self, left: Any, case: Any) -> bool:
        """
        Evaluate a switch case. Case can be:
        - dict: {operator, value} - use operator to compare left vs value
        - plain value: legacy format - exact equality (eq)
        """
        if isinstance(case, dict):
            return self._evalOperator(case.get("operator", "eq"), left, case.get("value"))
        return self._evalOperator("eq", left, case)

    async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
        """Resolve the ``items`` parameter into a list to iterate over.

        Returns:
            ``{"items": [...], "count": len(items)}``. A dict is expanded via
            ``_resolveUdmLevel`` when ``level`` is not ``"auto"``; otherwise a
            non-empty ``children`` list is used, or the dict becomes a list of
            ``{"name", "value"}`` pairs. A scalar becomes a one-element list and
            ``None`` an empty list.
        """
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        params = node.get("parameters") or {}
        # NOTE(review): the default is the string "[]"; whether it ends up as an
        # empty list depends on resolveParameterReferences - confirm.
        itemsPath = params.get("items", "[]")
        level = params.get("level", "auto")
        items = resolveParameterReferences(itemsPath, nodeOutputs)
        if level != "auto" and isinstance(items, dict):
            items = self._resolveUdmLevel(items, level)
        elif isinstance(items, list):
            pass
        elif isinstance(items, dict):
            children = items.get("children")
            if isinstance(children, list) and children:
                items = children
            else:
                items = [{"name": k, "value": v} for k, v in items.items()]
        else:
            items = [items] if items is not None else []
        return {"items": items, "count": len(items)}

    def _resolveUdmLevel(self, udm: Dict, level: str) -> list:
        """Extract items from a UDM document/node at the requested structural level.

        ``level`` is one of ``documents``, ``structuralNodes`` or
        ``contentBlocks``; any other value returns the direct children.
        """
        children = udm.get("children") or []
        if level == "documents":
            return [c for c in children if isinstance(c, dict) and c.get("role") in ("document", "archive")]
        if level == "structuralNodes":
            if udm.get("role") == "document":
                return children
            # One level deeper: flatten each child's children; a dict child
            # without a children list is kept itself.
            out = []
            for child in children:
                if isinstance(child, dict) and isinstance(child.get("children"), list):
                    out.extend(child["children"])
                elif isinstance(child, dict):
                    out.append(child)
            return out if out else children
        if level == "contentBlocks":
            blocks = []
            role = udm.get("role")
            if role == "document":
                for sn in children:
                    if isinstance(sn, dict) and isinstance(sn.get("children"), list):
                        blocks.extend(sn["children"])
            elif role in ("page", "section", "slide", "sheet"):
                blocks = children
            else:
                # Two levels deeper: child -> structural node -> blocks.
                for child in children:
                    if isinstance(child, dict) and isinstance(child.get("children"), list):
                        for sn in child["children"]:
                            if isinstance(sn, dict) and isinstance(sn.get("children"), list):
                                blocks.extend(sn["children"])
                            else:
                                blocks.append(sn)
            return blocks
        return children

    async def _merge(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict, context: Dict) -> Any:
        """Merge multiple branch inputs. mode: first | all | append.

        Every mode returns ``inputs`` (port -> unwrapped source output, sources
        without output are left out), ``first`` (first non-None input in port
        order) and ``merged`` (dict inputs folded together in port order, later
        ports overriding earlier keys). ``append`` additionally sets
        ``merged["items"]`` to the concatenation of all inputs in port order.
        ``context`` is accepted for interface compatibility and not used.
        """
        mode = (node.get("parameters") or {}).get("mode", "first")
        inputs: Dict[int, Any] = {}
        for portIdx, (srcId, _srcOut) in inputSources.items():
            out = nodeOutputs.get(srcId)
            if out is not None:
                inputs[portIdx] = unwrapTransit(out)
        orderedPorts = sorted(inputs.keys())
        first = None
        merged: Dict = {}
        for idx in orderedPorts:
            val = inputs[idx]
            if first is None:
                first = val
            if isinstance(val, dict):
                merged.update(val)
        if mode == "append":
            allItems = []
            # Same port order as above, so the result does not depend on the
            # order in which the connections were registered.
            for idx in orderedPorts:
                val = inputs[idx]
                if isinstance(val, list):
                    allItems.extend(val)
                elif isinstance(val, dict) and isinstance(val.get("items"), (list, tuple)):
                    allItems.extend(val["items"])
                elif val is not None:
                    # Includes dicts whose "items" is not a sequence: extending
                    # with a string would split it into single characters.
                    allItems.append(val)
            merged["items"] = allItems
        return {
            "inputs": inputs,
            "first": first,
            "merged": merged,
            "_success": True,
        }