329 lines
13 KiB
Python
329 lines
13 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# Flow control node executor (ifElse, switch, loop, merge).
|
|
|
|
import logging
|
|
from typing import Any, Dict
|
|
|
|
from modules.features.graphicalEditor.portTypes import _wrapTransit, _unwrapTransit
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FlowExecutor:
    """Execute flow control nodes (ifElse, switch, loop, merge).

    The executor is stateless: everything it needs is passed in through the
    ``node`` definition and the execution ``context``.
    """

    async def execute(
        self,
        node: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Dispatch a flow node to its handler based on ``node["type"]``.

        Args:
            node: Node definition; ``type``, ``id`` and ``parameters`` are read.
            context: Execution context; ``nodeOutputs`` and ``inputSources``
                are read. ``inputSources`` is looked up by this node's id.

        Returns:
            The handler result, or ``None`` when the node type is not one of
            ``flow.ifElse``, ``flow.switch``, ``flow.loop``, ``flow.merge``.
        """
        nodeType = node.get("type", "")
        nodeId = node.get("id", "")
        nodeOutputs = context.get("nodeOutputs", {})
        inputSources = context.get("inputSources", {}).get(nodeId, {})
        logger.info(
            "FlowExecutor node %s type=%s inputSources=%s params=%s",
            nodeId,
            nodeType,
            inputSources,
            node.get("parameters"),
        )

        if nodeType == "flow.ifElse":
            out = await self._ifElse(node, nodeOutputs, nodeId, inputSources)
            logger.info("FlowExecutor node %s ifElse -> branch=%s", nodeId, self._metaValue(out, "branch"))
            return out
        if nodeType == "flow.switch":
            out = await self._switch(node, nodeOutputs, nodeId, inputSources)
            logger.info("FlowExecutor node %s switch -> match=%s", nodeId, self._metaValue(out, "match"))
            return out
        if nodeType == "flow.loop":
            out = await self._loop(node, nodeOutputs, nodeId, inputSources)
            logger.info("FlowExecutor node %s loop -> %s", nodeId, out)
            return out
        if nodeType == "flow.merge":
            out = await self._merge(node, nodeOutputs, nodeId, inputSources, context)
            logger.info("FlowExecutor node %s merge -> keys=%s", nodeId, list(out.keys()) if isinstance(out, dict) else None)
            return out

        logger.debug("FlowExecutor node %s unhandled type %s -> None", nodeId, nodeType)
        return None

    @staticmethod
    def _metaValue(out: Any, key: str) -> Any:
        """Read ``out["_meta"][key]`` for logging without assuming the shape of ``out``."""
        meta = out.get("_meta") if isinstance(out, dict) else None
        return meta.get(key) if isinstance(meta, dict) else None

    def _getInputData(self, nodeId: str, inputSources: Dict, nodeOutputs: Dict, outputIndex: int = 0) -> Any:
        """Get the output of the node connected to input port 0.

        Args:
            nodeId: Id of the node whose input is wanted.
            inputSources: Mapping ``nodeId -> {port: (sourceNodeId, sourceOutput)}``.
            nodeOutputs: Mapping ``nodeId -> output`` of already executed nodes.
            outputIndex: Accepted for interface compatibility; not used.

        Returns:
            The source node's stored output, or ``None`` when port 0 is not
            connected or the source has produced no output.
        """
        sources = inputSources.get(nodeId, {})
        if 0 not in sources:
            return None
        # The source's output index is part of the tuple but is not used here.
        srcId, _srcOut = sources[0]
        return nodeOutputs.get(srcId)

    async def _ifElse(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Evaluate the node's ``condition`` parameter and pass the input through.

        Returns:
            The result of ``_wrapTransit`` called with the (unwrapped) input and
            the metadata ``{"branch": 0 | 1, "conditionResult": bool}``;
            branch 0 is chosen when the condition holds.
        """
        condParam = (node.get("parameters") or {}).get("condition")
        inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
        ok = self._evalConditionParam(condParam, nodeOutputs)
        return _wrapTransit(
            _unwrapTransit(inp) if inp else inp,
            {"branch": 0 if ok else 1, "conditionResult": ok},
        )

    def _evalConditionParam(self, condParam: Any, nodeOutputs: Dict) -> bool:
        """Evaluate a condition parameter.

        A dict with ``type == "condition"`` is treated as a structured
        ``{ref, operator, value}`` condition. Anything else is the legacy
        format: references are resolved and the result is evaluated by
        ``_evalCondition``. ``None`` is always False.
        """
        if condParam is None:
            return False
        if isinstance(condParam, dict) and condParam.get("type") == "condition":
            return self._evalStructuredCondition(condParam, nodeOutputs)
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        resolved = resolveParameterReferences(condParam, nodeOutputs)
        return self._evalCondition(resolved)

    def _get_by_path(self, data: Any, path: list) -> Any:
        """Traverse ``data`` along ``path`` (string keys and integer indices).

        Dicts are indexed by string segments only; lists and tuples by
        non-negative integers or decimal strings such as ``"2"``.

        Returns:
            The value found, or ``None`` as soon as a segment cannot be
            followed (missing key, index out of range, wrong container type).
        """
        current = data
        for seg in path:
            if current is None:
                return None
            if isinstance(current, dict) and isinstance(seg, str) and seg in current:
                current = current[seg]
                continue
            if not isinstance(current, (list, tuple)):
                return None
            idx = seg
            # isdecimal() rather than isdigit(): isdigit() accepts characters
            # such as "²" that int() rejects with ValueError.
            if isinstance(seg, str) and seg.isdecimal():
                idx = int(seg)
            if not isinstance(idx, int) or not 0 <= idx < len(current):
                return None
            current = current[idx]
        return current

    def _evalStructuredCondition(self, cond: Dict, nodeOutputs: Dict) -> bool:
        """Evaluate a structured ``{ref, operator, value}`` condition.

        The left operand is read from ``nodeOutputs[ref["nodeId"]]`` along
        ``ref["path"]``; the right operand is ``cond["value"]``. Returns False
        when ``ref`` is missing, is not a dict, or is not of type ``"ref"``.
        """
        ref = cond.get("ref")
        if not isinstance(ref, dict) or ref.get("type") != "ref":
            return False
        path = ref.get("path") or []
        left = self._get_by_path(nodeOutputs.get(ref.get("nodeId")), list(path))
        return self._applyOperator(left, cond.get("operator", "eq"), cond.get("value"))

    def _applyOperator(self, left: Any, operator: Any, right: Any) -> bool:
        """Apply a comparison operator; shared by conditions and switch cases.

        Supported operators: eq, neq, lt, lte, gt, gte, contains, not_contains,
        empty, not_empty, is_true, is_false, before, after, exists, not_exists.
        Unknown operators evaluate to False.
        """
        if operator == "eq":
            return left == right
        if operator == "neq":
            return left != right
        if operator in ("lt", "lte", "gt", "gte"):
            try:
                # A missing operand is compared as 0.
                lhs = float(left) if left is not None else 0
                rhs = float(right) if right is not None else 0
            except (TypeError, ValueError):
                return False
            if operator == "lt":
                return lhs < rhs
            if operator == "lte":
                return lhs <= rhs
            if operator == "gt":
                return lhs > rhs
            return lhs >= rhs
        if operator in ("contains", "not_contains"):
            # Only None maps to the empty string; falsy values such as 0 or
            # False keep their textual form so that 0 "contains" "0".
            haystack = "" if left is None else str(left)
            found = right is not None and str(right) in haystack
            return found if operator == "contains" else not found
        if operator in ("empty", "not_empty"):
            isEmpty = left is None or left == "" or (isinstance(left, (list, dict)) and len(left) == 0)
            return isEmpty if operator == "empty" else not isEmpty
        if operator == "is_true":
            return bool(left)
        if operator == "is_false":
            return not bool(left)
        if operator == "before":
            return self._compare_dates(left, right, lambda a, b: a < b)
        if operator == "after":
            return self._compare_dates(left, right, lambda a, b: a > b)
        if operator == "exists":
            return self._file_exists(left)
        if operator == "not_exists":
            return not self._file_exists(left)
        return False

    def _compare_dates(self, left: Any, right: Any, op) -> bool:
        """Compare ``left`` and ``right`` as dates using ``op(a, b)``.

        Values that already expose a ``timestamp`` attribute are used as-is;
        everything else is converted to ``str`` and parsed as ``YYYY-MM-DD``,
        ``DD.MM.YYYY``, ``YYYY-MM-DDTHH:MM:SS``, ``YYYY-MM-DD HH:MM:SS`` or an
        ISO 8601 string (a trailing ``Z`` is read as UTC).

        When exactly one side carries a UTC offset, the naive side is assumed
        to be UTC so the two can be compared instead of raising ``TypeError``.

        Returns:
            ``op(a, b)``, or False when either side cannot be parsed or the
            comparison itself fails.
        """
        from datetime import datetime, timezone

        def parse(v):
            if v is None:
                return None
            if hasattr(v, "timestamp"):
                return v
            s = str(v).strip()
            if not s:
                return None
            for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
                try:
                    return datetime.strptime(s, fmt)
                except ValueError:
                    continue
            try:
                return datetime.fromisoformat(s.replace("Z", "+00:00"))
            except ValueError:
                return None

        try:
            a, b = parse(left), parse(right)
            if a is None or b is None:
                return False
            if isinstance(a, datetime) and isinstance(b, datetime):
                aAware = a.utcoffset() is not None
                bAware = b.utcoffset() is not None
                if aAware and not bAware:
                    b = b.replace(tzinfo=timezone.utc)
                elif bAware and not aAware:
                    a = a.replace(tzinfo=timezone.utc)
            return op(a, b)
        except Exception as e:
            # Best effort: a failed comparison counts as "condition not met".
            logger.warning("_compare_dates failed: left=%s, right=%s: %s", left, right, e)
            return False

    def _file_exists(self, val: Any) -> bool:
        """Check whether ``val`` represents a file reference.

        No file system access happens: a dict counts when it has a truthy
        ``url`` or ``name``, a string when it is non-blank, anything else by
        its truthiness. ``None`` is False.
        """
        if val is None:
            return False
        if isinstance(val, dict):
            return bool(val.get("url") or val.get("name"))
        if isinstance(val, str):
            return len(val.strip()) > 0
        return bool(val)

    def _evalCondition(self, resolved: Any) -> bool:
        """Evaluate a resolved legacy condition.

        Strings are evaluated as Python expressions; when evaluation fails the
        string's own truthiness is used. Every other value (including
        ``None``) is judged by its truthiness.
        """
        if isinstance(resolved, str):
            try:
                # SECURITY(review): eval() executes arbitrary Python. If workflow
                # parameters can be authored by untrusted users this is a code
                # execution vector; consider migrating legacy string conditions
                # to structured conditions or a restricted expression parser.
                return bool(eval(resolved))
            except Exception as e:
                logger.warning("_evalCondition eval failed for expression: %s", e)
                return bool(resolved)
        return bool(resolved)

    async def _switch(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
        """Match the resolved ``value`` parameter against ``cases``.

        Returns:
            The result of ``_wrapTransit`` called with the (unwrapped) input and
            the metadata ``{"match": index, "value": value}``; ``match`` is the
            index of the first matching case, or -1 when none matches.
        """
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        params = node.get("parameters") or {}
        value = resolveParameterReferences(params.get("value", ""), nodeOutputs)
        # An explicit ``cases: None`` is treated like an empty case list.
        cases = params.get("cases") or []
        inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)

        match = -1
        for i, c in enumerate(cases):
            if self._evalSwitchCase(value, c):
                match = i
                break
        return _wrapTransit(
            _unwrapTransit(inp) if inp else inp,
            {"match": match, "value": value},
        )

    def _evalSwitchCase(self, left: Any, case: Any) -> bool:
        """Evaluate a single switch case against ``left``.

        ``case`` is either a dict ``{operator, value}`` (operator defaults to
        ``eq``) or, in the legacy format, a plain value compared for equality.
        """
        if isinstance(case, dict):
            return self._applyOperator(left, case.get("operator", "eq"), case.get("value"))
        return self._applyOperator(left, "eq", case)

    async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
        """Resolve the ``items`` parameter into a list to iterate over.

        A list is used as-is, a dict becomes a list of
        ``{"name": key, "value": value}`` entries, ``None`` becomes an empty
        list and any other value becomes a single-element list.

        Returns:
            ``{"items": list, "count": len(list)}``.
        """
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        # NOTE(review): the default is the string "[]", not an empty list. If
        # resolveParameterReferences returns plain strings unchanged, a node
        # without an "items" parameter yields one item "[]" - confirm intent.
        itemsPath = (node.get("parameters") or {}).get("items", "[]")
        items = resolveParameterReferences(itemsPath, nodeOutputs)
        if isinstance(items, dict):
            items = [{"name": k, "value": v} for k, v in items.items()]
        elif not isinstance(items, list):
            items = [] if items is None else [items]
        return {"items": items, "count": len(items)}

    async def _merge(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict, context: Dict) -> Any:
        """Merge multiple branch inputs. mode: first | all | append.

        Collects the unwrapped output of every connected source that has
        produced one, keyed by input port, and combines them with
        ``_combineMergeInputs``.
        """
        mode = (node.get("parameters") or {}).get("mode", "first")
        inputs: Dict[int, Any] = {}
        for portIdx, (srcId, _srcOut) in inputSources.items():
            out = nodeOutputs.get(srcId)
            if out is not None:
                inputs[portIdx] = _unwrapTransit(out)
        return self._combineMergeInputs(inputs, mode)

    def _combineMergeInputs(self, inputs: Dict[int, Any], mode: Any) -> Dict[str, Any]:
        """Combine per-port inputs into the merge node's result.

        Inputs are always processed in ascending port order.

        Returns:
            A dict with ``inputs`` (the mapping passed in), ``first`` (the
            first non-None input), ``merged`` (dict inputs folded together with
            later ports overriding earlier ones) and ``_success``. In ``append``
            mode ``merged["items"]`` holds the concatenation of list inputs,
            the ``items`` of dict inputs and any other non-None input.
        """
        ordered = [inputs[idx] for idx in sorted(inputs)]
        first = next((val for val in ordered if val is not None), None)

        merged: Dict = {}
        for val in ordered:
            if isinstance(val, dict):
                merged.update(val)

        # "first" and "all" need no extra work: callers pick "first" or
        # "inputs"/"merged" from the result.
        if mode == "append":
            allItems = []
            for val in ordered:
                if isinstance(val, list):
                    allItems.extend(val)
                elif isinstance(val, dict) and "items" in val:
                    nested = val["items"]
                    if isinstance(nested, (list, tuple)):
                        allItems.extend(nested)
                    elif nested is not None:
                        # A scalar (e.g. a string) is one item, not a sequence
                        # to be split element by element.
                        allItems.append(nested)
                elif val is not None:
                    allItems.append(val)
            merged["items"] = allItems

        return {
            "inputs": inputs,
            "first": first,
            "merged": merged,
            "_success": True,
        }
|