diff --git a/modules/features/graphicalEditor/nodeDefinitions/context.py b/modules/features/graphicalEditor/nodeDefinitions/context.py index c6423d51..f7aa3df5 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/context.py +++ b/modules/features/graphicalEditor/nodeDefinitions/context.py @@ -1,8 +1,56 @@ # Copyright (c) 2025 Patrick Motsch -# Context node definitions — structural extraction without AI. +# Context node definitions — structural extraction without AI plus +# generic key/value, merge, filter and transform helpers. from modules.shared.i18nRegistry import t +_CONTEXT_INPUT_SCHEMAS = [ + "Transit", + "ActionResult", + "AiResult", + "MergeResult", + "FormPayload", + "DocumentList", + "EmailList", + "TaskList", + "FileList", + "LoopItem", + "UdmDocument", +] + + +_MERGE_RESULT_DATA_PICK_OPTIONS = [ + { + "path": ["merged"], + "pickerLabel": t("Zusammengeführt"), + "detail": t("Zusammengeführtes Objekt nach gewählter Strategie."), + "recommended": True, + "type": "Dict", + }, + { + "path": ["first"], + "pickerLabel": t("Erster Zweig"), + "detail": t("Daten vom ersten verbundenen Eingang."), + "recommended": False, + "type": "Any", + }, + { + "path": ["inputs"], + "pickerLabel": t("Alle Eingänge"), + "detail": t("Dict der Eingabeobjekte nach Port-Index."), + "recommended": False, + "type": "Dict[int,Any]", + }, + { + "path": ["conflicts"], + "pickerLabel": t("Konflikte"), + "detail": t("Liste der Schlüssel mit Konflikt (nur bei errorOnConflict)."), + "recommended": False, + "type": "List[str]", + }, +] + + CONTEXT_NODES = [ { "id": "context.extractContent", @@ -72,4 +120,257 @@ CONTEXT_NODES = [ "_method": "context", "_action": "extractContent", }, + { + "id": "context.setContext", + "category": "context", + "label": t("Kontext setzen"), + "description": t( + "Schreibt in den Workflow-Kontext. Pro Zeile: Ziel-Schlüssel, dann entweder einen " + "festen Wert, eine Datenquelle aus dem Graph (Kontext-Picker wie bei anderen Nodes), " + "oder eine Aufgabe für einen Benutzer (Human Task) zum Setzen des Werts." + ), + "parameters": [ + { + "name": "scope", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": {"options": ["local", "global", "session"]}, + "default": "local", + "description": t("Speicherbereich"), + }, + { + "name": "assignments", + "type": "list", + "required": True, + "frontendType": "contextAssignments", + "default": [], + "description": t( + "Zuweisungen: Ziel-Schlüssel, Quelle (Picker / fester Wert / Human Task), " + "Modus (set, setIfEmpty, append, increment). Optionaler Experten-Pfad `sourcePath` unter der " + "gewählten Datenquelle (z. B. payload.status)." + ), + "graphInherit": {"port": 0, "kind": "primaryTextRef"}, + }, + ], + "inputs": 1, + "outputs": 1, + "inputPorts": {0: {"accepts": _CONTEXT_INPUT_SCHEMAS}}, + "outputPorts": { + 0: { + "schema": "Transit", + "dynamic": True, + "deriveFrom": "assignments", + "deriveNameField": "contextKey", + } + }, + "injectUpstreamPayload": True, + "injectRunContext": True, + "surfaceDataAsTopLevel": True, + "meta": {"icon": "mdi-database-edit-outline", "color": "#5C6BC0", "usesAi": False}, + "_method": "context", + "_action": "setContext", + }, + { + "id": "context.mergeContext", + "category": "context", + "label": t("Kontext zusammenführen"), + "description": t( + "Wartet auf alle verbundenen eingehenden Branches und führt deren " + "Kontext-Daten zu einem einheitlichen MergeResult zusammen. 
" + "Strategien: 'shallow' (oberste Ebene), 'deep' (rekursiv), " + "'firstWins' / 'lastWins' bei Konflikten, " + "'errorOnConflict' (bricht ab und listet Konflikte). " + "Der Node blockiert bis alle erwarteten Inputs eingetroffen sind." + ), + "parameters": [ + { + "name": "strategy", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": ["shallow", "deep", "firstWins", "lastWins", "errorOnConflict"] + }, + "default": "deep", + "description": t("Strategie bei gleichnamigen Keys aus verschiedenen Branches"), + }, + { + "name": "waitFor", + "type": "int", + "required": False, + "frontendType": "number", + "default": 0, + "description": t( + "Anzahl Inputs abwarten (0 = alle verbundenen Branches). " + "Hilfreich für optionale Branches mit Timeout." + ), + }, + { + "name": "timeoutMs", + "type": "int", + "required": False, + "frontendType": "number", + "default": 30000, + "description": t( + "Maximale Wartezeit in ms — danach wird mit den vorhandenen Inputs fortgesetzt" + ), + }, + ], + "inputs": 5, + "outputs": 1, + "inputPorts": { + 0: {"accepts": _CONTEXT_INPUT_SCHEMAS}, + 1: {"accepts": _CONTEXT_INPUT_SCHEMAS}, + 2: {"accepts": _CONTEXT_INPUT_SCHEMAS}, + 3: {"accepts": _CONTEXT_INPUT_SCHEMAS}, + 4: {"accepts": _CONTEXT_INPUT_SCHEMAS}, + }, + "outputPorts": { + 0: {"schema": "MergeResult", "dataPickOptions": _MERGE_RESULT_DATA_PICK_OPTIONS} + }, + "waitsForAllPredecessors": True, + "injectBranchInputs": True, + "meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False}, + "_method": "context", + "_action": "mergeContext", + }, + { + "id": "context.filterContext", + "category": "context", + "label": t("Kontext filtern"), + "description": t( + "Gibt nur bestimmte Felder des eingehenden Datenstroms weiter. " + "Modus 'allow': nur diese Keys passieren. " + "Modus 'block': diese Keys werden entfernt, alles andere bleibt. " + "Unterstützt Pfadausdrücke (z.B. 'user.*', '*.id') und tiefe Pfade ('address.city'). " + "Fehlende Keys werden je nach 'missingKeyBehavior' ignoriert, mit null befüllt oder als Fehler behandelt." + ), + "parameters": [ + { + "name": "mode", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": {"options": ["allow", "block"]}, + "default": "allow", + "description": t("Allowlist (nur diese durch) oder Blocklist (diese entfernen)"), + }, + { + "name": "keys", + "type": "list", + "required": True, + "frontendType": "stringList", + "default": [], + "description": t( + "Key-Pfade oder Wildcard-Muster. " + "Beispiele: 'response', 'user.*', '*.id', 'address.city'." 
+ ), + }, + { + "name": "missingKeyBehavior", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": {"options": ["skip", "nullFill", "error"]}, + "default": "skip", + "description": t("Verhalten wenn ein erlaubter Key im Input fehlt"), + }, + { + "name": "preserveMeta", + "type": "bool", + "required": False, + "frontendType": "checkbox", + "default": True, + "description": t("Interne Meta-Felder (_success, _error, _transit) immer durchlassen"), + }, + ], + "inputs": 1, + "outputs": 1, + "inputPorts": {0: {"accepts": _CONTEXT_INPUT_SCHEMAS}}, + "outputPorts": { + 0: { + "schema": "Transit", + "dynamic": True, + "deriveFrom": "keys", + } + }, + "injectUpstreamPayload": True, + "surfaceDataAsTopLevel": True, + "meta": {"icon": "mdi-filter-outline", "color": "#00838F", "usesAi": False}, + "_method": "context", + "_action": "filterContext", + }, + { + "id": "context.transformContext", + "category": "context", + "label": t("Kontext transformieren"), + "description": t( + "Verändert die Struktur des eingehenden Datenstroms. " + "Operationen pro Mapping: 'rename' (Key umbenennen), 'cast' (Typ konvertieren), " + "'nest' (mehrere Felder unter neuem Objekt zusammenfassen), " + "'flatten' (verschachteltes Objekt auf oberste Ebene heben), " + "'compute' (neues Feld aus Template-/{{...}}-Ausdruck berechnen). " + "Jedes Mapping definiert: 'sourceField' (Eingangspfad / Ausdruck), " + "'outputField' (Ausgabe-Key), 'operation' und 'type' (Zieltyp). " + "Das Ergebnis ist ein neues Objekt — der ursprüngliche Datenstrom " + "wird nicht automatisch weitergegeben (ausser 'passthroughUnmapped: true')." + ), + "parameters": [ + { + "name": "mappings", + "type": "list", + "required": True, + "frontendType": "mappingTable", + "default": [], + "description": t( + "Liste von Mapping-Einträgen. Jeder Eintrag: " + "sourceField (DataRef-Pfad oder Ausdruck), " + "outputField (Ziel-Key im Output), " + "operation (rename | cast | nest | flatten | compute), " + "type (str | int | bool | float | object | list — für cast), " + "expression (für compute: Template oder Ausdruck, z.B. '{{firstName}} {{lastName}}')." + ), + }, + { + "name": "passthroughUnmapped", + "type": "bool", + "required": False, + "frontendType": "checkbox", + "default": False, + "description": t( + "Alle nicht gemappten Felder des Eingangs zusätzlich in den Output übernehmen." 
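As a quick illustration of the mapping operations listed above, here is a hypothetical `mappings` value together with the output the documented behaviour implies (a sketch, not a test against the shipped executor):

```python
# Hypothetical upstream payload and mappings rows per the parameter description.
upstream = {"firstName": "Ada", "lastName": "Lovelace", "age": "36",
            "address": {"city": "London", "zip": "N1"}}

mappings = [
    {"sourceField": "firstName", "outputField": "first", "operation": "rename"},
    {"sourceField": "age", "outputField": "age", "operation": "cast", "type": "int"},
    # nesting is implicit: a dotted outputField writes into a nested object
    {"sourceField": "address.city", "outputField": "home.city", "operation": "rename"},
    {"operation": "compute", "outputField": "fullName",
     "expression": "{{firstName}} {{lastName}}"},
]

# With passthroughUnmapped=False the result contains only mapped fields:
expected = {
    "first": "Ada",
    "age": 36,
    "home": {"city": "London"},
    "fullName": "Ada Lovelace",
}
```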
+ ), + }, + { + "name": "flattenDepth", + "type": "int", + "required": False, + "frontendType": "number", + "default": 1, + "description": t("Tiefe für flatten-Operation (1 = eine Ebene, -1 = vollständig)"), + }, + ], + "inputs": 1, + "outputs": 1, + "inputPorts": {0: {"accepts": _CONTEXT_INPUT_SCHEMAS}}, + "outputPorts": { + 0: { + "schema": { + "kind": "fromGraph", + "parameter": "mappings", + "nameField": "outputField", + "schemaName": "Transform_dynamic", + }, + "dynamic": True, + "deriveFrom": "mappings", + "deriveNameField": "outputField", + } + }, + "injectUpstreamPayload": True, + "surfaceDataAsTopLevel": True, + "meta": {"icon": "mdi-swap-horizontal", "color": "#EF6C00", "usesAi": False}, + "_method": "context", + "_action": "transformContext", + }, ] diff --git a/modules/features/graphicalEditor/portTypes.py b/modules/features/graphicalEditor/portTypes.py index 2b14d6aa..24a97446 100644 --- a/modules/features/graphicalEditor/portTypes.py +++ b/modules/features/graphicalEditor/portTypes.py @@ -882,8 +882,22 @@ def _resolveTransitChain( # Schema derivation for dynamic outputs # --------------------------------------------------------------------------- -def deriveFormPayloadSchemaFromParam(node: Dict[str, Any], param_key: str) -> Optional[PortSchema]: - """Derive output schema from a field-builder JSON list (``fields``, ``formFields``, …).""" +def deriveFormPayloadSchemaFromParam( + node: Dict[str, Any], + param_key: str, + name_field: str = "name", + type_field: str = "type", + label_field: str = "label", + schema_name: str = "FormPayload_dynamic", +) -> Optional[PortSchema]: + """Derive an output schema from a graph-defined parameter. + + Supports three parameter shapes: + - List[Dict] with ``name_field`` (e.g. ``fields[].name``, ``entries[].key``, + ``mappings[].outputField``). + - Group-fields: ``type == "group"`` recursed via ``fields``. + - List[str]: each string is taken as a leaf path key (used for ``filterContext.keys``). 
+ """ from modules.features.graphicalEditor.nodeDefinitions.input import FORM_FIELD_TYPES _FORM_TYPE_TO_PORT: Dict[str, str] = {f["id"]: f["portType"] for f in FORM_FIELD_TYPES} @@ -906,21 +920,35 @@ def deriveFormPayloadSchemaFromParam(node: Dict[str, Any], param_key: str) -> Op )) for f in fields_param: - if not isinstance(f, dict) or not f.get("name"): + if isinstance(f, str): + if f.strip(): + _append_field(f.strip(), "str", None, False) continue - fname = str(f["name"]) - if str(f.get("type", "")).lower() == "group" and isinstance(f.get("fields"), list): + if not isinstance(f, dict): + continue + fname_raw = f.get(name_field) + if not fname_raw and name_field == "contextKey": + fname_raw = f.get("key") + if not fname_raw: + continue + fname = str(fname_raw) + if str(f.get(type_field, "")).lower() == "group" and isinstance(f.get("fields"), list): for sub in f["fields"]: - if isinstance(sub, dict) and sub.get("name"): + if isinstance(sub, dict) and sub.get(name_field): _append_field( - f"{fname}.{sub['name']}", - sub.get("type", "str"), - sub.get("label"), + f"{fname}.{sub[name_field]}", + sub.get(type_field, "str"), + sub.get(label_field), bool(sub.get("required", False)), ) continue - _append_field(fname, f.get("type", "str"), f.get("label"), bool(f.get("required", False))) - return PortSchema(name="FormPayload_dynamic", fields=portFields) if portFields else None + _append_field( + fname, + f.get(type_field, "str"), + f.get(label_field), + bool(f.get("required", False)), + ) + return PortSchema(name=schema_name, fields=portFields) if portFields else None def _deriveFormPayloadSchema(node: Dict[str, Any]) -> Optional[PortSchema]: @@ -945,9 +973,20 @@ def parse_graph_defined_output_schema( schema_spec = output_port.get("schema") if isinstance(schema_spec, dict) and schema_spec.get("kind") == "fromGraph": param_key = str(schema_spec.get("parameter") or "fields") - return deriveFormPayloadSchemaFromParam(node, param_key) + name_field = str(schema_spec.get("nameField") or "name") + type_field = str(schema_spec.get("typeField") or "type") + label_field = str(schema_spec.get("labelField") or "label") + schema_name = str(schema_spec.get("schemaName") or "FormPayload_dynamic") + return deriveFormPayloadSchemaFromParam( + node, param_key, + name_field=name_field, type_field=type_field, + label_field=label_field, schema_name=schema_name, + ) if output_port.get("dynamic") and output_port.get("deriveFrom"): - return deriveFormPayloadSchemaFromParam(node, str(output_port.get("deriveFrom"))) + name_field = str(output_port.get("deriveNameField") or "name") + return deriveFormPayloadSchemaFromParam( + node, str(output_port.get("deriveFrom")), name_field=name_field, + ) if isinstance(schema_spec, str) and schema_spec: return PORT_TYPE_CATALOG.get(schema_spec) return None diff --git a/modules/shared/frontendTypes.py b/modules/shared/frontendTypes.py index 29db7ba6..46b142a1 100644 --- a/modules/shared/frontendTypes.py +++ b/modules/shared/frontendTypes.py @@ -88,6 +88,12 @@ class FrontendType(str, Enum): FILTER_EXPRESSION = "filterExpression" """Filter expression builder for data.filter""" + CONTEXT_BUILDER = "contextBuilder" + """Upstream handover picker (graph editor): DataRef / path selection from prior nodes.""" + + CONTEXT_ASSIGNMENTS = "contextAssignments" + """Context set assignments: target key, picker | literal | human task (graph editor).""" + USER_FILE_FOLDER = "userFileFolder" """User file storage folder (graph editor): browse My Files tree or create folders.""" diff --git 
a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index 3c056df6..61dc8166 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -85,8 +85,23 @@ def _outputSchemaForNode(nodeType: str) -> Optional[str]: return None -def _isMergeNode(nodeType: str) -> bool: - return nodeType == "flow.merge" +def _isBarrierNode(nodeType: str) -> bool: + """Barrier nodes wait for all connected predecessors before executing. + + Backwards compatible: ``flow.merge`` is always a barrier. Any other node may + declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry + (e.g. ``context.mergeContext``). + """ + if nodeType == "flow.merge": + return True + for nd in STATIC_NODE_TYPES: + if nd.get("id") == nodeType: + return bool(nd.get("waitsForAllPredecessors")) + return False + + +# Legacy alias used inside this module. +_isMergeNode = _isBarrierNode def _allMergePredecessorsReady( @@ -94,7 +109,7 @@ def _allMergePredecessorsReady( connectionMap: Dict[str, List], nodeOutputs: Dict[str, Any], ) -> bool: - """For flow.merge: check that every connected predecessor has produced output or was skipped.""" + """For barrier nodes: check that every connected predecessor has produced output or was skipped.""" for src, _, _ in connectionMap.get(nodeId, []): if src not in nodeOutputs: return False @@ -467,6 +482,10 @@ async def executeGraph( "_orderedNodes": ordered, "runEnvelope": env_for_run, } + # Lets graph actions (e.g. ``context.setContext`` human-task mode) call + # ``createTask`` / ``updateRun`` without threading the interface through services. + if automation2_interface: + context["_automation2Interface"] = automation2_interface # _context key in nodeOutputs for system variable resolution nodeOutputs["_context"] = context @@ -749,9 +768,9 @@ async def executeGraph( output={"iterationCount": len(items), "items": len(items), "concurrency": _loopConcurrency, "batchMode": _batchMode}, durationMs=int((time.time() - _stepStartMs) * 1000)) logger.info("executeGraph flow.loop done: %d iterations (concurrency=%d, batchMode=%s)", len(items), _loopConcurrency, _batchMode) - elif _isMergeNode(nodeType): + elif _isBarrierNode(nodeType): if not _allMergePredecessorsReady(nodeId, connectionMap, nodeOutputs): - logger.info("executeGraph node %s (flow.merge): waiting — not all predecessors ready, deferring", nodeId) + logger.info("executeGraph node %s (%s): waiting — not all predecessors ready, deferring", nodeId, nodeType) nodeOutputs[nodeId] = None continue _stepStartMs = time.time() diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py index 56238b88..c94cdef8 100644 --- a/modules/workflows/automation2/executors/actionNodeExecutor.py +++ b/modules/workflows/automation2/executors/actionNodeExecutor.py @@ -20,6 +20,7 @@ from modules.features.graphicalEditor.portTypes import ( ) from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException as _SubscriptionInactiveException from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError as _BillingContextError +from modules.workflows.automation2.executors.inputExecutor import PauseForHumanTaskError logger = logging.getLogger(__name__) @@ -334,6 +335,35 @@ def _getOutputSchemaName(nodeDef: Dict) -> str: return port0.get("schema", "ActionResult") +def _resolveUpstreamPayload(nodeId: str, 
context: Dict[str, Any]) -> Any: + """Return the unwrapped output of the node connected to input port 0, or None.""" + from modules.features.graphicalEditor.portTypes import unwrapTransit + src_map = (context.get("inputSources") or {}).get(nodeId) or {} + entry = src_map.get(0) + if not entry: + return None + src_node_id, _ = entry + upstream = (context.get("nodeOutputs") or {}).get(src_node_id) + return unwrapTransit(upstream) if isinstance(upstream, dict) else upstream + + +def _resolveBranchInputs(nodeId: str, context: Dict[str, Any]) -> Dict[int, Any]: + """Return ``Dict[port_index → unwrapped upstream output]`` for every wired input port.""" + from modules.features.graphicalEditor.portTypes import unwrapTransit + src_map = (context.get("inputSources") or {}).get(nodeId) or {} + nodeOutputs = context.get("nodeOutputs") or {} + out: Dict[int, Any] = {} + for port_ix, entry in src_map.items(): + if not entry: + continue + src_node_id, _ = entry + upstream = nodeOutputs.get(src_node_id) + if upstream is None: + continue + out[int(port_ix)] = unwrapTransit(upstream) if isinstance(upstream, dict) else upstream + return out + + class ActionNodeExecutor: """Execute action nodes by mapping to method actions via ActionExecutor.""" @@ -401,6 +431,18 @@ class ActionNodeExecutor: chatService = getattr(self.services, "chat", None) _resolveConnectionParam(resolvedParams, chatService, self.services) + # 3b. Optional graph-level injections declared on the node definition. + # - injectUpstreamPayload: True → ``_upstreamPayload`` (port 0 source output, transit-unwrapped) + # - injectBranchInputs: True → ``_branchInputs`` (Dict[port_index, output] for all wired ports) + # - injectRunContext: True → ``_runContext`` (the live execution context dict) + if nodeDef.get("injectUpstreamPayload"): + resolvedParams["_upstreamPayload"] = _resolveUpstreamPayload(nodeId, context) + if nodeDef.get("injectBranchInputs"): + resolvedParams["_branchInputs"] = _resolveBranchInputs(nodeId, context) + if nodeDef.get("injectRunContext"): + resolvedParams["_runContext"] = context + resolvedParams["_workflowNodeId"] = nodeId + # 4. Apply declarative paramMappers from the node definition _applyParamMappers(nodeDef, resolvedParams) @@ -424,6 +466,8 @@ class ActionNodeExecutor: try: executor = ActionExecutor(self.services) result = await executor.executeAction(methodName, actionName, resolvedParams) + except PauseForHumanTaskError: + raise except (_SubscriptionInactiveException, _BillingContextError): raise except Exception as e: @@ -602,4 +646,15 @@ class ActionNodeExecutor: return normalizeToSchema(cr_out, outputSchema) _attachConnectionProvenance(out, resolvedParams, outputSchema, chatService, self.services) - return normalizeToSchema(out, outputSchema) + + # When the node declares ``surfaceDataAsTopLevel`` (typical for + # dynamic-schema context nodes whose output keys are graph-defined), + # surface ``data.`` to ``out.`` so DataRefs from downstream + # nodes hit the user-defined keys without needing a ``data.`` prefix. 
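The shape of the execution context these helpers consume can be shown with a tiny mirror of `_resolveBranchInputs` (node ids hypothetical, transit unwrapping omitted):

```python
# Sketch of the context shape consumed by _resolveBranchInputs.
context = {
    "inputSources": {
        # port index -> (source node id, source port)
        "merge-1": {0: ("node-a", 0), 1: ("node-b", 0)},
    },
    "nodeOutputs": {
        "node-a": {"status": "ok"},
        "node-b": {"items": [1, 2, 3]},
    },
}

def resolve_branch_inputs(node_id, ctx):
    src_map = (ctx.get("inputSources") or {}).get(node_id) or {}
    outputs = ctx.get("nodeOutputs") or {}
    return {
        int(port): outputs[src]
        for port, (src, _) in src_map.items()
        if outputs.get(src) is not None
    }

assert resolve_branch_inputs("merge-1", context) == {
    0: {"status": "ok"},
    1: {"items": [1, 2, 3]},
}
```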
+ if nodeDef.get("surfaceDataAsTopLevel") and isinstance(dataField, dict): + for k, v in dataField.items(): + if k not in out and not str(k).startswith("_"): + out[k] = v + + normalized_schema = outputSchema if isinstance(outputSchema, str) else "Transit" + return normalizeToSchema(out, normalized_schema) diff --git a/modules/workflows/methods/methodContext/actions/filterContext.py b/modules/workflows/methods/methodContext/actions/filterContext.py new file mode 100644 index 00000000..6087b380 --- /dev/null +++ b/modules/workflows/methods/methodContext/actions/filterContext.py @@ -0,0 +1,141 @@ +# Copyright (c) 2026 Patrick Motsch +# All rights reserved. +"""Action ``context.filterContext``. + +Allow- or block-lists keys/paths from the upstream payload using simple glob +patterns. Implementation uses ``fnmatch`` (no regex) and traverses dotted paths +on dicts. +""" + +from __future__ import annotations + +import copy +import fnmatch +import logging +from typing import Any, Dict, List, Optional, Tuple + +from modules.datamodels.datamodelChat import ActionResult + +logger = logging.getLogger(__name__) + + +_META_KEYS = ("_success", "_error", "_transit", "_meta", "_warnings") + + +def _flatten(payload: Any, prefix: str = "") -> Dict[str, Any]: + """Yield ``{dotted.path: value}`` for every leaf in a dict tree.""" + out: Dict[str, Any] = {} + if not isinstance(payload, dict): + if prefix: + out[prefix] = payload + return out + for k, v in payload.items(): + path = f"{prefix}.{k}" if prefix else str(k) + if isinstance(v, dict): + out.update(_flatten(v, path)) + else: + out[path] = v + return out + + +def _set_path(target: Dict[str, Any], dotted: str, value: Any) -> None: + parts = dotted.split(".") + cur = target + for seg in parts[:-1]: + nxt = cur.get(seg) + if not isinstance(nxt, dict): + nxt = {} + cur[seg] = nxt + cur = nxt + cur[parts[-1]] = value + + +def _del_path(target: Dict[str, Any], dotted: str) -> bool: + parts = dotted.split(".") + cur: Any = target + stack: List[Tuple[Dict[str, Any], str]] = [] + for seg in parts[:-1]: + if not isinstance(cur, dict) or seg not in cur: + return False + stack.append((cur, seg)) + cur = cur[seg] + if not isinstance(cur, dict) or parts[-1] not in cur: + return False + del cur[parts[-1]] + return True + + +def _match_any(pattern: str, all_paths: List[str]) -> List[str]: + """Return every flattened path matching the glob pattern.""" + return [p for p in all_paths if fnmatch.fnmatchcase(p, pattern)] + + +async def filterContext(self, parameters: Dict[str, Any]) -> ActionResult: + try: + mode = str(parameters.get("mode") or "allow") + if mode not in ("allow", "block"): + return ActionResult.isFailure(error=f"Invalid mode '{mode}', expected 'allow' or 'block'") + + keys: List[str] = parameters.get("keys") or [] + if not isinstance(keys, list) or not keys: + return ActionResult.isFailure(error="'keys' must be a non-empty list of paths or patterns") + + missing_behavior = str(parameters.get("missingKeyBehavior") or "skip") + if missing_behavior not in ("skip", "nullFill", "error"): + return ActionResult.isFailure(error=f"Invalid missingKeyBehavior '{missing_behavior}'") + + preserve_meta = bool(parameters.get("preserveMeta", True)) + upstream = parameters.get("_upstreamPayload") or {} + if not isinstance(upstream, dict): + upstream = {"value": upstream} + + flat = _flatten(upstream) + all_paths = list(flat.keys()) + + if mode == "allow": + result: Dict[str, Any] = {} + missing: List[str] = [] + for pat in keys: + p = str(pat).strip() + if not p: + continue + 
matches = _match_any(p, all_paths) + if not matches: + missing.append(p) + if missing_behavior == "nullFill": + _set_path(result, p, None) + continue + for m in matches: + _set_path(result, m, flat[m]) + + if missing and missing_behavior == "error": + return ActionResult.isFailure(error=f"Missing keys: {missing}") + + if preserve_meta: + for mk in _META_KEYS: + if mk in upstream: + result[mk] = upstream[mk] + + data: Dict[str, Any] = result + if missing and missing_behavior != "error": + data["_missingKeys"] = missing + return ActionResult.isSuccess(data=data) + + # mode == "block" + cloned = copy.deepcopy(upstream) + removed: List[str] = [] + for pat in keys: + p = str(pat).strip() + if not p: + continue + matches = _match_any(p, all_paths) + for m in matches: + if preserve_meta and m in _META_KEYS: + continue + if _del_path(cloned, m): + removed.append(m) + cloned["_removedKeys"] = removed + return ActionResult.isSuccess(data=cloned) + except Exception as exc: + logger.exception("filterContext failed") + return ActionResult.isFailure(error=str(exc)) diff --git a/modules/workflows/methods/methodContext/actions/mergeContext.py b/modules/workflows/methods/methodContext/actions/mergeContext.py new file mode 100644 index 00000000..7b8765a9 --- /dev/null +++ b/modules/workflows/methods/methodContext/actions/mergeContext.py @@ -0,0 +1,129 @@ +# Copyright (c) 2026 Patrick Motsch +# All rights reserved. +"""Action ``context.mergeContext``. + +Reads ``_branchInputs`` (injected by ``ActionNodeExecutor`` because the node +declaration sets ``injectBranchInputs: True``) and combines them according to +the selected strategy. + +The barrier behaviour — waiting until every connected predecessor has produced +output — is handled by the execution engine via ``waitsForAllPredecessors`` on +the node definition; this action is invoked only after all (or ``waitFor``) +inputs are present. 
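A quick demo of the flatten-then-glob matching used by `filterContext` above; dicts are flattened to dotted leaf paths, then matched with `fnmatch` patterns:

```python
import fnmatch

def flatten(d, prefix=""):
    # Mirror of the action's _flatten: one entry per leaf, dotted path keys.
    out = {}
    for k, v in d.items():
        path = f"{prefix}.{k}" if prefix else str(k)
        if isinstance(v, dict):
            out.update(flatten(v, path))
        else:
            out[path] = v
    return out

payload = {"user": {"id": 7, "name": "Ada"}, "order": {"id": 42}, "debug": True}
flat = flatten(payload)

# The action uses fnmatch.fnmatchcase, i.e. case-sensitive, no normcase.
assert [p for p in flat if fnmatch.fnmatchcase(p, "user.*")] == ["user.id", "user.name"]
assert sorted(p for p in flat if fnmatch.fnmatchcase(p, "*.id")) == ["order.id", "user.id"]
```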
+""" + +from __future__ import annotations + +import copy +import logging +from typing import Any, Dict, List, Tuple + +from modules.datamodels.datamodelChat import ActionResult + +logger = logging.getLogger(__name__) + + +_VALID_STRATEGIES = {"shallow", "deep", "firstWins", "lastWins", "errorOnConflict"} + + +def _shallow_merge(branches: List[Tuple[int, Any]]) -> Tuple[Dict[str, Any], List[str]]: + merged: Dict[str, Any] = {} + conflicts: List[str] = [] + for _, val in branches: + if not isinstance(val, dict): + continue + for k, v in val.items(): + if k in merged and merged[k] != v: + conflicts.append(k) + merged[k] = v + return merged, conflicts + + +def _deep_merge(target: Dict[str, Any], source: Dict[str, Any], conflicts: List[str], path: str = "") -> None: + for k, v in source.items(): + full = f"{path}.{k}" if path else k + if k not in target: + target[k] = copy.deepcopy(v) if isinstance(v, (dict, list)) else v + continue + existing = target[k] + if isinstance(existing, dict) and isinstance(v, dict): + _deep_merge(existing, v, conflicts, full) + else: + if existing != v: + conflicts.append(full) + target[k] = copy.deepcopy(v) if isinstance(v, (dict, list)) else v + + +def _strategy_first_or_last_wins( + branches: List[Tuple[int, Any]], last: bool +) -> Tuple[Dict[str, Any], List[str]]: + iterator = list(reversed(branches)) if not last else list(branches) + merged: Dict[str, Any] = {} + conflicts: List[str] = [] + for _, val in iterator: + if not isinstance(val, dict): + continue + for k, v in val.items(): + if k in merged and merged[k] != v: + conflicts.append(k) + if last or k not in merged: + merged[k] = v + return merged, conflicts + + +async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult: + try: + strategy = str(parameters.get("strategy") or "deep") + if strategy not in _VALID_STRATEGIES: + return ActionResult.isFailure( + error=f"Invalid strategy '{strategy}', expected one of {sorted(_VALID_STRATEGIES)}" + ) + + wait_for = int(parameters.get("waitFor") or 0) + + raw_inputs = parameters.get("_branchInputs") or {} + if not isinstance(raw_inputs, dict): + return ActionResult.isFailure(error="No branch inputs available — connect at least two upstream nodes") + + items: List[Tuple[int, Any]] = sorted( + ((int(k), v) for k, v in raw_inputs.items()), + key=lambda kv: kv[0], + ) + if wait_for > 0: + items = items[:wait_for] + + if not items: + return ActionResult.isFailure(error="No branch inputs available") + + first_value = items[0][1] if items else None + conflicts: List[str] = [] + + if strategy == "shallow": + merged, conflicts = _shallow_merge(items) + elif strategy == "firstWins": + merged, conflicts = _strategy_first_or_last_wins(items, last=False) + elif strategy == "lastWins": + merged, conflicts = _strategy_first_or_last_wins(items, last=True) + elif strategy == "errorOnConflict": + merged, conflicts = _shallow_merge(items) + if conflicts: + return ActionResult.isFailure( + error=f"Conflicting keys: {sorted(set(conflicts))}", + ) + else: # deep (default) + merged = {} + for _, val in items: + if isinstance(val, dict): + _deep_merge(merged, val, conflicts) + + data: Dict[str, Any] = { + "inputs": {idx: val for idx, val in items}, + "first": first_value, + "merged": merged, + "strategy": strategy, + "conflicts": sorted(set(conflicts)) if conflicts else [], + } + return ActionResult.isSuccess(data=data) + except Exception as exc: + logger.exception("mergeContext failed") + return ActionResult.isFailure(error=str(exc)) diff --git 
a/modules/workflows/methods/methodContext/actions/setContext.py b/modules/workflows/methods/methodContext/actions/setContext.py new file mode 100644 index 00000000..7d54a719 --- /dev/null +++ b/modules/workflows/methods/methodContext/actions/setContext.py @@ -0,0 +1,452 @@ +# Copyright (c) 2026 Patrick Motsch +# All rights reserved. +"""Action ``context.setContext``. + +Stores values in the workflow context (``local`` | ``global`` | ``session``). + +Each **assignment** row defines a target ``contextKey`` and how to obtain the value: + +- ``valueSource=pickUpstream`` — use ``upstreamRef`` (DataRef resolved by the graph) or, + for experts, a dotted ``sourcePath`` on ``_upstreamPayload``. +- ``valueSource=literal`` — use ``literal`` (with ``valueType`` coercion). +- ``valueSource=humanTask`` — pause and create a task (requires ``_automation2Interface``). + +Legacy graphs may still send ``entries`` / ``upstreamPick`` + ``targetKey``; those are +normalized into the same shape before processing. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, List, Optional, Tuple + +from modules.datamodels.datamodelChat import ActionResult +from modules.workflows.automation2.executors.inputExecutor import PauseForHumanTaskError + +logger = logging.getLogger(__name__) + + +_VALID_MODES = {"set", "setIfEmpty", "append", "increment"} +_VALID_SCOPES = {"local", "global", "session"} +_VALID_VALUE_SOURCES = {"pickUpstream", "literal", "humanTask"} + + +def _get_by_path(data: Any, dotted: str) -> Any: + """Traverse dict/list by dotted path (``payload.status``, ``items.0.name``).""" + if not dotted or not str(dotted).strip(): + return None + cur: Any = data + for seg in str(dotted).strip().split("."): + if cur is None: + return None + if isinstance(cur, dict) and seg in cur: + cur = cur[seg] + continue + if isinstance(cur, (list, tuple)): + try: + idx = int(seg) + except ValueError: + return None + if 0 <= idx < len(cur): + cur = cur[idx] + continue + return None + return cur + + +def _is_unresolved_ref(value: Any) -> bool: + return isinstance(value, dict) and value.get("type") == "ref" + + +def _coerce_type(value: Any, type_str: str) -> Any: + """Best-effort coerce ``value`` into the declared entry ``type``.""" + if type_str in (None, "", "any", "Any"): + return value + try: + if type_str == "str": + return "" if value is None else str(value) + if type_str == "int": + if isinstance(value, bool): + return int(value) + if value is None or value == "": + return 0 + return int(float(value)) + if type_str == "float": + if value is None or value == "": + return 0.0 + return float(value) + if type_str == "bool": + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + return str(value).strip().lower() in ("1", "true", "yes", "on", "ja") + if type_str in ("list", "List", "array"): + if value is None: + return [] + if isinstance(value, str) and value.strip().startswith(("[", "{")): + try: + parsed = json.loads(value) + return parsed if isinstance(parsed, list) else [parsed] + except json.JSONDecodeError: + pass + return value if isinstance(value, list) else [value] + if type_str in ("object", "dict", "Dict"): + if isinstance(value, str) and value.strip().startswith("{"): + try: + parsed = json.loads(value) + return parsed if isinstance(value, dict) else {"value": parsed} + except json.JSONDecodeError: + pass + return value if isinstance(value, dict) else {"value": value} + except (TypeError, ValueError) as exc: + 
logger.warning("setContext._coerce_type %r → %s failed: %s", value, type_str, exc) + return value + + +def _resolve_store(scope: str, run_context: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """Return the dict that backs the requested scope.""" + if not isinstance(run_context, dict): + return {} + if scope == "global": + return run_context.setdefault("_globalContext", {}) + if scope == "session": + return run_context.setdefault("_sessionContext", {}) + return run_context.setdefault("_localContext", {}) + + +def _entry_context_key(entry: Dict[str, Any]) -> Optional[str]: + ck = entry.get("contextKey") or entry.get("key") + if ck is None: + return None + s = str(ck).strip() + return s or None + + +def _apply_value_to_store( + store: Dict[str, Any], + context_key: str, + value: Any, + mode: str, + type_str: str, +) -> Optional[str]: + """Apply coerced ``value`` to ``store[context_key]``. Returns error string or None.""" + if mode not in _VALID_MODES: + return f"unknown mode '{mode}' on key '{context_key}'" + + coerced = _coerce_type(value, str(type_str or "")) + + if mode == "set": + store[context_key] = coerced + return None + if mode == "setIfEmpty": + if context_key not in store or store.get(context_key) in (None, "", [], {}): + store[context_key] = coerced + return None + if mode == "append": + existing = store.get(context_key) + if existing is None: + store[context_key] = [coerced] if not isinstance(coerced, list) else list(coerced) + elif isinstance(existing, list): + if isinstance(coerced, list): + existing.extend(coerced) + else: + existing.append(coerced) + elif isinstance(existing, str): + store[context_key] = existing + ("" if coerced is None else str(coerced)) + else: + store[context_key] = [existing, coerced] + return None + if mode == "increment": + existing = store.get(context_key, 0) + try: + store[context_key] = ( + float(existing) + float(coerced) + if isinstance(existing, float) or isinstance(coerced, float) + else int(existing) + int(coerced) + ) + except (TypeError, ValueError): + return f"increment requires numeric value/state for key '{context_key}'" + return None + return None + + +def _value_source(row: Dict[str, Any]) -> str: + vs = row.get("valueSource") + if isinstance(vs, str) and vs.strip() in _VALID_VALUE_SOURCES: + return vs.strip() + am = str(row.get("assignmentMode") or "direct").strip() + if am == "fromUpstream": + return "pickUpstream" + if am == "humanTask": + return "humanTask" + if am == "direct": + return "literal" + return "literal" + + +def _normalize_assignments(parameters: Dict[str, Any]) -> List[Dict[str, Any]]: + """Build a single list of assignment dicts from new or legacy parameters.""" + raw = parameters.get("assignments") + if isinstance(raw, list) and raw: + out: List[Dict[str, Any]] = [] + for item in raw: + if isinstance(item, dict): + out.append(dict(item)) + if out: + return out + + legacy_entries = parameters.get("entries") + global_pick = parameters.get("upstreamPick") + + if isinstance(legacy_entries, list) and legacy_entries: + out = [] + for entry in legacy_entries: + if not isinstance(entry, dict): + continue + row = dict(entry) + row["valueSource"] = _value_source(entry) + am = str(entry.get("assignmentMode") or "direct").strip() + if am == "fromUpstream" and not str(entry.get("sourcePath") or "").strip(): + if global_pick is not None and not (isinstance(global_pick, str) and not global_pick.strip()): + if not (isinstance(global_pick, (list, dict)) and len(global_pick) == 0): + row["upstreamRef"] = global_pick + if am == 
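The four write modes of `_apply_value_to_store` above are easiest to see applied to a concrete store; a runnable sketch of the documented semantics:

```python
store = {"count": 1, "tags": ["a"]}

# set: unconditional overwrite
store["status"] = "done"

# setIfEmpty: only writes when the key is missing or in (None, "", [], {})
if store.get("count") in (None, "", [], {}):
    store["count"] = 99          # skipped here, count is already 1

# append: lists extend, strings concatenate, scalars become a two-item list
store["tags"].append("b")

# increment: numeric add after coercion ("2" -> 2)
store["count"] = int(store["count"]) + int("2")

assert store == {"count": 3, "tags": ["a", "b"], "status": "done"}
```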
"direct": + row["literal"] = entry.get("value") + row["valueSource"] = "literal" + out.append(row) + if out: + return out + + tk = str(parameters.get("targetKey") or "").strip() + if tk and global_pick is not None: + if isinstance(global_pick, str) and not global_pick.strip(): + pass + elif isinstance(global_pick, (list, dict)) and len(global_pick) == 0: + pass + else: + return [ + { + "contextKey": tk, + "valueSource": "pickUpstream", + "upstreamRef": global_pick, + "mode": "set", + "valueType": "str", + } + ] + + return [] + + +def _resolve_pick_upstream( + row: Dict[str, Any], + upstream: Any, + parameters: Dict[str, Any], +) -> Tuple[Optional[Any], Optional[str]]: + path = str(row.get("sourcePath") or "").strip() + ref_val = row.get("upstreamRef") + + if ref_val is not None and ref_val != "": + if _is_unresolved_ref(ref_val): + return None, "upstream DataRef konnte nicht aufgelöst werden" + base: Any = ref_val + if path: + hit = _get_by_path(base, path) + if hit is None and isinstance(upstream, dict): + hit = _get_by_path(upstream, path) + if hit is not None: + return hit, None + return None, f"path '{path}' not found under picked value or upstream payload" + return base, None + + if path: + if not isinstance(upstream, dict): + return None, "sourcePath benötigt ein strukturiertes Upstream-Payload (dict)" + return _get_by_path(upstream, path), None + + return None, "Picker: Datenquelle wählen oder sourcePath (z. B. payload.status) setzen" + + +def _resolve_literal(row: Dict[str, Any]) -> Tuple[Optional[Any], Optional[str]]: + raw = row.get("literal") + if raw is None and "value" in row: + raw = row.get("value") + if raw is None: + return None, "literal value missing" + if isinstance(raw, (dict, list, bool, int, float)) or raw is None: + return raw, None + s = str(raw) + type_str = str(row.get("valueType") or row.get("type") or "str") + if type_str in ("object", "dict", "Dict", "list", "List", "array") and s.strip().startswith(("[", "{")): + try: + return json.loads(s), None + except json.JSONDecodeError as exc: + return None, f"invalid JSON literal: {exc}" + return s, None + + +def _pause_for_human_tasks( + *, + iface: Any, + run_context: Dict[str, Any], + parameters: Dict[str, Any], + pending_entries: List[Dict[str, Any]], + scope: str, +) -> None: + """Create a single human task for all ``humanTask`` rows and pause the run.""" + run_id = str(run_context.get("_runId") or "") + workflow_id = str(run_context.get("workflowId") or "") + node_id = str(parameters.get("_workflowNodeId") or "") + user_id = run_context.get("userId") + + cfg = { + "kind": "contextSetAssignment", + "scope": scope, + "entries": pending_entries, + "description": ( + "Set or confirm workflow context keys. After completion, resume the run;" + " submitted values should be merged into context by the task handler." 
+ ), + } + + task = iface.createTask( + runId=run_id, + workflowId=workflow_id, + nodeId=node_id, + nodeType="context.setContext", + config=cfg, + assigneeId=str(user_id) if user_id else None, + ) + task_id = str((task or {}).get("id") or "") + ordered_ids = [n.get("id") for n in (run_context.get("_orderedNodes") or []) if n.get("id")] + iface.updateRun( + run_id, + status="paused", + nodeOutputs=run_context.get("nodeOutputs"), + currentNodeId=node_id, + context={ + "connectionMap": run_context.get("connectionMap"), + "inputSources": run_context.get("inputSources"), + "orderedNodeIds": ordered_ids, + "pauseReason": "contextAssignment", + }, + ) + if not (run_id and task_id and node_id): + raise RuntimeError("humanTask requires _runId, task id, and _workflowNodeId") + raise PauseForHumanTaskError(runId=run_id, taskId=task_id, nodeId=node_id) + + +async def setContext(self, parameters: Dict[str, Any]) -> ActionResult: + try: + scope = str(parameters.get("scope") or "local") + if scope not in _VALID_SCOPES: + return ActionResult.isFailure(error=f"Invalid scope '{scope}', expected one of {sorted(_VALID_SCOPES)}") + + entries: List[Dict[str, Any]] = _normalize_assignments(parameters) + if not entries: + return ActionResult.isFailure( + error="Mindestens eine Zuweisung konfigurieren (Ziel-Schlüssel, Quelle und Wert / Picker / Task).", + ) + + run_context = parameters.get("_runContext") + if not isinstance(run_context, dict): + return ActionResult.isFailure(error="internal: execution context missing") + + store = _resolve_store(scope, run_context) + upstream = parameters.get("_upstreamPayload") + + applied: Dict[str, Any] = {} + errors: List[str] = [] + human_rows: List[Dict[str, Any]] = [] + + for entry in entries: + if not isinstance(entry, dict): + errors.append("entry is not an object") + continue + + ck = _entry_context_key(entry) + if not ck: + errors.append("assignment needs contextKey") + continue + + vs = _value_source(entry) + if vs not in _VALID_VALUE_SOURCES: + errors.append(f"{ck}: unknown valueSource '{vs}'") + continue + + if vs == "humanTask": + human_rows.append( + { + "contextKey": ck, + "sourcePath": entry.get("sourcePath"), + "taskTitle": entry.get("taskTitle"), + "taskDescription": entry.get("taskDescription"), + "type": entry.get("valueType") or entry.get("type"), + "mode": entry.get("mode") or "set", + } + ) + continue + + val: Any = None + err: Optional[str] = None + + if vs == "pickUpstream": + val, err = _resolve_pick_upstream(entry, upstream, parameters) + else: + val, err = _resolve_literal(entry) + + if err: + errors.append(f"{ck}: {err}") + continue + + err2 = _apply_value_to_store( + store, + ck, + val, + str(entry.get("mode") or "set"), + str(entry.get("valueType") or entry.get("type") or ""), + ) + if err2: + errors.append(f"{ck}: {err2}") + continue + applied[ck] = store.get(ck) + + iface = run_context.get("_automation2Interface") + if human_rows: + if iface: + _pause_for_human_tasks( + iface=iface, + run_context=run_context, + parameters=parameters, + pending_entries=human_rows, + scope=scope, + ) + else: + applied["_humanTaskFallback"] = ( + "humanTask requires a live automation2 interface on the run; " + "configure execution via the graphical editor API or add an input.human node." 
+ ) + applied["_pendingHumanContextKeys"] = [r["contextKey"] for r in human_rows] + + if errors and not applied and not human_rows: + return ActionResult.isFailure(error="; ".join(errors)) + + data: Dict[str, Any] = dict(applied) + data["_scope"] = scope + data["_appliedKeys"] = [k for k in applied if not str(k).startswith("_")] + if errors: + data["_warnings"] = errors + + if isinstance(upstream, dict): + meta = upstream.get("_meta") + if isinstance(meta, dict): + data["_meta"] = meta + data.setdefault("_transit", True) + + return ActionResult.isSuccess(data=data) + except PauseForHumanTaskError: + raise + except Exception as exc: + logger.exception("setContext failed") + return ActionResult.isFailure(error=str(exc)) diff --git a/modules/workflows/methods/methodContext/actions/transformContext.py b/modules/workflows/methods/methodContext/actions/transformContext.py new file mode 100644 index 00000000..6fe05e03 --- /dev/null +++ b/modules/workflows/methods/methodContext/actions/transformContext.py @@ -0,0 +1,222 @@ +# Copyright (c) 2026 Patrick Motsch +# All rights reserved. +"""Action ``context.transformContext``. + +Applies a sequence of mappings to the upstream payload. Supported operations: + +- ``rename`` — copy a source path to a new output key +- ``cast`` — copy and convert to a target type (errors recorded in ``_castErrors``) +- ``nest`` — group several mappings under a dotted ``outputField`` (e.g. ``address.city``) +- ``flatten`` — copy a nested dict's leaves up to the configured ``flattenDepth`` +- ``compute`` — render a ``{{...}}`` template using the upstream payload as scope +""" + +from __future__ import annotations + +import logging +import re +from typing import Any, Dict, List, Optional + +from modules.datamodels.datamodelChat import ActionResult + +logger = logging.getLogger(__name__) + + +_VALID_OPERATIONS = {"rename", "cast", "nest", "flatten", "compute"} + + +def _get_path(payload: Any, dotted: str) -> Any: + cur = payload + for seg in str(dotted).split("."): + if cur is None: + return None + if isinstance(cur, dict): + cur = cur.get(seg) + continue + if isinstance(cur, list): + try: + cur = cur[int(seg)] + except (ValueError, IndexError): + return None + continue + return None + return cur + + +def _set_path(target: Dict[str, Any], dotted: str, value: Any) -> None: + parts = str(dotted).split(".") + cur = target + for seg in parts[:-1]: + nxt = cur.get(seg) + if not isinstance(nxt, dict): + nxt = {} + cur[seg] = nxt + cur = nxt + cur[parts[-1]] = value + + +def _coerce_type(value: Any, type_str: str) -> Any: + if type_str in (None, "", "any", "Any"): + return value + if type_str == "str": + return "" if value is None else str(value) + if type_str == "int": + if isinstance(value, bool): + return int(value) + if value is None or value == "": + raise ValueError("empty value") + return int(float(value)) + if type_str == "float": + if value is None or value == "": + raise ValueError("empty value") + return float(value) + if type_str == "bool": + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + return str(value).strip().lower() in ("1", "true", "yes", "on", "ja") + if type_str in ("list", "List", "array"): + return value if isinstance(value, list) else ([value] if value is not None else []) + if type_str in ("object", "dict", "Dict"): + return value if isinstance(value, dict) else {"value": value} + return value + + +_TEMPLATE_RE = re.compile(r"\{\{\s*([^{}\s|]+)(?:\s*\|\s*([^{}]*))?\s*\}\}") + + +def 
_apply_filter(value: Any, filter_chain: str) -> Any: + """Minimal filter pipeline: ``upper``, ``lower``, ``trim``, ``default:foo``.""" + out = value + for token in filter_chain.split("|"): + f = token.strip() + if not f: + continue + if f == "upper": + out = "" if out is None else str(out).upper() + elif f == "lower": + out = "" if out is None else str(out).lower() + elif f == "trim": + out = "" if out is None else str(out).strip() + elif f.startswith("default:"): + if out is None or out == "": + out = f.split(":", 1)[1] + else: + logger.debug("transformContext: unknown filter '%s' ignored", f) + return out + + +def _render_template(template: str, scope: Dict[str, Any]) -> str: + def replace(match: re.Match) -> str: + path = match.group(1) + filters = match.group(2) or "" + value = _get_path(scope, path) + if filters: + value = _apply_filter(value, filters) + return "" if value is None else str(value) + + return _TEMPLATE_RE.sub(replace, template) + + +def _flatten_with_depth(node: Any, depth: int, prefix: str = "") -> Dict[str, Any]: + out: Dict[str, Any] = {} + if not isinstance(node, dict) or depth == 0: + if prefix: + out[prefix] = node + return out + for k, v in node.items(): + path = f"{prefix}.{k}" if prefix else str(k) + if isinstance(v, dict) and depth != 1: + out.update(_flatten_with_depth(v, depth - 1 if depth > 0 else -1, path)) + elif isinstance(v, dict): + out[path] = v + else: + out[path] = v + return out + + +async def transformContext(self, parameters: Dict[str, Any]) -> ActionResult: + try: + mappings: List[Dict[str, Any]] = parameters.get("mappings") or [] + if not isinstance(mappings, list) or not mappings: + return ActionResult.isFailure(error="'mappings' must be a non-empty list") + + passthrough = bool(parameters.get("passthroughUnmapped", False)) + flatten_depth = int(parameters.get("flattenDepth") or 1) + + upstream = parameters.get("_upstreamPayload") + if not isinstance(upstream, dict): + upstream = {"value": upstream} if upstream is not None else {} + + result: Dict[str, Any] = {} + consumed_paths: set = set() + cast_errors: Dict[str, str] = {} + + for m in mappings: + if not isinstance(m, dict): + continue + op = str(m.get("operation") or "rename") + if op not in _VALID_OPERATIONS: + cast_errors[str(m.get("outputField") or "?")] = f"unknown operation '{op}'" + continue + output_field = str(m.get("outputField") or "").strip() + if not output_field: + continue + source_field = str(m.get("sourceField") or "").strip() + target_type = str(m.get("type") or "") + + if op == "compute": + expression = str(m.get("expression") or m.get("sourceField") or "") + value = _render_template(expression, upstream) + if target_type: + try: + value = _coerce_type(value, target_type) + except (TypeError, ValueError) as exc: + cast_errors[output_field] = str(exc) + value = None + _set_path(result, output_field, value) + continue + + if op == "flatten": + base = _get_path(upstream, source_field) if source_field else upstream + flat = _flatten_with_depth(base, flatten_depth, output_field if source_field else "") + for path, val in flat.items(): + _set_path(result, path or output_field, val) + if source_field: + consumed_paths.add(source_field) + continue + + value = _get_path(upstream, source_field) if source_field else None + if source_field: + consumed_paths.add(source_field) + + if op == "cast" and target_type: + try: + value = _coerce_type(value, target_type) + except (TypeError, ValueError) as exc: + cast_errors[output_field] = str(exc) + value = None + elif op == "rename" and 
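The compute operation's template pipeline can be exercised with a simplified mirror of `_render_template`/`_apply_filter` (dotted paths and the full filter set omitted):

```python
import re

TEMPLATE = re.compile(r"\{\{\s*([^{}\s|]+)(?:\s*\|\s*([^{}]*))?\s*\}\}")

def render(template, scope):
    # Simplified mirror of the module's template rendering above.
    def repl(m):
        value = scope.get(m.group(1))
        for f in (m.group(2) or "").split("|"):
            f = f.strip()
            if f == "upper":
                value = "" if value is None else str(value).upper()
            elif f.startswith("default:") and value in (None, ""):
                value = f.split(":", 1)[1]
        return "" if value is None else str(value)
    return TEMPLATE.sub(repl, template)

assert render("{{firstName | upper}}", {"firstName": "ada"}) == "ADA"
assert render("{{nick | default:anonymous}}", {}) == "anonymous"
```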
target_type: + # Optional explicit type on rename is treated like cast best-effort. + try: + value = _coerce_type(value, target_type) + except (TypeError, ValueError) as exc: + cast_errors[output_field] = str(exc) + # ``nest`` is implicit: dotted ``outputField`` writes into a nested dict + _set_path(result, output_field, value) + + if passthrough: + for k, v in upstream.items(): + if k.startswith("_"): + continue + if k in result or k in consumed_paths: + continue + result[k] = v + + if cast_errors: + result["_castErrors"] = cast_errors + return ActionResult.isSuccess(data=result) + except Exception as exc: + logger.exception("transformContext failed") + return ActionResult.isFailure(error=str(exc)) diff --git a/modules/workflows/methods/methodContext/methodContext.py b/modules/workflows/methods/methodContext/methodContext.py index ae6fcbcb..1f7b9180 100644 --- a/modules/workflows/methods/methodContext/methodContext.py +++ b/modules/workflows/methods/methodContext/methodContext.py @@ -15,6 +15,10 @@ from .actions.getDocumentIndex import getDocumentIndex from .actions.extractContent import extractContent from .actions.neutralizeData import neutralizeData from .actions.triggerPreprocessingServer import triggerPreprocessingServer +from .actions.setContext import setContext +from .actions.mergeContext import mergeContext +from .actions.filterContext import filterContext +from .actions.transformContext import transformContext logger = logging.getLogger(__name__) @@ -116,7 +120,135 @@ class MethodContext(MethodBase): ) }, execute=triggerPreprocessingServer.__get__(self, self.__class__) - ) + ), + "setContext": WorkflowActionDefinition( + actionId="context.setContext", + description=( + "Set workflow context: list of assignments with target key, then upstream picker, " + "fixed literal, or human task per row." + ), + outputType="Transit", + parameters={ + "scope": WorkflowActionParameter( + name="scope", type="str", required=False, + frontendType=FrontendType.SELECT, + frontendOptions=["local", "global", "session"], + default="local", + description="Storage scope for keys written by this node", + ), + "assignments": WorkflowActionParameter( + name="assignments", type="list", required=True, + frontendType=FrontendType.CONTEXT_ASSIGNMENTS, + default=[], + description=( + "List of rows: contextKey, valueSource (pickUpstream | literal | humanTask), " + "upstreamRef, literal, sourcePath, mode, valueType, task fields." + ), + ), + }, + execute=setContext.__get__(self, self.__class__), + ), + "mergeContext": WorkflowActionDefinition( + actionId="context.mergeContext", + description=( + "Merge data arriving from multiple parallel branches into a single " + "MergeResult. Strategies: shallow, deep, firstWins, lastWins, " + "errorOnConflict. The execution engine waits for all connected " + "predecessors before invoking this action (waitsForAllPredecessors=True)." + ), + outputType="MergeResult", + parameters={ + "strategy": WorkflowActionParameter( + name="strategy", type="str", required=False, + frontendType=FrontendType.SELECT, + frontendOptions=["shallow", "deep", "firstWins", "lastWins", "errorOnConflict"], + default="deep", + description="Conflict resolution strategy for keys present in several branches", + ), + "waitFor": WorkflowActionParameter( + name="waitFor", type="int", required=False, + frontendType=FrontendType.NUMBER, + default=0, + description="Number of branches to consume (0 = all). 
Used together with timeoutMs.",
+                ),
+                "timeoutMs": WorkflowActionParameter(
+                    name="timeoutMs", type="int", required=False,
+                    frontendType=FrontendType.NUMBER,
+                    default=30000,
+                    description="Maximum wait time in milliseconds before continuing with available inputs",
+                ),
+            },
+            execute=mergeContext.__get__(self, self.__class__),
+        ),
+        "filterContext": WorkflowActionDefinition(
+            actionId="context.filterContext",
+            description=(
+                "Allow- or block-list keys/paths from the upstream payload. "
+                "Supports glob patterns (user.*, *.id) and dotted paths (address.city). "
+                "Missing-key behaviour is configurable (skip, nullFill, error)."
+            ),
+            outputType="Transit",
+            parameters={
+                "mode": WorkflowActionParameter(
+                    name="mode", type="str", required=False,
+                    frontendType=FrontendType.SELECT,
+                    frontendOptions=["allow", "block"],
+                    default="allow",
+                    description="allow = only these keys pass; block = these keys are removed",
+                ),
+                "keys": WorkflowActionParameter(
+                    name="keys", type="list", required=True,
+                    frontendType=FrontendType.JSON,
+                    default=[],
+                    description="Key paths or glob patterns",
+                ),
+                "missingKeyBehavior": WorkflowActionParameter(
+                    name="missingKeyBehavior", type="str", required=False,
+                    frontendType=FrontendType.SELECT,
+                    frontendOptions=["skip", "nullFill", "error"],
+                    default="skip",
+                    description="What to do when an allowed key is missing in the input",
+                ),
+                "preserveMeta": WorkflowActionParameter(
+                    name="preserveMeta", type="bool", required=False,
+                    frontendType=FrontendType.CHECKBOX,
+                    default=True,
+                    description="Always pass through internal meta fields (_success, _error, _transit)",
+                ),
+            },
+            execute=filterContext.__get__(self, self.__class__),
+        ),
+        "transformContext": WorkflowActionDefinition(
+            actionId="context.transformContext",
+            description=(
+                "Transform the upstream payload via a list of {sourceField, outputField, "
+                "operation, type, expression} mappings. Operations: rename, cast, nest, "
+                "flatten, compute. compute uses {{...}} templates; nesting is implicit "
+                "via dotted outputField paths."
+            ),
+            outputType="Transit",
+            parameters={
+                "mappings": WorkflowActionParameter(
+                    name="mappings", type="list", required=True,
+                    frontendType=FrontendType.MAPPING_TABLE,
+                    default=[],
+                    description="List of mapping entries",
+                ),
+                "passthroughUnmapped": WorkflowActionParameter(
+                    name="passthroughUnmapped", type="bool", required=False,
+                    frontendType=FrontendType.CHECKBOX,
+                    default=False,
+                    description="Forward fields of the upstream payload that no mapping consumed",
+                ),
+                "flattenDepth": WorkflowActionParameter(
+                    name="flattenDepth", type="int", required=False,
+                    frontendType=FrontendType.NUMBER,
+                    default=1,
+                    description="Depth for flatten operation (1 = one level, -1 = full)",
+                ),
+            },
+            execute=transformContext.__get__(self, self.__class__),
+        ),
+    }
 
     # Validate actions after definition
@@ -127,4 +259,8 @@ class MethodContext(MethodBase):
         self.extractContent = extractContent.__get__(self, self.__class__)
         self.neutralizeData = neutralizeData.__get__(self, self.__class__)
         self.triggerPreprocessingServer = triggerPreprocessingServer.__get__(self, self.__class__)
+        self.setContext = setContext.__get__(self, self.__class__)
+        self.mergeContext = mergeContext.__get__(self, self.__class__)
+        self.filterContext = filterContext.__get__(self, self.__class__)
+        self.transformContext = transformContext.__get__(self, self.__class__)
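A hedged smoke-test sketch driving two of the new actions directly. It assumes the module paths introduced by this diff, that none of the shown actions actually use `self` (true for the code above, so a bare `None` works), and that `ActionResult` exposes the payload as a `data` attribute (inferred from `isSuccess(data=...)`; verify locally):

```python
import asyncio

from modules.workflows.methods.methodContext.actions.filterContext import filterContext
from modules.workflows.methods.methodContext.actions.transformContext import transformContext

async def main() -> None:
    payload = {"user": {"id": 7, "name": "Ada"}, "debug": True}

    # allow-mode filter: only user.* leaves survive
    filtered = await filterContext(None, {
        "mode": "allow",
        "keys": ["user.*"],
        "_upstreamPayload": payload,
    })
    # single rename mapping: user.name -> who
    transformed = await transformContext(None, {
        "mappings": [{"sourceField": "user.name", "outputField": "who",
                      "operation": "rename"}],
        "_upstreamPayload": payload,
    })
    # `.data` access is an assumption about the ActionResult datamodel
    print(filtered.data, transformed.data)

asyncio.run(main())
```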