120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# Data transformation node executor (setFields, filter, parseJson, template).
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from typing import Dict, Any, List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_nested(obj: Any, path: str) -> Any:
|
|
"""Get nested key from obj, e.g. 'data.items'."""
|
|
for k in path.split("."):
|
|
if not k:
|
|
continue
|
|
if isinstance(obj, dict) and k in obj:
|
|
obj = obj[k]
|
|
elif isinstance(obj, (list, tuple)) and k.isdigit():
|
|
obj = obj[int(k)]
|
|
else:
|
|
return None
|
|
return obj
|
|
|
|
|
|
class DataExecutor:
|
|
"""Execute data transformation nodes."""
|
|
|
|
async def execute(
|
|
self,
|
|
node: Dict[str, Any],
|
|
context: Dict[str, Any],
|
|
) -> Any:
|
|
nodeType = node.get("type", "")
|
|
nodeOutputs = context.get("nodeOutputs", {})
|
|
nodeId = node.get("id", "")
|
|
inputSources = context.get("inputSources", {}).get(nodeId, {})
|
|
params = node.get("parameters") or {}
|
|
logger.info(
|
|
"DataExecutor node %s type=%s inputSources=%s params=%s",
|
|
nodeId,
|
|
nodeType,
|
|
inputSources,
|
|
params,
|
|
)
|
|
|
|
inp = None
|
|
if 0 in inputSources:
|
|
srcId, _ = inputSources[0]
|
|
inp = nodeOutputs.get(srcId)
|
|
|
|
from modules.workflows.automation2.graphUtils import resolveParameterReferences
|
|
resolvedParams = {k: resolveParameterReferences(v, nodeOutputs) for k, v in params.items()}
|
|
|
|
if nodeType == "data.setFields":
|
|
out = self._setFields(inp, resolvedParams)
|
|
logger.info("DataExecutor node %s setFields inp=%s -> %s", nodeId, type(inp).__name__, out)
|
|
return out
|
|
if nodeType == "data.filter":
|
|
out = self._filter(inp, resolvedParams)
|
|
logger.info("DataExecutor node %s filter inp=%s -> len=%d", nodeId, type(inp).__name__, len(out) if isinstance(out, list) else -1)
|
|
return out
|
|
if nodeType == "data.parseJson":
|
|
out = self._parseJson(inp, resolvedParams)
|
|
logger.info("DataExecutor node %s parseJson -> %s", nodeId, type(out).__name__)
|
|
return out
|
|
if nodeType == "data.template":
|
|
out = self._template(inp, resolvedParams, nodeOutputs)
|
|
logger.info("DataExecutor node %s template -> %s", nodeId, out)
|
|
return out
|
|
|
|
logger.debug("DataExecutor node %s unhandled type %s -> passThrough", nodeId, nodeType)
|
|
return inp
|
|
|
|
def _setFields(self, inp: Any, params: Dict) -> Any:
|
|
fields = params.get("fields", {})
|
|
if not isinstance(fields, dict):
|
|
return inp
|
|
base = dict(inp) if isinstance(inp, dict) else {}
|
|
base.update(fields)
|
|
return base
|
|
|
|
def _filter(self, inp: Any, params: Dict) -> Any:
|
|
itemsPath = (params.get("itemsPath") or "").strip()
|
|
condition = params.get("condition", "True")
|
|
items = inp
|
|
if itemsPath:
|
|
items = _get_nested(inp, itemsPath)
|
|
if not isinstance(items, list):
|
|
items = [inp] if inp is not None else []
|
|
out = []
|
|
for i, item in enumerate(items):
|
|
try:
|
|
local = {"item": item, "index": i, "input": inp}
|
|
ok = bool(eval(condition, {"__builtins__": {}}, local))
|
|
if ok:
|
|
out.append(item)
|
|
except Exception:
|
|
pass
|
|
return out
|
|
|
|
def _parseJson(self, inp: Any, params: Dict) -> Any:
|
|
jsonPath = (params.get("jsonPath") or "").strip()
|
|
raw = inp
|
|
if jsonPath:
|
|
raw = _get_nested(inp, jsonPath) if isinstance(inp, dict) else inp
|
|
if isinstance(raw, dict):
|
|
return raw
|
|
if isinstance(raw, str):
|
|
try:
|
|
return json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return {"error": "Invalid JSON", "raw": raw[:200]}
|
|
return inp
|
|
|
|
def _template(self, inp: Any, params: Dict, nodeOutputs: Dict) -> Any:
|
|
tpl = params.get("template", "")
|
|
from modules.workflows.automation2.graphUtils import resolveParameterReferences
|
|
result = resolveParameterReferences(tpl, nodeOutputs)
|
|
return {"text": result, "template": tpl}
|