gateway/modules/workflows/automation2/executors/dataExecutor.py
2026-04-13 00:38:47 +02:00

215 lines
8 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# Data manipulation node executor: data.aggregate, data.transform, data.filter.
import logging
from typing import Any, Dict
from modules.features.graphicalEditor.portTypes import _unwrapTransit, _wrapTransit
logger = logging.getLogger(__name__)


class DataExecutor:
    """Execute data.aggregate, data.transform and data.filter nodes.

    Every handler reads the output of the node wired to input port 0
    (see ``_getInput``), unwraps it when it is a Transit envelope, and
    returns this node's own output.
    """

    # Keys probed, in priority order, for "the" list inside a dict payload.
    # Shared by _extractItems and _findListKey so the list that gets filtered
    # is always the list that gets written back.
    _LIST_KEYS = ("items", "tasks", "emails", "files", "documents", "documentList")

    async def execute(
        self,
        node: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Any:
        """Dispatch ``node`` to the handler matching its ``type``.

        Args:
            node: Node definition; ``type``, ``id`` and ``parameters`` are read.
            context: Run context; ``nodeOutputs`` (node id -> output) and
                ``inputSources`` (node id -> {port index: (sourceNodeId, ...)})
                are read.

        Returns:
            The handler's result, or None for an unhandled node type.
        """
        nodeType = node.get("type", "")
        nodeId = node.get("id", "")
        # ``or {}`` also covers keys that are present but explicitly None.
        nodeOutputs = context.get("nodeOutputs") or {}
        inputSources = (context.get("inputSources") or {}).get(nodeId) or {}
        logger.info("DataExecutor node %s type=%s", nodeId, nodeType)
        if nodeType == "data.aggregate":
            return await self._aggregate(node, nodeOutputs, nodeId, inputSources, context)
        if nodeType == "data.transform":
            return await self._transform(node, nodeOutputs, nodeId, inputSources)
        if nodeType == "data.filter":
            return await self._filter(node, nodeOutputs, nodeId, inputSources)
        logger.debug("DataExecutor node %s unhandled type %s", nodeId, nodeType)
        return None

    async def _aggregate(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
        context: Dict,
    ) -> Any:
        """Aggregate the single upstream value according to ``parameters.mode``.

        In loop context, accumulation is handled by the engine
        (_aggregateAccumulators); outside a loop this only collects the one
        input. ``nodeId`` and ``context`` are accepted for a uniform handler
        signature but are not used.

        Modes (a ``None`` value yields ``[]`` except in ``sum``):
            collect (default, and any unknown mode): ``[data]``
            concat: a copy of ``data`` if it is a list, else ``[data]``
            sum: ``[data]`` if it is an int/float, else ``[0]``
            count: ``[1]``

        Returns:
            ``{"items": [...], "count": len(items), "_success": True}``; an
            empty result when no input is connected or it produced nothing.
        """
        inp = self._getInput(inputSources, nodeOutputs)
        mode = (node.get("parameters") or {}).get("mode", "collect")
        if inp is None:
            return {"items": [], "count": 0, "_success": True}
        data = self._unwrapInput(inp)
        if mode == "concat":
            if isinstance(data, list):
                # Copy so this node's output never aliases the upstream
                # node's stored list (anything that mutates ``items`` later
                # would otherwise corrupt the upstream output as well).
                items = list(data)
            else:
                items = [data] if data is not None else []
        elif mode == "sum":
            items = [data if isinstance(data, (int, float)) else 0]
        elif mode == "count":
            items = [1] if data is not None else []
        else:
            # "collect" and unrecognised modes both collect the single value.
            items = [data] if data is not None else []
        return {"items": items, "count": len(items), "_success": True}

    async def _transform(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Build a new dict from ``parameters.mappings``.

        Each mapping must be a dict with a non-empty ``outputField``; others
        are skipped. The value comes from the first matching rule:
          * ``source == {"type": "ref", ...}`` ->
            ``resolveParameterReferences(source, nodeOutputs)``
          * ``source == {"type": "value", "value": v}`` -> ``v``
          * input is a dict and ``sourceField`` is set ->
            ``input.get(sourceField)``
          * otherwise -> ``source`` verbatim (possibly None)

        Returns:
            The mapped dict with ``"_success": True`` set last.
        """
        # Function-local import kept from the original.
        # NOTE(review): presumably avoids an import cycle; confirm.
        from modules.workflows.automation2.graphUtils import resolveParameterReferences

        data = self._unwrapInput(self._getInput(inputSources, nodeOutputs))
        # ``or []`` tolerates an explicit ``"mappings": None``.
        mappings = (node.get("parameters") or {}).get("mappings") or []
        result: Dict[str, Any] = {}
        for mapping in mappings:
            if not isinstance(mapping, dict):
                continue
            outputField = mapping.get("outputField")
            if not outputField:
                continue
            source = mapping.get("source")
            sourceType = source.get("type") if isinstance(source, dict) else None
            if sourceType == "ref":
                result[outputField] = resolveParameterReferences(source, nodeOutputs)
            elif sourceType == "value":
                result[outputField] = source.get("value")
            elif isinstance(data, dict) and mapping.get("sourceField"):
                result[outputField] = data.get(mapping["sourceField"])
            else:
                result[outputField] = source
        result["_success"] = True
        return result

    async def _filter(
        self,
        node: Dict,
        nodeOutputs: Dict,
        nodeId: str,
        inputSources: Dict,
    ) -> Any:
        """Filter the input's items by ``parameters.condition``.

        The items are the input itself when it is a list, or the first list
        found under one of ``_LIST_KEYS`` when it is a dict. The input keeps
        its shape: for a dict, a shallow copy is returned with that list
        replaced; other values pass through unchanged. An empty condition
        keeps every item.

        Returns:
            ``_wrapTransit(filteredData, {"originalCount": ..., "filteredCount": ...})``.
        """
        data = self._unwrapInput(self._getInput(inputSources, nodeOutputs))
        condition = (node.get("parameters") or {}).get("condition", "")
        items = self._extractItems(data)
        originalCount = len(items)
        if condition:
            filtered = [item for item in items if self._evalFilterCondition(item, condition)]
        else:
            # Copy so the output never aliases the upstream node's list.
            filtered = list(items)

        if isinstance(data, dict):
            filteredData = dict(data)
            listKey = self._findListKey(data)
            if listKey:
                filteredData[listKey] = filtered
        elif isinstance(data, list):
            filteredData = filtered
        else:
            filteredData = data
        return _wrapTransit(filteredData, {
            "originalCount": originalCount,
            "filteredCount": len(filtered),
        })

    def _getInput(self, inputSources: Dict, nodeOutputs: Dict) -> Any:
        """Return the output of the node wired to input port 0, or None.

        ``inputSources`` maps port index -> ``(sourceNodeId, <ignored>)``.
        """
        if 0 not in inputSources:
            return None
        srcId, _ = inputSources[0]
        return nodeOutputs.get(srcId)

    @staticmethod
    def _unwrapInput(inp: Any) -> Any:
        """Unwrap ``inp`` if it is a Transit envelope (dict with truthy ``_transit``)."""
        if isinstance(inp, dict) and inp.get("_transit"):
            return _unwrapTransit(inp)
        return inp

    def _extractItems(self, data: Any) -> list:
        """Return the item list inside ``data`` (the list itself, or a dict's list key), else []."""
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            listKey = self._findListKey(data)
            if listKey:
                return data[listKey]
        return []

    def _findListKey(self, data: Dict) -> str:
        """Return the first key in ``_LIST_KEYS`` whose value is a list, or ""."""
        for key in self._LIST_KEYS:
            if isinstance(data.get(key), list):
                return key
        return ""

    def _evalFilterCondition(self, item: Any, condition: Any) -> bool:
        """Decide whether ``item`` passes ``condition``.

        * dict ``{field, operator, value}``: compares ``item.get(field)`` (or
          the item itself when it is not a dict) via ``_compareValues``;
          ``operator`` defaults to ``"eq"``.
        * non-blank string: for dict items, evaluated as a Python expression
          with the item's keys as variables; a non-dict item passes iff it is
          truthy. If evaluation raises, a warning is logged and the item is
          kept (best-effort).
        * anything else, including blank strings: the item is kept.
        """
        if isinstance(condition, dict):
            field = condition.get("field", "")
            operator = condition.get("operator", "eq")
            value = condition.get("value")
            left = item.get(field) if isinstance(item, dict) else item
            return self._compareValues(left, operator, value)
        if not (isinstance(condition, str) and condition.strip()):
            return True
        try:
            if isinstance(item, dict):
                # SECURITY(review): ``eval`` on a workflow-supplied expression.
                # An empty ``__builtins__`` is NOT a sandbox: attribute walks
                # such as ``().__class__.__mro__[1].__subclasses__()`` still
                # escape it. If condition strings can come from untrusted
                # users, replace this with a restricted (ast-whitelisted)
                # evaluator.
                return bool(eval(condition, {"__builtins__": {}}, item))
            return bool(item)
        except Exception as e:
            # Deliberately permissive: a broken expression keeps the item
            # instead of failing the whole run.
            logger.warning("_evalFilterCondition eval failed for condition='%s': %s", condition, e)
            return True

    @staticmethod
    def _toText(value: Any) -> str:
        """Stringify ``value`` for text operators, mapping only None to ""."""
        return "" if value is None else str(value)

    def _compareValues(self, left: Any, operator: str, right: Any) -> bool:
        """Compare ``left`` against ``right`` using ``operator``.

        Supported operators: eq, neq, contains, startsWith (string-based;
        None counts as ""), isEmpty, isNotEmpty (``right`` ignored), and
        lt/lte/gt/gte (float-based; None counts as 0; unconvertible values
        yield False). Unknown operators return True, so the item is kept.
        """
        if operator == "eq":
            return left == right
        if operator == "neq":
            return left != right
        if operator == "contains":
            # _toText (not ``x or ""``) so 0/False are compared as "0"/"False".
            return right is not None and str(right) in self._toText(left)
        if operator == "startsWith":
            return self._toText(left).startswith(self._toText(right))
        if operator == "isEmpty":
            return left is None or left == "" or (isinstance(left, (list, dict)) and len(left) == 0)
        if operator == "isNotEmpty":
            return left is not None and left != "" and (not isinstance(left, (list, dict)) or len(left) > 0)
        if operator in ("lt", "lte", "gt", "gte"):
            try:
                lhs = float(left) if left is not None else 0
                rhs = float(right) if right is not None else 0
            except (TypeError, ValueError):
                return False
            if operator == "lt":
                return lhs < rhs
            if operator == "lte":
                return lhs <= rhs
            if operator == "gt":
                return lhs > rhs
            return lhs >= rhs
        return True