# Copyright (c) 2025 Patrick Motsch
# Flow control node executor (ifElse, switch, loop, merge).

import logging
import operator
from datetime import datetime
from typing import Any, Dict

from modules.features.graphicalEditor.portTypes import _wrapTransit, _unwrapTransit

logger = logging.getLogger(__name__)

# Numeric comparison operators shared by structured conditions and switch cases.
_NUMERIC_COMPARATORS = {
    "lt": operator.lt,
    "lte": operator.le,
    "gt": operator.gt,
    "gte": operator.ge,
}

# Date formats tried (in order) before falling back to datetime.fromisoformat.
_DATE_FORMATS = ("%Y-%m-%d", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S")


class FlowExecutor:
    """Execute flow control nodes."""

    async def execute(
        self,
        node: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Dispatch a flow node to its handler based on ``node["type"]``.

        Args:
            node: Node definition; ``type``, ``id`` and ``parameters`` are read.
            context: Execution context; ``nodeOutputs`` and ``inputSources``
                (keyed by node id) are read.

        Returns:
            The handler result for ``flow.ifElse``, ``flow.switch``,
            ``flow.loop`` and ``flow.merge``; ``None`` for any other type.
        """
        nodeType = node.get("type", "")
        nodeOutputs = context.get("nodeOutputs", {})
        nodeId = node.get("id", "")
        inputSources = context.get("inputSources", {}).get(nodeId, {})
        logger.info(
            "FlowExecutor node %s type=%s inputSources=%s params=%s",
            nodeId,
            nodeType,
            inputSources,
            node.get("parameters"),
        )

        if nodeType == "flow.ifElse":
            out = await self._ifElse(node, nodeOutputs, nodeId, inputSources)
            logger.info(
                "FlowExecutor node %s ifElse -> branch=%s",
                nodeId,
                out.get("_meta", {}).get("branch"),
            )
            return out

        if nodeType == "flow.switch":
            out = await self._switch(node, nodeOutputs, nodeId, inputSources)
            logger.info(
                "FlowExecutor node %s switch -> match=%s",
                nodeId,
                out.get("_meta", {}).get("match"),
            )
            return out

        if nodeType == "flow.loop":
            out = await self._loop(node, nodeOutputs, nodeId, inputSources)
            logger.info("FlowExecutor node %s loop -> %s", nodeId, out)
            return out

        if nodeType == "flow.merge":
            out = await self._merge(node, nodeOutputs, nodeId, inputSources, context)
            logger.info(
                "FlowExecutor node %s merge -> keys=%s",
                nodeId,
                list(out.keys()) if isinstance(out, dict) else None,
            )
            return out

        logger.debug("FlowExecutor node %s unhandled type %s -> None", nodeId, nodeType)
        return None

    def _getInputData(
        self,
        nodeId: str,
        inputSources: Dict,
        nodeOutputs: Dict,
        outputIndex: int = 0,
    ) -> Any:
        """Get data from the connected source node.

        Looks up the source wired to input port 0 of ``nodeId`` and returns
        that source node's stored output, or ``None`` when port 0 is not
        connected. ``outputIndex`` is accepted for interface compatibility but
        is not used by the lookup.
        """
        sources = inputSources.get(nodeId, {})
        if 0 not in sources:
            return None
        # Each entry is a (sourceNodeId, sourceOutputIndex) pair; only the id is needed.
        srcId, _srcOut = sources[0]
        return nodeOutputs.get(srcId)

    async def _ifElse(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Evaluate the node's ``condition`` parameter and pick a branch.

        The input data is forwarded through ``_wrapTransit`` together with
        ``{"branch": 0 | 1, "conditionResult": bool}`` (branch 0 when the
        condition holds, 1 otherwise).
        """
        condParam = (node.get("parameters") or {}).get("condition")
        inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
        ok = self._evalConditionParam(condParam, nodeOutputs)
        return _wrapTransit(
            _unwrapTransit(inp) if inp else inp,
            {"branch": 0 if ok else 1, "conditionResult": ok},
        )

    def _evalConditionParam(self, condParam: Any, nodeOutputs: Dict) -> bool:
        """Evaluate condition: structured {type,ref,operator,value} or legacy string/ref."""
        if condParam is None:
            return False
        if isinstance(condParam, dict) and condParam.get("type") == "condition":
            return self._evalStructuredCondition(condParam, nodeOutputs)
        # Imported here as in the original code (kept function-local).
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        resolved = resolveParameterReferences(condParam, nodeOutputs)
        return self._evalCondition(resolved)

    def _get_by_path(self, data: Any, path: list) -> Any:
        """Traverse data by path (strings and ints).

        Dicts are indexed by string segments only; lists/tuples by a
        non-negative int or a decimal string. Returns ``None`` as soon as a
        segment cannot be resolved.
        """
        current = data
        for seg in path:
            if current is None:
                return None
            if isinstance(current, dict) and isinstance(seg, str) and seg in current:
                current = current[seg]
            elif isinstance(current, (list, tuple)) and isinstance(seg, (int, str)):
                # isdecimal() (not isdigit()) so that int() cannot raise on
                # digit-like characters such as superscripts.
                idx = int(seg) if isinstance(seg, str) and seg.isdecimal() else seg
                if isinstance(idx, int) and 0 <= idx < len(current):
                    current = current[idx]
                else:
                    return None
            else:
                return None
        return current

    def _applyOperator(self, opName: Any, left: Any, right: Any) -> bool:
        """Apply a named comparison operator to ``left`` and ``right``.

        Shared by structured conditions and switch cases. Unknown operators
        evaluate to ``False``.
        """
        if opName == "eq":
            return left == right
        if opName == "neq":
            return left != right
        if isinstance(opName, str) and opName in _NUMERIC_COMPARATORS:
            # None counts as 0; values that cannot be converted make the
            # comparison False instead of raising.
            try:
                leftNum = float(left) if left is not None else 0
                rightNum = float(right) if right is not None else 0
            except (TypeError, ValueError):
                return False
            return _NUMERIC_COMPARATORS[opName](leftNum, rightNum)
        # NOTE: `left or ""` maps every falsy left value (None, 0, False, [])
        # to the empty string before the substring test.
        if opName == "contains":
            return right is not None and str(right) in str(left or "")
        if opName == "not_contains":
            return right is None or str(right) not in str(left or "")
        if opName == "empty":
            return (
                left is None
                or left == ""
                or (isinstance(left, (list, dict)) and len(left) == 0)
            )
        if opName == "not_empty":
            return (
                left is not None
                and left != ""
                and (not isinstance(left, (list, dict)) or len(left) > 0)
            )
        if opName == "is_true":
            return bool(left)
        if opName == "is_false":
            return not bool(left)
        if opName == "before":
            return self._compare_dates(left, right, lambda a, b: a < b)
        if opName == "after":
            return self._compare_dates(left, right, lambda a, b: a > b)
        if opName == "exists":
            return self._file_exists(left)
        if opName == "not_exists":
            return not self._file_exists(left)
        return False

    def _evalStructuredCondition(self, cond: Dict, nodeOutputs: Dict) -> bool:
        """Evaluate structured {ref, operator, value} condition.

        The left operand is read from ``nodeOutputs[ref.nodeId]`` following
        ``ref.path``. A missing or malformed ``ref`` (not a dict, or its
        ``type`` is not ``"ref"``) evaluates to ``False``.
        """
        ref = cond.get("ref")
        if not isinstance(ref, dict) or ref.get("type") != "ref":
            return False
        sourceId = ref.get("nodeId")
        path = ref.get("path") or []
        left = self._get_by_path(nodeOutputs.get(sourceId), list(path))
        return self._applyOperator(cond.get("operator", "eq"), left, cond.get("value"))

    def _compare_dates(self, left: Any, right: Any, op) -> bool:
        """Compare left/right as dates; op(a,b) is the comparison.

        Returns ``False`` when either side cannot be parsed or when the
        comparison itself raises (the failure is logged as a warning).
        """

        def parse(v):
            if v is None:
                return None
            if hasattr(v, "timestamp"):
                # Already a datetime-like object; use it unchanged.
                return v
            text = str(v).strip()
            if not text:
                return None
            for fmt in _DATE_FORMATS:
                try:
                    return datetime.strptime(text, fmt)
                except ValueError:
                    continue
            try:
                return datetime.fromisoformat(text.replace("Z", "+00:00"))
            except ValueError:
                return None

        try:
            a, b = parse(left), parse(right)
            if a is None or b is None:
                return False
            return op(a, b)
        except Exception as e:
            # Deliberate best-effort: a failed comparison counts as "no match".
            logger.warning("_compare_dates failed: left=%s, right=%s: %s", left, right, e)
            return False

    def _file_exists(self, val: Any) -> bool:
        """Check if value represents an existing file (object with url or name, or non-empty string)."""
        if val is None:
            return False
        if isinstance(val, dict):
            return bool(val.get("url") or val.get("name"))
        if isinstance(val, str):
            return len(val.strip()) > 0
        return bool(val)

    def _evalCondition(self, resolved: Any) -> bool:
        """Evaluate condition: ref resolves to value → use truthiness; string → try eval."""
        if resolved is None:
            return False
        if isinstance(resolved, (bool, int, float)):
            return bool(resolved)
        if isinstance(resolved, str):
            try:
                # SECURITY: eval() executes arbitrary Python. If condition
                # strings can come from untrusted workflow definitions this is
                # a code-execution risk and should be replaced by a restricted
                # expression evaluator. Behavior kept unchanged here.
                return bool(eval(resolved))
            except Exception as e:
                logger.warning("_evalCondition eval failed for expression: %s", e)
                # Not evaluable: fall back to plain string truthiness.
                return bool(resolved)
        return bool(resolved)

    async def _switch(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Resolve the ``value`` parameter and find the first matching case.

        The input data is forwarded through ``_wrapTransit`` together with
        ``{"match": index, "value": value}``; ``match`` is ``-1`` when no case
        matches (or when ``cases`` is missing/empty).
        """
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        params = node.get("parameters") or {}
        value = resolveParameterReferences(params.get("value", ""), nodeOutputs)
        # `or []` also covers an explicit `"cases": None`.
        cases = params.get("cases") or []
        inp = self._getInputData(nodeId, {nodeId: inputSources}, nodeOutputs)
        match = next(
            (i for i, case in enumerate(cases) if self._evalSwitchCase(value, case)),
            -1,
        )
        return _wrapTransit(
            _unwrapTransit(inp) if inp else inp,
            {"match": match, "value": value},
        )

    def _evalSwitchCase(self, left: Any, case: Any) -> bool:
        """
        Evaluate a switch case.

        Case can be:
        - dict: {operator, value} - use operator to compare left vs value
        - plain value: legacy format - exact equality (eq)
        """
        if isinstance(case, dict):
            return self._applyOperator(case.get("operator", "eq"), left, case.get("value"))
        return self._applyOperator("eq", left, case)

    async def _loop(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Resolve the ``items`` parameter into a list to iterate over.

        A list is used as-is, a dict becomes ``[{"name": k, "value": v}, ...]``,
        ``None`` becomes an empty list and any other value a one-element list.

        Returns:
            ``{"items": <list>, "count": <len(list)>}``.
        """
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        itemsPath = (node.get("parameters") or {}).get("items", "[]")
        items = resolveParameterReferences(itemsPath, nodeOutputs)
        if isinstance(items, dict):
            items = [{"name": k, "value": v} for k, v in items.items()]
        elif not isinstance(items, list):
            items = [items] if items is not None else []
        return {"items": items, "count": len(items)}

    async def _merge(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
        context: Dict,
    ) -> Any:
        """Merge multiple branch inputs. mode: first | all | append.

        Every connected port whose source produced a non-``None`` output is
        collected (unwrapped) under its port index. ``first`` is the first
        non-``None`` input in port order and ``merged`` is the union of all
        dict inputs (later ports overwrite earlier keys). Modes ``first`` and
        ``all`` add nothing further; ``append`` additionally stores the
        concatenation of all inputs, in port order, under ``merged["items"]``.

        Returns:
            ``{"inputs", "first", "merged", "_success": True}``.
        """
        mode = (node.get("parameters") or {}).get("mode", "first")

        inputs: Dict[int, Any] = {}
        for portIdx, (srcId, _srcOut) in inputSources.items():
            out = nodeOutputs.get(srcId)
            if out is not None:
                inputs[portIdx] = _unwrapTransit(out)

        # One port-ordered view, used for first/merged and for append alike.
        orderedValues = [inputs[idx] for idx in sorted(inputs)]

        first = None
        merged: Dict = {}
        for val in orderedValues:
            if first is None:
                first = val
            if isinstance(val, dict):
                merged.update(val)

        if mode == "append":
            allItems = []
            for val in orderedValues:
                if isinstance(val, list):
                    allItems.extend(val)
                elif isinstance(val, dict) and "items" in val:
                    nested = val["items"]
                    if isinstance(nested, (list, tuple)):
                        allItems.extend(nested)
                    elif nested is not None:
                        # A scalar/str/dict "items" is kept whole, not iterated.
                        allItems.append(nested)
                elif val is not None:
                    allItems.append(val)
            merged["items"] = allItems

        return {
            "inputs": inputs,
            "first": first,
            "merged": merged,
            "_success": True,
        }