gateway/modules/workflows/automation2/executors/dataExecutor.py
2026-04-26 08:31:35 +02:00

250 lines
9.4 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# Data manipulation node executor: data.aggregate, data.filter, data.consolidate.
import logging
from typing import Any, Dict
from modules.features.graphicalEditor.portTypes import unwrapTransit, wrapTransit
# Module-level logger, named after this module's dotted import path.
logger = logging.getLogger(__name__)
class DataExecutor:
    """Execute ``data.aggregate``, ``data.filter`` and ``data.consolidate`` nodes.

    Every handler reads the output of the node wired to input port 0 (see
    ``_getInput``) and unwraps a Transit envelope when one is present.
    Aggregate and consolidate return plain result dicts. Filter returns a
    Transit envelope built with ``wrapTransit``.
    """

    # Keys probed, in priority order, to locate the main list inside a dict
    # payload. "nodes" comes last so it only matters for payloads that have
    # none of the other keys, such as the output of _filterUdmByContentType.
    _LIST_KEYS = ("items", "tasks", "emails", "files", "documents", "documentList", "nodes")

    async def execute(
        self,
        node: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Dispatch *node* to the handler for its ``type``.

        ``context["nodeOutputs"]`` maps node id -> output.
        ``context["inputSources"]`` maps node id -> {input port -> source
        descriptor}; see ``_getInput`` for how port 0 is read.

        Returns the handler's result, or ``None`` for an unhandled node type.
        """
        nodeType = node.get("type", "")
        nodeId = node.get("id", "")
        nodeOutputs = context.get("nodeOutputs", {})
        inputSources = context.get("inputSources", {}).get(nodeId, {})
        logger.info("DataExecutor node %s type=%s", nodeId, nodeType)
        if nodeType == "data.aggregate":
            return await self._aggregate(node, nodeOutputs, nodeId, inputSources, context)
        if nodeType == "data.filter":
            return await self._filter(node, nodeOutputs, nodeId, inputSources)
        if nodeType == "data.consolidate":
            return await self._consolidate(node, nodeOutputs, nodeId, inputSources)
        logger.debug("DataExecutor node %s unhandled type %s", nodeId, nodeType)
        return None

    async def _aggregate(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
        context: Dict,
    ) -> Any:
        """Aggregate the single connected input according to ``parameters.mode``.

        In a loop context, accumulation across iterations is handled by the
        engine (``_aggregateAccumulators``). Outside a loop, this only shapes
        the one input.

        Modes:
        - ``collect`` (default) and any unknown mode: ``[data]``.
        - ``concat``: a copy of ``data`` if it is a list, else ``[data]``.
        - ``sum``: ``[data]`` if numeric, else ``[0]``.
        - ``count``: ``[1]``.

        Apart from ``sum``, a ``None`` payload yields ``[]``. ``nodeId`` and
        ``context`` are unused but kept for the dispatch signature.

        Returns ``{"items": items, "count": len(items), "_success": True}``.
        """
        inp = self._getInput(inputSources, nodeOutputs)
        mode = (node.get("parameters") or {}).get("mode", "collect")
        if inp is None:
            return {"items": [], "count": 0, "_success": True}
        data = self._unwrap(inp)
        if mode == "sum":
            # Non-numeric (including None) contributes 0 to the engine's sum.
            items = [data if isinstance(data, (int, float)) else 0]
        elif data is None:
            items = []
        elif mode == "concat":
            # Copy, so downstream mutation cannot alter the upstream node's output.
            items = list(data) if isinstance(data, list) else [data]
        elif mode == "count":
            items = [1]
        else:
            items = [data]
        return {"items": items, "count": len(items), "_success": True}

    async def _filter(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Filter items by a condition and/or UDM content type.

        When ``udmContentType`` is set and the input is a dict with
        ``children``, the payload is first reduced to the matching
        ContentBlocks (exposed under ``"nodes"``). Then ``condition`` is
        applied to each item; see ``_evalFilterCondition``.

        Returns a Transit envelope of the filtered payload, with metadata
        ``originalCount`` and ``filteredCount``.
        """
        inp = self._getInput(inputSources, nodeOutputs)
        data = self._unwrap(inp)
        params = node.get("parameters") or {}
        condition = params.get("condition", "")
        udmContentType = params.get("udmContentType", "")
        if udmContentType and isinstance(data, dict) and data.get("children"):
            data = self._filterUdmByContentType(data, udmContentType)
        items = self._extractItems(data)
        originalCount = len(items)
        if condition:
            filtered = [item for item in items if self._evalFilterCondition(item, condition)]
        else:
            # Copy, so the envelope never aliases the upstream node's list.
            filtered = list(items)
        return wrapTransit(self._replaceItems(data, filtered, originalCount), {
            "originalCount": originalCount,
            "filteredCount": len(filtered),
        })

    def _replaceItems(self, data: Any, filtered: list, originalCount: int) -> Any:
        """Return *data* with its main list replaced by *filtered*.

        - Lists are replaced outright.
        - Dicts are shallow-copied, with the key found by ``_findListKey``
          swapped for *filtered*. A sibling integer ``count`` equal to the
          pre-filter length is updated to the new length so it does not go
          stale.
        - Any other value is returned unchanged.
        """
        if isinstance(data, list):
            return filtered
        if not isinstance(data, dict):
            return data
        result = dict(data)
        listKey = self._findListKey(data)
        if listKey:
            result[listKey] = filtered
            count = result.get("count")
            if isinstance(count, int) and not isinstance(count, bool) and count == originalCount:
                result["count"] = len(filtered)
        return result

    async def _consolidate(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Deterministically consolidate the input's items.

        Modes:
        - ``concat``: ``str`` of each item, joined with ``separator``.
        - ``csvJoin``: one line per item; dict values are joined with
          ``separator`` and lines with ``"\\n"``.
        - ``merge``: dict items merged left-to-right; later keys win.
        - ``table`` (default and unknown modes): ``{"headers", "rows"}``.

        ``separator`` defaults to ``"\\n"``.

        Returns ``{"result", "mode", "count", "_success": True}``, where
        ``count`` is the number of input items.
        """
        inp = self._getInput(inputSources, nodeOutputs)
        data = self._unwrap(inp)
        params = node.get("parameters") or {}
        mode = params.get("mode", "table")
        separator = params.get("separator", "\n")
        items = self._extractItems(data)
        if mode == "concat":
            result: Any = separator.join(str(i) for i in items)
        elif mode == "csvJoin":
            # NOTE(review): values are neither quoted nor escaped, and columns
            # follow each dict's own key order. Confirm this is intended for CSV.
            result = "\n".join(
                separator.join(str(v) for v in item.values()) if isinstance(item, dict) else str(item)
                for item in items
            )
        elif mode == "merge":
            merged: Dict = {}
            for item in items:
                if isinstance(item, dict):
                    merged.update(item)
            result = merged
        else:
            result = self._toTable(items)
        return {"result": result, "mode": mode, "count": len(items), "_success": True}

    @staticmethod
    def _toTable(items: list) -> Dict[str, Any]:
        """Build ``{"headers": [...], "rows": [...]}`` from *items*.

        Dict items become rows as-is. Any other item becomes
        ``{"value": item}``. Headers are the union of row keys in order of
        first appearance.
        """
        headers: Dict[Any, None] = {}  # insertion-ordered set of column names
        rows = []
        for item in items:
            row = item if isinstance(item, dict) else {"value": item}
            headers.update(dict.fromkeys(row))
            rows.append(row)
        return {"headers": list(headers), "rows": rows}

    def _getInput(self, inputSources: Dict, nodeOutputs: Dict) -> Any:
        """Return the output of the node wired to input port 0, or ``None``.

        ``inputSources[0]`` is unpacked as a 2-tuple whose first element is
        the source node id; the second element is not used here.
        """
        if 0 not in inputSources:
            return None
        srcId, _ = inputSources[0]
        return nodeOutputs.get(srcId)

    def _unwrap(self, value: Any) -> Any:
        """Unwrap a Transit envelope (a dict with a truthy ``_transit``); pass anything else through."""
        if isinstance(value, dict) and value.get("_transit"):
            return unwrapTransit(value)
        return value

    def _extractItems(self, data: Any) -> list:
        """Return the list of items held by *data*.

        A list is returned as-is. For a dict, the value under the first key
        from ``_LIST_KEYS`` that holds a list is returned. Anything else
        yields ``[]``.
        """
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            listKey = self._findListKey(data)
            if listKey:
                return data[listKey]
        return []

    def _findListKey(self, data: Dict) -> str:
        """Return the first key in ``_LIST_KEYS`` whose value in *data* is a list, or ``""``."""
        return next((key for key in self._LIST_KEYS if isinstance(data.get(key), list)), "")

    def _evalFilterCondition(self, item: Any, condition: Any) -> bool:
        """Decide whether *item* passes *condition*.

        - A dict condition ``{field, operator, value}`` compares
          ``item[field]`` (or the item itself, if it is not a dict) via
          ``_compareValues``. ``operator`` defaults to ``"eq"``.
        - A non-blank string is evaluated as a Python expression with the
          item's keys as names. Non-dict items are tested for truthiness
          instead.
        - If evaluation fails, the failure is logged and the item is kept.
        - Any other condition keeps the item.
        """
        if isinstance(condition, dict):
            field = condition.get("field", "")
            operator = condition.get("operator", "eq")
            value = condition.get("value")
            left = item.get(field) if isinstance(item, dict) else item
            return self._compareValues(left, operator, value)
        if isinstance(condition, str) and condition.strip():
            # SECURITY(review): eval() with empty __builtins__ is NOT a sandbox.
            # Attribute traversal (e.g. ().__class__.__base__.__subclasses__())
            # still reaches arbitrary code. Condition strings must only come
            # from trusted workflow authors; consider an ast-based evaluator.
            try:
                if isinstance(item, dict):
                    # Evaluate against a copy: an assignment expression such as
                    # "(x := 1)" would otherwise write into the upstream item.
                    return bool(eval(condition, {"__builtins__": {}}, dict(item)))
                return bool(item)
            except Exception as e:
                logger.warning("_evalFilterCondition eval failed for condition='%s': %s", condition, e)
                return True
        return True

    def _filterUdmByContentType(self, data: Dict, contentType: str) -> Dict:
        """Collect the ContentBlocks in a UDM document/node whose ``contentType`` matches.

        Two levels are inspected: direct children and, for children that do
        not match, their own ``children``. Returns
        ``{"nodes": [...], "count": n, "_udmFiltered": True}``.
        """
        result: list = []
        for child in data.get("children") or []:
            if not isinstance(child, dict):
                continue
            if child.get("contentType") == contentType:
                result.append(child)
            elif isinstance(child.get("children"), list):
                result.extend(
                    block for block in child["children"]
                    if isinstance(block, dict) and block.get("contentType") == contentType
                )
        return {"nodes": result, "count": len(result), "_udmFiltered": True}

    @staticmethod
    def _asText(value: Any) -> str:
        """Render *value* as text; only ``None`` becomes the empty string (``0``/``False`` do not)."""
        return "" if value is None else str(value)

    def _compareValues(self, left: Any, operator: str, right: Any) -> bool:
        """Compare *left* with *right* using *operator*.

        Operators:
        - ``eq``, ``neq``: plain equality / inequality.
        - ``contains``, ``startsWith``: textual; ``None`` counts as ``""``.
        - ``isEmpty``, ``isNotEmpty``: ``None``, ``""`` and empty
          lists/dicts count as empty.
        - ``lt``, ``lte``, ``gt``, ``gte``: numeric via ``float()``, with
          ``None`` treated as 0. Non-convertible values yield ``False``.

        Unknown operators return ``True``, so the item is kept.
        """
        if operator == "eq":
            return left == right
        if operator == "neq":
            return left != right
        if operator == "contains":
            return right is not None and str(right) in self._asText(left)
        if operator == "startsWith":
            return self._asText(left).startswith(self._asText(right))
        if operator == "isEmpty":
            return left is None or left == "" or (isinstance(left, (list, dict)) and len(left) == 0)
        if operator == "isNotEmpty":
            return left is not None and left != "" and (not isinstance(left, (list, dict)) or len(left) > 0)
        if operator in ("lt", "lte", "gt", "gte"):
            try:
                lhs = float(left) if left is not None else 0.0
                rhs = float(right) if right is not None else 0.0
            except (TypeError, ValueError, OverflowError):
                return False
            if operator == "lt":
                return lhs < rhs
            if operator == "lte":
                return lhs <= rhs
            if operator == "gt":
                return lhs > rhs
            return lhs >= rhs
        logger.debug("_compareValues unknown operator %r; keeping item", operator)
        return True