From 823afa78bb5308762c236b95a4b27031fffca6f0 Mon Sep 17 00:00:00 2001 From: Ida Date: Wed, 13 May 2026 06:18:42 +0200 Subject: [PATCH] continuous work of grafical editor --- ...g-mietzinsbestaetigung-pilot.workflow.json | 1 - .../nodeDefinitions/context.py | 229 +++------------ .../graphicalEditor/nodeDefinitions/flow.py | 88 +++++- .../routeFeatureGraphicalEditor.py | 75 ++++- .../graphicalEditor/upstreamPathsService.py | 86 +++++- modules/interfaces/interfaceBootstrap.py | 2 - .../serviceGeneration/subDocumentUtility.py | 68 ++++- .../workflows/automation2/executionEngine.py | 180 +++++++++++- .../executors/actionNodeExecutor.py | 139 ++++++++-- .../automation2/executors/flowExecutor.py | 56 ++-- modules/workflows/automation2/graphUtils.py | 95 ++++++- modules/workflows/methods/methodAi/_common.py | 28 +- .../methodContext/actions/extractContent.py | 67 ++++- .../methodContext/actions/mergeContext.py | 261 ++++++++++++------ .../methods/methodContext/methodContext.py | 35 +-- .../methods/methodFile/actions/create.py | 199 ++++++++++++- ...xecute_graph_loop_aggregate_consolidate.py | 50 +++- .../workflow/test_extract_content_handover.py | 93 ++++++- .../workflow/test_merge_context_handover.py | 178 ++++++++++++ .../unit/workflow/test_phase3_context_node.py | 8 +- .../workflow/test_phase4_workflow_nodes.py | 29 +- ...rialize_context_and_file_create_context.py | 98 +++++++ .../workflows/test_automation2_graphUtils.py | 34 +++ 23 files changed, 1691 insertions(+), 408 deletions(-) create mode 100644 tests/unit/workflow/test_merge_context_handover.py create mode 100644 tests/unit/workflow/test_serialize_context_and_file_create_context.py diff --git a/demoData/workflows/pwg-mietzinsbestaetigung-pilot.workflow.json b/demoData/workflows/pwg-mietzinsbestaetigung-pilot.workflow.json index 78f50751..eaf1a941 100644 --- a/demoData/workflows/pwg-mietzinsbestaetigung-pilot.workflow.json +++ b/demoData/workflows/pwg-mietzinsbestaetigung-pilot.workflow.json @@ -38,7 +38,6 @@ "title": "Pro Scan-Dokument", "parameters": { "items": {"type": "ref", "nodeId": "n2", "path": ["files"]}, - "level": "auto", "concurrency": 1 } }, diff --git a/modules/features/graphicalEditor/nodeDefinitions/context.py b/modules/features/graphicalEditor/nodeDefinitions/context.py index f7aa3df5..52ff3a8b 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/context.py +++ b/modules/features/graphicalEditor/nodeDefinitions/context.py @@ -4,6 +4,8 @@ from modules.shared.i18nRegistry import t +from modules.features.graphicalEditor.nodeDefinitions.flow import CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS + _CONTEXT_INPUT_SCHEMAS = [ "Transit", "ActionResult", @@ -19,38 +21,6 @@ _CONTEXT_INPUT_SCHEMAS = [ ] -_MERGE_RESULT_DATA_PICK_OPTIONS = [ - { - "path": ["merged"], - "pickerLabel": t("Zusammengeführt"), - "detail": t("Zusammengeführtes Objekt nach gewählter Strategie."), - "recommended": True, - "type": "Dict", - }, - { - "path": ["first"], - "pickerLabel": t("Erster Zweig"), - "detail": t("Daten vom ersten verbundenen Eingang."), - "recommended": False, - "type": "Any", - }, - { - "path": ["inputs"], - "pickerLabel": t("Alle Eingänge"), - "detail": t("Dict der Eingabeobjekte nach Port-Index."), - "recommended": False, - "type": "Dict[int,Any]", - }, - { - "path": ["conflicts"], - "pickerLabel": t("Konflikte"), - "detail": t("Liste der Schlüssel mit Konflikt (nur bei errorOnConflict)."), - "recommended": False, - "type": "List[str]", - }, -] - - CONTEXT_NODES = [ { "id": "context.extractContent", @@ -66,6 +36,29 @@ 
CONTEXT_NODES = [ {"name": "documentList", "type": "str", "required": True, "frontendType": "hidden", "description": t("Dokumentenliste (via Wire oder DataRef)"), "default": "", "graphInherit": {"port": 0, "kind": "documentListWire"}}, + { + "name": "contentFilter", + "type": "str", + "required": False, + "frontendType": "select", + "frontendOptions": { + "options": [ + {"value": "all", "label": t("Alles (Text, Tabellen, Bilder)")}, + {"value": "textOnly", "label": t("Nur Text und Tabellen")}, + {"value": "imagesOnly", "label": t("Nur Bilder")}, + {"value": "noImages", "label": t("Alles ausser Bilder")}, + ] + }, + "default": "all", + "description": t( + "Welche Parts im Handover behalten werden. " + "all = alle Typgruppen inkl. Bilder; " + "textOnly = ausschliesslich Text-, Tabellen- und Struktur-Parts; " + "imagesOnly = ausschliesslich Bild-Parts; " + "noImages = alle Parts ausser Bildern (weiter als textOnly: " + "auch kuenftige Nicht-Bild-Typen bleiben erhalten)." + ), + }, ], "inputs": 1, "outputs": 1, @@ -120,186 +113,40 @@ CONTEXT_NODES = [ "_method": "context", "_action": "extractContent", }, - { - "id": "context.setContext", - "category": "context", - "label": t("Kontext setzen"), - "description": t( - "Schreibt in den Workflow-Kontext. Pro Zeile: Ziel-Schlüssel, dann entweder einen " - "festen Wert, eine Datenquelle aus dem Graph (Kontext-Picker wie bei anderen Nodes), " - "oder eine Aufgabe für einen Benutzer (Human Task) zum Setzen des Werts." - ), - "parameters": [ - { - "name": "scope", - "type": "str", - "required": False, - "frontendType": "select", - "frontendOptions": {"options": ["local", "global", "session"]}, - "default": "local", - "description": t("Speicherbereich"), - }, - { - "name": "assignments", - "type": "list", - "required": True, - "frontendType": "contextAssignments", - "default": [], - "description": t( - "Zuweisungen: Ziel-Schlüssel, Quelle (Picker / fester Wert / Human Task), " - "Modus (set, setIfEmpty, append, increment). Optionaler Experten-Pfad `sourcePath` unter der " - "gewählten Datenquelle (z. B. payload.status)." - ), - "graphInherit": {"port": 0, "kind": "primaryTextRef"}, - }, - ], - "inputs": 1, - "outputs": 1, - "inputPorts": {0: {"accepts": _CONTEXT_INPUT_SCHEMAS}}, - "outputPorts": { - 0: { - "schema": "Transit", - "dynamic": True, - "deriveFrom": "assignments", - "deriveNameField": "contextKey", - } - }, - "injectUpstreamPayload": True, - "injectRunContext": True, - "surfaceDataAsTopLevel": True, - "meta": {"icon": "mdi-database-edit-outline", "color": "#5C6BC0", "usesAi": False}, - "_method": "context", - "_action": "setContext", - }, { "id": "context.mergeContext", "category": "context", "label": t("Kontext zusammenführen"), "description": t( - "Wartet auf alle verbundenen eingehenden Branches und führt deren " - "Kontext-Daten zu einem einheitlichen MergeResult zusammen. " - "Strategien: 'shallow' (oberste Ebene), 'deep' (rekursiv), " - "'firstWins' / 'lastWins' bei Konflikten, " - "'errorOnConflict' (bricht ab und listet Konflikte). " - "Der Node blockiert bis alle erwarteten Inputs eingetroffen sind." + "Führt eine Liste von Ergebnissen zu einem einzigen Kontext zusammen. " + "Wähle als Datenquelle die Option Alle Schleifen-Ergebnisse einer Schleife, " + "um alle Iterationsergebnisse in einem Datensatz zu vereinen." 
), "parameters": [ { - "name": "strategy", - "type": "str", - "required": False, - "frontendType": "select", - "frontendOptions": { - "options": ["shallow", "deep", "firstWins", "lastWins", "errorOnConflict"] - }, - "default": "deep", - "description": t("Strategie bei gleichnamigen Keys aus verschiedenen Branches"), - }, - { - "name": "waitFor", - "type": "int", - "required": False, - "frontendType": "number", - "default": 0, - "description": t( - "Anzahl Inputs abwarten (0 = alle verbundenen Branches). " - "Hilfreich für optionale Branches mit Timeout." - ), - }, - { - "name": "timeoutMs", - "type": "int", - "required": False, - "frontendType": "number", - "default": 30000, - "description": t( - "Maximale Wartezeit in ms — danach wird mit den vorhandenen Inputs fortgesetzt" - ), - }, - ], - "inputs": 5, - "outputs": 1, - "inputPorts": { - 0: {"accepts": _CONTEXT_INPUT_SCHEMAS}, - 1: {"accepts": _CONTEXT_INPUT_SCHEMAS}, - 2: {"accepts": _CONTEXT_INPUT_SCHEMAS}, - 3: {"accepts": _CONTEXT_INPUT_SCHEMAS}, - 4: {"accepts": _CONTEXT_INPUT_SCHEMAS}, - }, - "outputPorts": { - 0: {"schema": "MergeResult", "dataPickOptions": _MERGE_RESULT_DATA_PICK_OPTIONS} - }, - "waitsForAllPredecessors": True, - "injectBranchInputs": True, - "meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False}, - "_method": "context", - "_action": "mergeContext", - }, - { - "id": "context.filterContext", - "category": "context", - "label": t("Kontext filtern"), - "description": t( - "Gibt nur bestimmte Felder des eingehenden Datenstroms weiter. " - "Modus 'allow': nur diese Keys passieren. " - "Modus 'block': diese Keys werden entfernt, alles andere bleibt. " - "Unterstützt Pfadausdrücke (z.B. 'user.*', '*.id') und tiefe Pfade ('address.city'). " - "Fehlende Keys werden je nach 'missingKeyBehavior' ignoriert, mit null befüllt oder als Fehler behandelt." - ), - "parameters": [ - { - "name": "mode", - "type": "str", - "required": False, - "frontendType": "select", - "frontendOptions": {"options": ["allow", "block"]}, - "default": "allow", - "description": t("Allowlist (nur diese durch) oder Blocklist (diese entfernen)"), - }, - { - "name": "keys", - "type": "list", + "name": "dataSource", + "type": "Any", "required": True, - "frontendType": "stringList", - "default": [], + "frontendType": "dataRef", "description": t( - "Key-Pfade oder Wildcard-Muster. " - "Beispiele: 'response', 'user.*', '*.id', 'address.city'." + "Datenquelle: Liste von Einträgen zum Zusammenführen " + "(z. B. Schleife → Alle Schleifen-Ergebnisse)" ), }, - { - "name": "missingKeyBehavior", - "type": "str", - "required": False, - "frontendType": "select", - "frontendOptions": {"options": ["skip", "nullFill", "error"]}, - "default": "skip", - "description": t("Verhalten wenn ein erlaubter Key im Input fehlt"), - }, - { - "name": "preserveMeta", - "type": "bool", - "required": False, - "frontendType": "checkbox", - "default": True, - "description": t("Interne Meta-Felder (_success, _error, _transit) immer durchlassen"), - }, ], "inputs": 1, "outputs": 1, "inputPorts": {0: {"accepts": _CONTEXT_INPUT_SCHEMAS}}, "outputPorts": { - 0: { - "schema": "Transit", - "dynamic": True, - "deriveFrom": "keys", - } + 0: {"schema": "ActionResult", "dataPickOptions": CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS} }, "injectUpstreamPayload": True, + # Same contract as transformContext: picker paths like ``merged`` / ``first`` must match + # ``nodeOutputs`` (see actionNodeExecutor ``surfaceDataAsTopLevel``); merge payloads live in ``data``. 
"surfaceDataAsTopLevel": True, - "meta": {"icon": "mdi-filter-outline", "color": "#00838F", "usesAi": False}, + "meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False}, "_method": "context", - "_action": "filterContext", + "_action": "mergeContext", }, { "id": "context.transformContext", diff --git a/modules/features/graphicalEditor/nodeDefinitions/flow.py b/modules/features/graphicalEditor/nodeDefinitions/flow.py index 4435fe85..e47a063e 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/flow.py +++ b/modules/features/graphicalEditor/nodeDefinitions/flow.py @@ -3,6 +3,35 @@ from modules.shared.i18nRegistry import t +LOOP_DONE_DATA_PICK_OPTIONS = [ + { + "path": ["bodyResults"], + "pickerLabel": t("Alle Schleifen-Ergebnisse"), + "detail": t( + "Ausgabe des letzten Schrittes im Schleifen-Rumpf pro Iteration als Liste, " + "ein Eintrag pro Durchlauf. Ideal als Eingabe fuer Kontext zusammenfuehren." + ), + "recommended": True, + "type": "List[Any]", + }, + { + "path": ["items"], + "pickerLabel": t("Iterierte Elemente"), + "detail": t( + "Liste der Schleifen-Elemente nach gewähltem Iterationsmodus (Kopie der Eingabeliste, gefiltert)." + ), + "recommended": False, + "type": "List[Any]", + }, + { + "path": ["count"], + "pickerLabel": t("Anzahl Durchläufe"), + "detail": t("Wie viele Iterationen die Schleife ausgeführt hat."), + "recommended": False, + "type": "int", + }, +] + LOOP_ITEM_DATA_PICK_OPTIONS = [ { "path": ["currentItem"], @@ -58,6 +87,19 @@ MERGE_RESULT_DATA_PICK_OPTIONS = [ }, ] +# Extended picker for ``context.mergeContext`` (ActionResult + ``surfaceDataAsTopLevel``): same +# merge keys as ``flow.merge`` plus ``count`` from the action payload. +CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS = [ + *MERGE_RESULT_DATA_PICK_OPTIONS, + { + "path": ["count"], + "pickerLabel": t("Anzahl Einträge"), + "detail": t("Wie viele Einträge zusammengeführt wurden."), + "recommended": False, + "type": "int", + }, +] + # Ports, die typische Schritt-Ausgaben durchreichen (nicht nur leerer Transit). _FLOW_INPUT_SCHEMAS = [ "Transit", @@ -138,8 +180,10 @@ FLOW_NODES = [ "category": "flow", "label": t("Schleife / Für jedes"), "description": t( - "Iteriert über ein Array aus einem vorherigen Schritt (z. B. documente, Zeilen, Listeneinträge). " - "Optional: UDM-Ebene für strukturierte Dokumente." + "Zwei Ausgänge: „Schleife“ verbindet den Rumpf (pro Element); optional führt der Rumpf " + "mit einem Rücklauf-Pfeil wieder zum **gleichen Eingang** wie der vorherige Schritt (wie in n8n). " + "„Fertig“ führt genau einmal fort, wenn alle Iterationen beendet sind. " + "Die zu durchlaufende Liste wählen Sie wie bisher; UDM-/Strukturdaten werden automatisch sinnvoll in Elemente aufgelöst." ), "parameters": [ { @@ -150,13 +194,27 @@ FLOW_NODES = [ "description": t("Liste oder Sammlung zum Durchlaufen (im Data Picker wählen)"), }, { - "name": "level", + "name": "iterationMode", "type": "str", "required": False, "frontendType": "select", - "frontendOptions": {"options": ["auto", "documents", "structuralNodes", "contentBlocks"]}, - "description": t("Nur bei UDM-Daten: welche Strukturebene als Elemente verwendet wird"), - "default": "auto", + "frontendOptions": { + "options": ["all", "first", "last", "every_second", "every_third", "every_nth"], + }, + "description": t( + "Welche Elemente die Schleife besucht: alle, nur das erste/letzte, jedes zweite/dritte " + "oder jedes n-te (Schritt dann unter „Schrittweite“)." 
+ ), + "default": "all", + }, + { + "name": "iterationStride", + "type": "int", + "required": False, + "frontendType": "number", + "frontendOptions": {"min": 2, "max": 100}, + "description": t("Nur bei „jedes n-te“: Schrittweite (z. B. 5 = jedes 5. Element ab Index 0)."), + "default": 2, }, { "name": "concurrency", @@ -169,12 +227,18 @@ FLOW_NODES = [ }, ], "inputs": 1, - "outputs": 1, - "inputPorts": {0: {"accepts": [ - "Transit", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList", - "ActionResult", "AiResult", "QueryResult", "FormPayload", - ]}}, - "outputPorts": {0: {"schema": "LoopItem", "dataPickOptions": LOOP_ITEM_DATA_PICK_OPTIONS}}, + "outputs": 2, + "outputLabels": [t("Schleife"), t("Fertig")], + "inputPorts": { + 0: {"accepts": [ + "Transit", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList", + "ActionResult", "AiResult", "QueryResult", "FormPayload", "LoopItem", + ]}, + }, + "outputPorts": { + 0: {"schema": "LoopItem", "dataPickOptions": LOOP_ITEM_DATA_PICK_OPTIONS}, + 1: {"schema": "Transit", "dataPickOptions": LOOP_DONE_DATA_PICK_OPTIONS}, + }, "executor": "flow", "meta": {"icon": "mdi-repeat", "color": "#FF9800", "usesAi": False}, }, diff --git a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py index 4748f39a..50573b0a 100644 --- a/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py +++ b/modules/features/graphicalEditor/routeFeatureGraphicalEditor.py @@ -26,7 +26,7 @@ from modules.workflows.automation2.runEnvelope import ( normalize_run_envelope, ) from modules.features.graphicalEditor.entryPoints import find_invocation -from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths +from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths, compute_graph_data_sources from modules.shared.i18nRegistry import apiRouteContext, resolveText routeApiMsg = apiRouteContext("routeFeatureGraphicalEditor") @@ -192,6 +192,34 @@ def post_upstream_paths( return {"paths": paths} +@router.post("/{instanceId}/graph-data-sources") +@limiter.limit("120/minute") +def post_graph_data_sources( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + body: Dict[str, Any] = Body(...), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Scope-aware data sources for the DataPicker. + + Takes ``{ nodeId, graph: { nodes, connections } }`` and returns:: + + { + "availableSourceIds": [...], # ancestors minus loop-body nodes on Done branch + "portIndexOverrides": {nodeId: n}, # use outputPorts[n] instead of 0 + "loopBodyContextIds": [...], # loops whose body the node is in + } + + All loop scope logic lives here so the frontend has zero topology knowledge. 
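+
+    For illustration (hypothetical node ids): a graph ``start → loop → body``
+    whose body wires back into the loop's input and whose Fertig output feeds
+    a ``merge`` node would return, for ``nodeId = "merge"``::
+
+        {
+            "availableSourceIds": ["loop", "start"],
+            "portIndexOverrides": {"loop": 1},
+            "loopBodyContextIds": [],
+        }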
+ """ + _validateInstanceAccess(instanceId, context) + graph = body.get("graph") + node_id = body.get("nodeId") + if not isinstance(graph, dict) or not node_id: + raise HTTPException(status_code=400, detail=routeApiMsg("graph and nodeId are required")) + return compute_graph_data_sources(graph, str(node_id)) + + @router.get("/{instanceId}/upstream-paths/{node_id}") @limiter.limit("60/minute") def get_upstream_paths_saved( @@ -1724,6 +1752,51 @@ async def complete_task( ) +@router.post("/{instanceId}/tasks/{taskId}/cancel") +@limiter.limit("30/minute") +def cancel_pending_task_stop_run( + request: Request, + instanceId: str = Path(..., description="Feature instance ID"), + taskId: str = Path(..., description="Human task ID"), + context: RequestContext = Depends(getRequestContext), +) -> dict: + """Cancel a pending human task and stop the workflow run behind it.""" + mandateId = _validateInstanceAccess(instanceId, context) + iface = getGraphicalEditorInterface(context.user, mandateId, instanceId) + task = iface.getTask(taskId) + if not task: + raise HTTPException(status_code=404, detail=routeApiMsg("Task not found")) + + wf_ids = {w.get("id") for w in iface.getWorkflows() if w.get("id")} + if task.get("workflowId") not in wf_ids: + raise HTTPException(status_code=404, detail=routeApiMsg("Task not found")) + + if task.get("status") != "pending": + raise HTTPException(status_code=400, detail=routeApiMsg("Task already completed")) + + run_id = task.get("runId") + + from modules.workflows.automation2.executionEngine import requestRunStop + + if run_id: + requestRunStop(run_id) + db_run = iface.getRun(run_id) + if db_run: + current = db_run.get("status") or "" + if current not in ("completed", "failed", "cancelled"): + iface.updateRun(run_id, status="cancelled") + + pending = iface.getTasks(runId=run_id, status="pending") + for t in pending: + tid = t.get("id") + if tid: + iface.updateTask(tid, status="cancelled") + else: + iface.updateTask(taskId, status="cancelled") + + return {"success": True, "runId": run_id, "taskId": taskId} + + # ------------------------------------------------------------------------- # Monitoring / Metrics # ------------------------------------------------------------------------- diff --git a/modules/features/graphicalEditor/upstreamPathsService.py b/modules/features/graphicalEditor/upstreamPathsService.py index 9cff3151..f0cb473e 100644 --- a/modules/features/graphicalEditor/upstreamPathsService.py +++ b/modules/features/graphicalEditor/upstreamPathsService.py @@ -6,7 +6,7 @@ from typing import Any, Dict, List, Set from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG, PortSchema, parse_graph_defined_output_schema -from modules.workflows.automation2.graphUtils import buildConnectionMap +from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds _NODE_BY_TYPE = {n["id"]: n for n in STATIC_NODE_TYPES} @@ -129,10 +129,13 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D entry["producerLabel"] = (anode.get("title") or "").strip() or aid paths.append(entry) - # Lexical loop hints (flow.loop): any loop node in ancestors adds synthetic paths + # Lexical loop hints (flow.loop): only for nodes inside the loop body for aid in ancestors: anode = node_by_id.get(aid) or {} - if anode.get("type") == "flow.loop": + if anode.get("type") != "flow.loop": + continue + body_ids = getLoopBodyNodeIds(aid, 
conn_map) + if target_node_id in body_ids: paths.extend( [ { @@ -160,3 +163,80 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D ) return paths + + +def compute_graph_data_sources(graph: Dict[str, Any], target_node_id: str) -> Dict[str, Any]: + """Return scope-aware data sources for the DataPicker. + + Determines which ancestor nodes are valid sources for ``target_node_id``, + taking loop scoping into account: + + - If ``target_node_id`` is on the *Done* branch of a ``flow.loop``, the + loop body nodes are excluded from ``availableSourceIds`` and the loop + node itself is mapped to its *Fertig* output port (index 1) via + ``portIndexOverrides``. + - If ``target_node_id`` is *inside* the loop body, the loop node id is + included in ``loopBodyContextIds`` so the frontend can show the lexical + loop variables (currentItem, currentIndex, count). + + Returns:: + + { + "availableSourceIds": [...], # ordered list + "portIndexOverrides": {nodeId: n}, # non-zero port indices + "loopBodyContextIds": [...], # loops whose body this node is in + } + """ + nodes = graph.get("nodes") or [] + connections = graph.get("connections") or [] + node_by_id: Dict[str, Any] = {n["id"]: n for n in nodes if n.get("id")} + + if target_node_id not in node_by_id: + return {"availableSourceIds": [], "portIndexOverrides": {}, "loopBodyContextIds": []} + + conn_map = buildConnectionMap(connections) + + # Collect all ancestors via backward BFS + preds: Dict[str, Set[str]] = {} + for tgt, pairs in conn_map.items(): + for src, _, _ in pairs: + preds.setdefault(tgt, set()).add(src) + + seen: Set[str] = set() + stack = [target_node_id] + ancestors: Set[str] = set() + while stack: + cur = stack.pop() + for p in preds.get(cur, ()): + if p not in seen: + seen.add(p) + ancestors.add(p) + stack.append(p) + + body_nodes_to_exclude: Set[str] = set() + port_index_overrides: Dict[str, int] = {} + loop_body_context_ids: List[str] = [] + + for aid in ancestors: + anode = node_by_id.get(aid) or {} + if anode.get("type") != "flow.loop": + continue + body_ids = getLoopBodyNodeIds(aid, conn_map) + done_ids = getLoopDoneNodeIds(aid, conn_map) + + if target_node_id in body_ids: + loop_body_context_ids.append(aid) + elif target_node_id in done_ids: + body_nodes_to_exclude.update(body_ids) + port_index_overrides[aid] = 1 + + available_source_ids = [ + aid for aid in sorted(ancestors) + if aid not in body_nodes_to_exclude + ] + + return { + "availableSourceIds": available_source_ids, + "portIndexOverrides": port_index_overrides, + "loopBodyContextIds": loop_body_context_ids, + } diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py index b7a56a02..1f450d0c 100644 --- a/modules/interfaces/interfaceBootstrap.py +++ b/modules/interfaces/interfaceBootstrap.py @@ -308,7 +308,6 @@ def _buildSystemTemplates(): "title": "Pro E-Mail", "parameters": { "items": {"type": "ref", "nodeId": "n2", "path": ["emails"]}, - "level": "auto", "concurrency": 1, }, }, @@ -348,7 +347,6 @@ def _buildSystemTemplates(): "title": "Pro Dokument", "parameters": { "items": {"type": "ref", "nodeId": "n2", "path": ["files"]}, - "level": "auto", "concurrency": 1, }, }, diff --git a/modules/serviceCenter/services/serviceGeneration/subDocumentUtility.py b/modules/serviceCenter/services/serviceGeneration/subDocumentUtility.py index 594fbe02..d3fddeb1 100644 --- a/modules/serviceCenter/services/serviceGeneration/subDocumentUtility.py +++ b/modules/serviceCenter/services/serviceGeneration/subDocumentUtility.py 
@@ -4,10 +4,76 @@ import json import logging import os import re -from typing import Any, Dict +from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) +_MAX_AUTO_TABLE_COLS = 64 +_MAX_AUTO_TABLE_ROWS = 5000 +_MAX_AUTO_CELL_CHARS = 8000 + + +def _sanitize_cell_for_pipe_table(cell: str) -> str: + """Single-line cell safe for markdown pipe tables (no raw ``|``).""" + s = str(cell).replace("\r\n", "\n").replace("\r", "\n") + s = " ".join(line.strip() for line in s.split("\n") if line.strip()).strip() + return s.replace("|", "·") + + +def _try_delimited_block_as_markdown_table(block: str) -> Optional[str]: + """If ``block`` is a uniform tab- or semicolon-separated grid, return a pipe markdown table.""" + lines = [ln.strip() for ln in block.replace("\r\n", "\n").replace("\r", "\n").split("\n")] + lines = [ln for ln in lines if ln] + if len(lines) < 2: + return None + for sep in ("\t", ";"): + rows: List[List[str]] = [] + bad = False + for ln in lines: + cells = [c.strip() for c in ln.split(sep)] + if len(cells) < 2: + bad = True + break + rows.append(cells) + if bad: + continue + ncols = len(rows[0]) + if ncols > _MAX_AUTO_TABLE_COLS or len(rows) > _MAX_AUTO_TABLE_ROWS: + continue + if any(len(r) != ncols for r in rows): + continue + if any(len(_sanitize_cell_for_pipe_table(c)) > _MAX_AUTO_CELL_CHARS for r in rows for c in r): + continue + + def _row_md(r: List[str]) -> str: + return "| " + " | ".join(_sanitize_cell_for_pipe_table(c) for c in r) + " |" + + header = _row_md(rows[0]) + divider = "| " + " | ".join(["---"] * ncols) + " |" + body = "\n".join(_row_md(r) for r in rows[1:]) + return "\n".join([header, divider, body]) + return None + + +def enhancePlainTextWithMarkdownTables(body: str) -> str: + """Detect delimiter-separated grids in plain paragraphs and convert them to markdown pipe tables. + + Extractors often emit CSV-like blocks (``;`` or TAB) without markdown markers; passing those + straight into ``markdownToDocumentJson`` produced one giant paragraph. This pass runs only + on whitespace-separated blocks so normal prose stays unchanged. 
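+
+    For illustration, a semicolon-delimited block such as::
+
+        Name;Betrag
+        Miete;1200
+
+    is rewritten to::
+
+        | Name | Betrag |
+        | --- | --- |
+        | Miete | 1200 |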
+ """ + if not isinstance(body, str) or not body.strip(): + return body if isinstance(body, str) else "" + chunks = re.split(r"\n\s*\n", body.strip()) + out_parts: List[str] = [] + for ch in chunks: + ch = ch.strip() + if not ch: + continue + md_table = _try_delimited_block_as_markdown_table(ch) + out_parts.append(md_table if md_table else ch) + return "\n\n".join(out_parts) + def _parseInlineRuns(text: str) -> list: """ diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index 61dc8166..9df8cf9b 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -15,6 +15,8 @@ from modules.workflows.automation2.graphUtils import ( topoSort, getInputSources, getLoopBodyNodeIds, + getLoopDoneNodeIds, + getLoopPrimaryInputSource, ) from modules.workflows.automation2.executors import ( @@ -26,7 +28,7 @@ from modules.workflows.automation2.executors import ( PauseForHumanTaskError, PauseForEmailWaitError, ) -from modules.features.graphicalEditor.portTypes import normalizeToSchema +from modules.features.graphicalEditor.portTypes import normalizeToSchema, wrapTransit, unwrapTransit from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException as _SubscriptionInactiveException from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError as _BillingContextError @@ -341,6 +343,98 @@ def _substituteFeatureInstancePlaceholders( return _json.loads(replaced) +async def _run_post_loop_done_nodes( + *, + loop_node_id: str, + body_ids: Set[str], + items: List[Any], + ordered: List[Dict], + connectionMap: Dict[str, List], + nodeOutputs: Dict[str, Any], + context: Dict[str, Any], + services: Any, + automation2_interface: Optional[Any], + runId: Optional[str], + processed_in_loop: Set[str], +) -> Optional[Dict[str, Any]]: + """After all loop iterations: merge upstream into loop output and run the Done (output 1) branch once.""" + _prim_in = getLoopPrimaryInputSource(loop_node_id, connectionMap, body_ids) + _upstream_loop = nodeOutputs.get(_prim_in[0]) if _prim_in else None + _base_raw = unwrapTransit(_upstream_loop) if isinstance(_upstream_loop, dict) and _upstream_loop.get("_transit") else _upstream_loop + _prev_loop_out = nodeOutputs.get(loop_node_id) + # ``bodyResults`` lives on the plain iteration-state dict; after resume / edge + # cases the loop slot may still be wrapped in Transit — unwrap before read. 
+ _prev_plain = _prev_loop_out + if isinstance(_prev_loop_out, dict) and _prev_loop_out.get("_transit"): + _prev_plain = unwrapTransit(_prev_loop_out) + _body_results = ( + _prev_plain.get("bodyResults") if isinstance(_prev_plain, dict) else None + ) + if not isinstance(_base_raw, dict): + raise RuntimeError( + f"flow.loop {loop_node_id}: primary upstream output must be a dict (JSON handover / node output); " + f"got {type(_base_raw).__name__}" + ) + _merged_loop = {**_base_raw, "items": items, "count": len(items)} + if _body_results is not None: + _merged_loop["bodyResults"] = _body_results + nodeOutputs[loop_node_id] = wrapTransit(_merged_loop, {"loopCompleted": True, "loopNodeId": loop_node_id}) + + _done_all = getLoopDoneNodeIds(loop_node_id, connectionMap) + _done_only = _done_all - body_ids + _done_ordered = [n for n in ordered if n.get("id") in _done_only] + for _dn in _done_ordered: + _dnid = _dn.get("id") + if not _dnid or context.get("_stopped"): + break + if not _is_node_on_active_path(_dnid, connectionMap, nodeOutputs): + _skipSnap = {"_skipReason": "inactive_branch"} + for _sSrc, _, _ in connectionMap.get(_dnid, []): + if _sSrc in nodeOutputs: + _skipSnap[_sSrc] = nodeOutputs[_sSrc] + _skId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), status="skipped", inputSnapshot=_skipSnap) + if _skId: + _updateStepLog(automation2_interface, _skId, "skipped") + continue + _dexec = _getExecutor(_dn.get("type", ""), services, automation2_interface) + if not _dexec: + nodeOutputs[_dnid] = None + continue + _dStart = time.time() + _dIn = {} + for _src, _, _ in connectionMap.get(_dnid, []): + if _src in nodeOutputs: + _dIn[_src] = nodeOutputs[_src] + _dStepId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), "running", _dIn) + try: + _dres, _dRetry = await _executeWithRetry(_dexec, _dn, context) + _dres = _normalizeResult(_dres, _dn.get("type", "")) + nodeOutputs[_dnid] = _dres + _dDur = int((time.time() - _dStart) * 1000) + _dTok = _dres.get("tokensUsed", 0) if isinstance(_dres, dict) else 0 + _updateStepLog(automation2_interface, _dStepId, "completed", + output=_dres if isinstance(_dres, dict) else {"value": _dres}, + durationMs=_dDur, tokensUsed=_dTok, retryCount=_dRetry) + except PauseForHumanTaskError: + _updateStepLog(automation2_interface, _dStepId, "completed", + durationMs=int((time.time() - _dStart) * 1000)) + raise + except PauseForEmailWaitError: + _updateStepLog(automation2_interface, _dStepId, "completed", + durationMs=int((time.time() - _dStart) * 1000)) + raise + except (_SubscriptionInactiveException, _BillingContextError): + _updateStepLog(automation2_interface, _dStepId, "failed", + error="Subscription/Billing error", durationMs=int((time.time() - _dStart) * 1000)) + raise + except Exception as _dex: + _updateStepLog(automation2_interface, _dStepId, "failed", + error=str(_dex), durationMs=int((time.time() - _dStart) * 1000)) + raise + processed_in_loop.update(_done_only) + return None + + async def executeGraph( graph: Dict[str, Any], services: Any, @@ -510,6 +604,14 @@ async def executeGraph( body_ids = getLoopBodyNodeIds(loop_node_id, connectionMap) if loop_node_id else set() body_ordered = [n for n in ordered if n.get("id") in body_ids] processed_in_loop = set(body_ids) | {loop_node_id} if loop_node_id else set() + _resume_feedback_body_node_id = None + for _fb_src, _fb_so, _fb_ti in (connectionMap.get(loop_node_id) or []): + if _fb_src in body_ids and _fb_ti == 0: + _resume_feedback_body_node_id = _fb_src + break + if 
not _resume_feedback_body_node_id and body_ordered: + _resume_feedback_body_node_id = body_ordered[-1].get("id") + _resume_body_results: List[Any] = [] while next_index < len(items) and loop_node_id: nodeOutputs[loop_node_id] = { "items": items, @@ -547,6 +649,8 @@ async def executeGraph( output=result if isinstance(result, dict) else {"value": result}, durationMs=_rDur, retryCount=_rRetry) logger.info("executeGraph loop resume body node %s done (iter %d, retries=%d)", bnid, next_index, _rRetry) + if _resume_feedback_body_node_id and bnid == _resume_feedback_body_node_id: + _resume_body_results.append(result) except PauseForHumanTaskError as e: _updateStepLog(automation2_interface, _rStepId, "completed", durationMs=int((time.time() - _rStepStart) * 1000)) @@ -575,11 +679,27 @@ async def executeGraph( return {"success": False, "error": str(ex), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": bnid, "runId": runId} next_index += 1 if loop_node_id: - nodeOutputs[loop_node_id] = {"items": items, "count": len(items)} for aggId, accItems in _aggregateAccumulators.items(): nodeOutputs[aggId] = {"items": accItems, "count": len(accItems), "_success": True} _aggregateAccumulators.clear() - processed_in_loop = set(body_ids) | {loop_node_id} + if _resume_body_results: + _rlo = nodeOutputs.get(loop_node_id) + if isinstance(_rlo, dict): + _rlo["bodyResults"] = _resume_body_results + nodeOutputs[loop_node_id] = _rlo + await _run_post_loop_done_nodes( + loop_node_id=loop_node_id, + body_ids=body_ids, + items=items, + ordered=ordered, + connectionMap=connectionMap, + nodeOutputs=nodeOutputs, + context=context, + services=services, + automation2_interface=automation2_interface, + runId=runId, + processed_in_loop=processed_in_loop, + ) for i, node in enumerate(ordered): if skip_until_passed: @@ -593,7 +713,20 @@ async def executeGraph( break nodeId = node.get("id") nodeType = node.get("type", "") - if not _is_node_on_active_path(nodeId, connectionMap, nodeOutputs): + # flow.loop: the feedback edge (body → loop input 0) hasn't run yet on the first + # pass → would make _is_node_on_active_path return False. Only check the + # *primary* predecessor (the one outside the loop body). + if nodeType == "flow.loop": + _loop_body_ids = getLoopBodyNodeIds(nodeId, connectionMap) + _loop_primary = getLoopPrimaryInputSource(nodeId, connectionMap, _loop_body_ids) + _loop_check_map = ( + {nodeId: [(_loop_primary[0], _loop_primary[1], 0)]} + if _loop_primary else connectionMap + ) + _loop_active = _is_node_on_active_path(nodeId, _loop_check_map, nodeOutputs) + else: + _loop_active = _is_node_on_active_path(nodeId, connectionMap, nodeOutputs) + if not _loop_active: logger.info("executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)", i + 1, len(ordered), nodeId) _skipInputSnap = {"_skipReason": "inactive_branch"} for _sSrc, _, _ in connectionMap.get(nodeId, []): @@ -635,6 +768,17 @@ async def executeGraph( _loopConcurrency = max(1, min(_loopConcurrency, 20)) _batchMode = len(items) > STEPLOG_BATCH_THRESHOLD _aggLock = asyncio.Lock() + # Prefer the *last* body node wired to loop input 0 (feedback / + # pipeline end) — first matching inbound edge can be a shallow node. 
+ _feedback_candidates = [ + _fb_src + for _fb_src, _fb_so, _fb_ti in (connectionMap.get(nodeId) or []) + if _fb_src in body_ids and _fb_ti == 0 + ] + _feedback_body_node_id = _feedback_candidates[-1] if _feedback_candidates else None + if not _feedback_body_node_id and body_ordered: + _feedback_body_node_id = body_ordered[-1].get("id") + _bodyResultsPerIter: List[Any] = [None] * len(items) async def _runLoopIteration(_idx: int, _item: Any) -> Optional[Dict]: """Execute all body nodes for one iteration. Returns error dict or None.""" @@ -712,6 +856,10 @@ async def executeGraph( logger.exception("executeGraph loop body node %s FAILED (iter %d): %s", bnid, _idx, ex) return {"_error": str(ex), "failedNode": bnid} + if _feedback_body_node_id: + async with _aggLock: + if _idx < len(_bodyResultsPerIter): + _bodyResultsPerIter[_idx] = _activeOutputs.get(_feedback_body_node_id) if _batchMode and _idx > 0 and _idx % STEPLOG_BATCH_THRESHOLD == 0 and runId: _emitStepEvent(runId, {"type": "loop_progress", "nodeId": nodeId, "iteration": _idx, "total": len(items)}) return None @@ -755,7 +903,6 @@ async def executeGraph( _activeRunContexts.pop(runId, None) return {"success": False, "error": _rval["_error"], "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": _rval.get("failedNode"), "runId": runId} - nodeOutputs[nodeId] = {"items": items, "count": len(items)} for aggId, accItems in _aggregateAccumulators.items(): allChunks = _aggregateTempChunks.pop(aggId, []) finalItems = [] @@ -764,6 +911,29 @@ async def executeGraph( finalItems.extend(accItems) nodeOutputs[aggId] = {"items": finalItems, "count": len(finalItems), "_success": True} _aggregateAccumulators.clear() + + # Always attach ``bodyResults`` (list per iteration, possibly None + # placeholders) so DataRefs to ``bodyResults`` resolve and + # ``context.mergeContext`` can fall back to the wired loop output. 
+ _lo = nodeOutputs.get(nodeId) + if isinstance(_lo, dict): + _lo["bodyResults"] = _bodyResultsPerIter + nodeOutputs[nodeId] = _lo + + await _run_post_loop_done_nodes( + loop_node_id=nodeId, + body_ids=body_ids, + items=items, + ordered=ordered, + connectionMap=connectionMap, + nodeOutputs=nodeOutputs, + context=context, + services=services, + automation2_interface=automation2_interface, + runId=runId, + processed_in_loop=processed_in_loop, + ) + _updateStepLog(automation2_interface, _stepId, "completed", output={"iterationCount": len(items), "items": len(items), "concurrency": _loopConcurrency, "batchMode": _batchMode}, durationMs=int((time.time() - _stepStartMs) * 1000)) diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py index 46264656..5d03298f 100644 --- a/modules/workflows/automation2/executors/actionNodeExecutor.py +++ b/modules/workflows/automation2/executors/actionNodeExecutor.py @@ -24,6 +24,74 @@ from modules.workflows.automation2.executors.inputExecutor import PauseForHumanT logger = logging.getLogger(__name__) +_FILE_CREATE_CTX_LOG_MAX = 500 + + +def _truncate_for_log(val: Any, max_len: int = _FILE_CREATE_CTX_LOG_MAX) -> str: + s = val if isinstance(val, str) else repr(val) + s = s.replace("\r", "\\r").replace("\n", "\\n") + if len(s) <= max_len: + return s + return s[:max_len] + f"...<{len(s)} chars>" + + +def _log_file_create_context_resolution( + node_id: str, + raw_params: Dict[str, Any], + resolved_params: Dict[str, Any], + exec_context: Dict[str, Any], +) -> None: + """Debug ``file.create`` when ``context`` resolves empty — trace refs and upstream output.""" + raw_c = raw_params.get("context") + res_c = resolved_params.get("context") + node_outputs = exec_context.get("nodeOutputs") or {} + input_sources = (exec_context.get("inputSources") or {}).get(node_id) or {} + src_entry = input_sources.get(0) + src_id = src_entry[0] if src_entry else None + upstream = node_outputs.get(src_id) if src_id else None + + up_summary = "missing" + up_resp_len = -1 + up_transit = False + if isinstance(upstream, dict): + up_transit = bool(upstream.get("_transit")) + inner = upstream.get("data") if up_transit else upstream + up_keys = sorted(k for k in upstream.keys() if not str(k).startswith("_") or k in ("_transit", "_success")) + up_resp_len = len(str((inner if isinstance(inner, dict) else upstream).get("response") or "")) + up_summary = "keys=%s transit=%s response_len=%s _success=%s" % ( + up_keys[:25], + up_transit, + up_resp_len, + upstream.get("_success"), + ) + + def _shape(name: str, v: Any) -> str: + if v is None: + return f"{name}=None" + if isinstance(v, dict) and v.get("type") == "ref": + return f"{name}=ref(nodeId={v.get('nodeId')!r}, path={v.get('path')!r})" + if isinstance(v, list): + if v and all(isinstance(x, dict) and x.get("type") == "ref" for x in v): + bits = [ + f"ref({x.get('nodeId')!r},{x.get('path')!r})" + for x in v[:5] + ] + return f"{name}=contextBuilder[{len(v)} refs: {', '.join(bits)}{'…' if len(v) > 5 else ''}]" + return f"{name}=list(len={len(v)}, elem0_type={type(v[0]).__name__})" + if isinstance(v, str): + return f"{name}=str(len={len(v)}, preview={_truncate_for_log(v, 240)!r})" + return f"{name}={type(v).__name__}({_truncate_for_log(v)!r})" + + logger.info( + "file.create context resolution node=%s port0=%r upstream_node=%s upstream: %s | %s | %s", + node_id, + src_id, + src_id, + up_summary, + _shape("raw", raw_c), + _shape("resolved", res_c), + ) + def 
_looks_like_ascii_base64_payload(s: str) -> bool: """Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars (#, *, -, …).""" @@ -336,14 +404,36 @@ def _getOutputSchemaName(nodeDef: Dict) -> str: def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any: - """Return the unwrapped output of the node connected to input port 0, or None.""" + """Return the unwrapped output of the primary inbound wire to ``nodeId``. + + Prefer logical input port 0. Some persisted graphs register the only edge + under a non-zero ``targetInput`` — fall back to the sole inbound port or + the first ``connectionMap`` entry so ``injectUpstreamPayload`` (e.g. + ``context.mergeContext`` after ``flow.loop``) still receives data. + """ from modules.features.graphicalEditor.portTypes import unwrapTransit + + nodeOutputs = context.get("nodeOutputs") or {} + connectionMap = context.get("connectionMap") or {} src_map = (context.get("inputSources") or {}).get(nodeId) or {} + entry = src_map.get(0) + if not entry and src_map: + if len(src_map) == 1: + entry = next(iter(src_map.values())) + else: + mi = min(src_map.keys()) + entry = src_map.get(mi) + if not entry and connectionMap.get(nodeId): + inc = connectionMap[nodeId] + if inc: + src_node_id, _so, _ti = inc[0] + entry = (src_node_id, _so) + if not entry: return None src_node_id, _ = entry - upstream = (context.get("nodeOutputs") or {}).get(src_node_id) + upstream = nodeOutputs.get(src_node_id) return unwrapTransit(upstream) if isinstance(upstream, dict) else upstream @@ -446,6 +536,9 @@ class ActionNodeExecutor: # 4. Apply declarative paramMappers from the node definition _applyParamMappers(nodeDef, resolvedParams) + if nodeType == "file.create": + _log_file_create_context_resolution(nodeId, params, resolvedParams, context) + # 5. email.checkEmail pause for email wait if nodeType == "email.checkEmail": runId = context.get("_runId") @@ -533,18 +626,6 @@ class ActionNodeExecutor: rawData = getattr(d, "documentData", None) if hasattr(d, "documentData") else (dumped.get("documentData") if isinstance(dumped, dict) else None) rawBytes = _coerce_document_data_to_bytes(rawData) - # Extracted page images are workflow intermediates — keep bytes as base64 on the - # ActionDocument only; do not create rows in the user's file library (Meine Dateien). 
- if isinstance(dumped, dict) and rawBytes: - _meta = dumped.get("validationMetadata") if isinstance(dumped.get("validationMetadata"), dict) else {} - if ( - _meta.get("actionType") == "context.extractContent" - and _meta.get("handoverRole") == "extractedMedia" - ): - dumped["documentData"] = base64.b64encode(rawBytes).decode("ascii") - dumped["_hasBinaryData"] = True - docsList.append(dumped) - continue if isinstance(dumped, dict) and rawBytes: try: from modules.interfaces.interfaceDbManagement import getInterface as _getMgmtInterface @@ -597,18 +678,10 @@ class ActionNodeExecutor: extractedContext = "" rd_early = getattr(result, "data", None) - if isinstance(rd_early, dict) and rd_early.get("response") is not None: - extractedContext = str(rd_early.get("response")).strip() - elif result.documents: - doc = result.documents[0] - raw = getattr(doc, "documentData", None) if hasattr(doc, "documentData") else (doc.get("documentData") if isinstance(doc, dict) else None) - if isinstance(raw, bytes): - try: - extractedContext = raw.decode("utf-8").strip() - except (UnicodeDecodeError, ValueError): - extractedContext = "" - elif raw: - extractedContext = str(raw).strip() + if isinstance(rd_early, dict): + _r = rd_early.get("response") + if _r is not None and str(_r).strip(): + extractedContext = str(_r).strip() promptText = str(resolvedParams.get("aiPrompt") or resolvedParams.get("prompt") or "").strip() resultData = getattr(result, "data", None) @@ -657,7 +730,19 @@ class ActionNodeExecutor: if not rsp: out["response"] = extractedContext or "" if result.success: - out["imageDocumentsOnly"] = _image_documents_from_docs_list(docsList) + img_only = _image_documents_from_docs_list(docsList) + # mergeContext packs iterated payloads under ``data.merged`` only — ``documents`` + # on the ActionResult is empty, so image sidecars live on ``merged.imageDocumentsOnly``. + if ( + nodeType == "context.mergeContext" + and isinstance(result.data, dict) + ): + merged_blob = result.data.get("merged") + if isinstance(merged_blob, dict): + merged_imgs = merged_blob.get("imageDocumentsOnly") + if isinstance(merged_imgs, list) and merged_imgs: + img_only = merged_imgs + out["imageDocumentsOnly"] = img_only if outputSchema == "TaskResult" and result.success and docsList: try: diff --git a/modules/workflows/automation2/executors/flowExecutor.py b/modules/workflows/automation2/executors/flowExecutor.py index 511be6ff..e0836db8 100644 --- a/modules/workflows/automation2/executors/flowExecutor.py +++ b/modules/workflows/automation2/executors/flowExecutor.py @@ -2,7 +2,7 @@ # Flow control node executor (ifElse, switch, loop, merge). 
import logging -from typing import Any, Dict +from typing import Any, Dict, List from modules.features.graphicalEditor.portTypes import wrapTransit, unwrapTransit @@ -279,26 +279,50 @@ class FlowExecutor: async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any: params = node.get("parameters") or {} itemsPath = params.get("items", "[]") - level = params.get("level", "auto") from modules.workflows.automation2.graphUtils import resolveParameterReferences - items = resolveParameterReferences(itemsPath, nodeOutputs) - if level != "auto" and isinstance(items, dict): - items = self._resolveUdmLevel(items, level) - elif isinstance(items, list): - pass - elif isinstance(items, dict): - children = items.get("children") - if isinstance(children, list) and children: - items = children - else: - items = [{"name": k, "value": v} for k, v in items.items()] - else: - items = [items] if items is not None else [] + raw = resolveParameterReferences(itemsPath, nodeOutputs) + items = self._normalize_loop_items(raw) + mode = (params.get("iterationMode") or "all").strip().lower() + stride = params.get("iterationStride", 2) + try: + stride_int = int(stride) + except (TypeError, ValueError): + stride_int = 2 + items = self._apply_iteration_mode(items, mode, stride_int) return {"items": items, "count": len(items)} + def _normalize_loop_items(self, raw: Any) -> List[Any]: + """Coerce resolved `items` into a list (lists, dict children, or scalars).""" + if isinstance(raw, list): + return raw + if isinstance(raw, dict): + children = raw.get("children") + if isinstance(children, list) and len(children) > 0: + return children + return [{"name": k, "value": v} for k, v in raw.items()] + return [raw] if raw is not None else [] + + def _apply_iteration_mode(self, items: List[Any], mode: str, stride: int) -> List[Any]: + """Select which elements to iterate over (backend-defined modes).""" + if not items: + return [] + m = (mode or "all").strip().lower() + if m == "first": + return items[:1] + if m == "last": + return items[-1:] + if m == "every_second": + return items[::2] + if m == "every_third": + return items[::3] + if m == "every_nth": + step = max(2, min(100, int(stride))) + return items[::step] + return list(items) + def _resolveUdmLevel(self, udm: Dict, level: str) -> list: - """Extract items from a UDM document/node at the requested structural level.""" + """Extract items from a UDM document/node at the requested structural level (test / tooling).""" children = udm.get("children") or [] if level == "documents": return [c for c in children if isinstance(c, dict) and c.get("role") in ("document", "archive")] diff --git a/modules/workflows/automation2/graphUtils.py b/modules/workflows/automation2/graphUtils.py index 3a4ee5bd..3c1ceb82 100644 --- a/modules/workflows/automation2/graphUtils.py +++ b/modules/workflows/automation2/graphUtils.py @@ -48,26 +48,93 @@ def buildConnectionMap(connections: List[Dict]) -> Dict[str, List[Tuple[str, int def getLoopBodyNodeIds(loopNodeId: str, connectionMap: Dict[str, List[Tuple[str, int, int]]]) -> Set[str]: - """Nodes reachable from loop's output (BFS forward). Body = downstream nodes that receive from loop.""" + """Nodes reachable from flow.loop output port 0 only (loop body), BFS forward. + + Edges vom Rumpf zurück in den Loop-Knoten (gleicher Eingang wie der Hauptfluss) beenden die + Expansion am Loop-Knoten — der Loop-Knoten selbst ist nie Teil des Rumpfes. 
+ """ from collections import deque - body = set() - # connectionMap: target -> [(source, sourceOutput, targetInput)] - rev: Dict[str, List[str]] = {} # source -> [targets] + + body: Set[str] = set() + rev: Dict[str, List[Tuple[str, int, int]]] = {} for tgt, pairs in connectionMap.items(): - for src, _, _ in pairs: - if src not in rev: - rev[src] = [] - rev[src].append(tgt) - q = deque([loopNodeId]) + for src, so, ti in pairs: + rev.setdefault(src, []).append((tgt, so, ti)) + + q: deque = deque() + for tgt, so, ti in rev.get(loopNodeId, []): + if so != 0: + continue + if tgt == loopNodeId: + continue + q.append(tgt) + while q: nid = q.popleft() - for tgt in rev.get(nid, []): - if tgt not in body: - body.add(tgt) - q.append(tgt) + if nid == loopNodeId: + continue + if nid not in body: + body.add(nid) + for tgt, _so, _ti in rev.get(nid, []): + if tgt == loopNodeId: + continue + if tgt not in body: + q.append(tgt) return body +def getLoopPrimaryInputSource( + loop_node_id: str, + connectionMap: Dict[str, List[Tuple[str, int, int]]], + body_ids: Set[str], +) -> Optional[Tuple[str, int]]: + """Pick the inbound edge for ``flow.loop`` when several wires hit the same input (0). + + The Schleifen-Rücklauf vom Rumpf und der „normale“ Vorgänger enden auf demselben Port; + für die Datenzusammenführung (Fertig-Ausgang, Logs) zählt der Vorgänger **außerhalb** des Rumpfes. + """ + incoming = connectionMap.get(loop_node_id, []) + candidates = [(src, so) for src, so, ti in incoming if ti == 0] + if not candidates: + return None + outside = [(src, so) for src, so in candidates if src not in body_ids] + if outside: + return outside[0] + return candidates[0] + + +def getLoopDoneNodeIds(loopNodeId: str, connectionMap: Dict[str, List[Tuple[str, int, int]]]) -> Set[str]: + """Nodes reachable from flow.loop output port 1 (runs once after all iterations).""" + from collections import deque + + done: Set[str] = set() + rev: Dict[str, List[Tuple[str, int, int]]] = {} + for tgt, pairs in connectionMap.items(): + for src, so, ti in pairs: + rev.setdefault(src, []).append((tgt, so, ti)) + + q: deque = deque() + for tgt, so, ti in rev.get(loopNodeId, []): + if so != 1: + continue + if tgt == loopNodeId: + continue + q.append(tgt) + + while q: + nid = q.popleft() + if nid == loopNodeId: + continue + if nid not in done: + done.add(nid) + for tgt, _so, _ti in rev.get(nid, []): + if tgt == loopNodeId: + continue + if tgt not in done: + q.append(tgt) + return done + + def getInputSources(nodeId: str, connectionMap: Dict[str, List[Tuple[str, int, int]]]) -> Dict[int, Tuple[str, int]]: """ For a node, return targetInput -> (sourceNodeId, sourceOutput). 
@@ -417,7 +484,7 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any: resolved_parts = [resolveParameterReferences(v, nodeOutputs) for v in value] if len(resolved_parts) == 1: return resolved_parts[0] - parts = [serialize_context(p) for p in resolved_parts] + parts = [serialize_context(p, prefer_handover_primary=True) for p in resolved_parts] return "\n\n".join(p for p in parts if p) return [resolveParameterReferences(v, nodeOutputs) for v in value] return value diff --git a/modules/workflows/methods/methodAi/_common.py b/modules/workflows/methods/methodAi/_common.py index f198c6ac..60609104 100644 --- a/modules/workflows/methods/methodAi/_common.py +++ b/modules/workflows/methods/methodAi/_common.py @@ -4,7 +4,7 @@ """Shared helpers for AI workflow actions.""" import json -from typing import Any +from typing import Any, Optional def is_image_action_document_list(val: Any) -> bool: @@ -20,24 +20,42 @@ def is_image_action_document_list(val: Any) -> bool: return True -def serialize_context(val: Any) -> str: +def _handover_response_plain(val: Any) -> Optional[str]: + """If ``val`` is a dict with a non-empty ``response`` string, return it (BOM-stripped).""" + if not isinstance(val, dict): + return None + r = val.get("response") + if r is None or not str(r).strip(): + return None + return str(r).strip().lstrip("\ufeff") + + +def serialize_context(val: Any, *, prefer_handover_primary: bool = False) -> str: """Convert any context value to a readable string for use in AI prompts. - None / empty string → "" - empty dict (no keys) → "" (avoids literal "{}" in file.create / prompts) - str → as-is - - dict / list → pretty-printed JSON + - dict / list → pretty-printed JSON (unless ``prefer_handover_primary`` and dict has ``response``) + - if JSON encoding fails (cycles, etc.) but dict has ``response``, return that text instead of ``str(dict)`` - anything else → str() """ if val is None or val == "" or val == []: return "" if isinstance(val, dict) and len(val) == 0: return "" + if prefer_handover_primary: + got = _handover_response_plain(val) + if got is not None: + return got if isinstance(val, str): - return val.strip() + return val.strip().lstrip("\ufeff") try: - return json.dumps(val, ensure_ascii=False, indent=2) + return json.dumps(val, ensure_ascii=False, indent=2, default=str) except Exception: + got = _handover_response_plain(val) + if got is not None: + return got return str(val) diff --git a/modules/workflows/methods/methodContext/actions/extractContent.py b/modules/workflows/methods/methodContext/actions/extractContent.py index e055af17..659d0ea5 100644 --- a/modules/workflows/methods/methodContext/actions/extractContent.py +++ b/modules/workflows/methods/methodContext/actions/extractContent.py @@ -30,6 +30,38 @@ _UNSAFE_FILE_KEY = re.compile(r"[^\w\-.\(\)\[\]%@+]") HANDOVER_KIND = "context.extractContent.handover.v1" +_CONTENT_FILTER_OPTIONS = ("all", "textOnly", "imagesOnly", "noImages") + + +def _apply_content_filter(payload: Dict[str, Any], content_filter: str) -> Dict[str, Any]: + """Filter parts in the handover payload by content_filter. + + Semantics: + - all: keep every part (no-op). + - textOnly: whitelist — only typeGroup in (text, table, structure). + - imagesOnly: whitelist — only typeGroup == image. + - noImages: blacklist — every typeGroup except image (wider than textOnly; + future non-image types are retained). 
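+
+    For illustration (``chart`` stands in for a hypothetical future
+    typeGroup): given parts with typeGroups ``text``, ``image`` and ``chart``,
+    textOnly keeps only ``text``, noImages keeps ``text`` and ``chart``,
+    imagesOnly keeps only ``image``. For these three modes ``byTypeGroup`` is
+    rebuilt from the kept parts; ``all`` returns the payload unchanged.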
+ """ + import copy + + if content_filter == "all": + return payload + result = copy.deepcopy(payload) + for bucket in (result.get("files") or {}).values(): + if not isinstance(bucket, dict): + continue + parts = bucket.get("parts") or [] + if content_filter == "textOnly": + parts = [p for p in parts if isinstance(p, dict) and (p.get("typeGroup") or "") in ("text", "table", "structure")] + elif content_filter == "imagesOnly": + parts = [p for p in parts if isinstance(p, dict) and (p.get("typeGroup") or "") == "image"] + elif content_filter == "noImages": + parts = [p for p in parts if isinstance(p, dict) and (p.get("typeGroup") or "") != "image"] + bucket["parts"] = parts + bucket["byTypeGroup"] = _rebuild_by_type_group(parts) + return result + def _default_extraction_options() -> ExtractionOptions: """No merge — keep all parts for downstream JSON selection.""" @@ -72,6 +104,19 @@ def _rebuild_by_type_group(parts_ser: List[Dict[str, Any]]) -> Dict[str, List[Di return by_type +def _part_carries_plain_text(p: dict) -> bool: + """Whether a serialized extraction part contributes to a flat ``response`` string.""" + if not isinstance(p, dict): + return False + tg = (p.get("typeGroup") or "").strip() + if tg in ("text", "table"): + return True + mime = (p.get("mimeType") or "").strip().lower() + if tg == "structure" and mime in ("text/plain", "text/html", "text/markdown"): + return True + return False + + def _joined_text_from_handover_payload(payload: Dict[str, Any]) -> str: """Concatenate text parts across fileOrder for AiResult-compatible ``response``.""" files_section = payload.get("files") or {} @@ -85,7 +130,7 @@ def _joined_text_from_handover_payload(payload: Dict[str, Any]) -> str: for p in bucket.get("parts") or []: if not isinstance(p, dict): continue - if (p.get("typeGroup") or "").strip() != "text": + if not _part_carries_plain_text(p): continue raw = p.get("data") if raw is None: @@ -314,11 +359,23 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: self.services.chat.progressLogUpdate(operation_id, 0.9, "Building JSON") + content_filter = str(parameters.get("contentFilter") or "all").strip().lower() + if content_filter not in _CONTENT_FILTER_OPTIONS: + content_filter = "all" + payload = _apply_content_filter(payload, content_filter) + stem = f"{wf}_{int(time.time())}" - stripped_payload, media_docs = _split_images_to_sidecar_documents( - payload, - document_name_stem=stem, - ) + # Only split image sidecars when the filtered payload can still contain image parts. + if content_filter in ("all", "imagesOnly"): + stripped_payload, media_docs = _split_images_to_sidecar_documents( + payload, + document_name_stem=stem, + ) + else: + # textOnly / noImages: no image parts remain → skip the split entirely. + stripped_payload = payload + media_docs = [] + joined_text = _joined_text_from_handover_payload(payload) json_meta = { diff --git a/modules/workflows/methods/methodContext/actions/mergeContext.py b/modules/workflows/methods/methodContext/actions/mergeContext.py index 7b8765a9..3947db30 100644 --- a/modules/workflows/methods/methodContext/actions/mergeContext.py +++ b/modules/workflows/methods/methodContext/actions/mergeContext.py @@ -2,43 +2,28 @@ # All rights reserved. """Action ``context.mergeContext``. -Reads ``_branchInputs`` (injected by ``ActionNodeExecutor`` because the node -declaration sets ``injectBranchInputs: True``) and combines them according to -the selected strategy. +Receives a list of results (e.g. 
from ``flow.loop`` ``bodyResults``) via the +``dataSource`` DataRef parameter and deep-merges them into a single dict. -The barrier behaviour — waiting until every connected predecessor has produced -output — is handled by the execution engine via ``waitsForAllPredecessors`` on -the node definition; this action is invoked only after all (or ``waitFor``) -inputs are present. +``dataSource`` must be set explicitly (resolved DataRef). There is no implicit +fallback to ``_upstreamPayload`` or loop payloads. """ from __future__ import annotations import copy +import json import logging -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional from modules.datamodels.datamodelChat import ActionResult +from modules.workflows.methods.methodContext.actions.extractContent import ( + _joined_text_from_handover_payload, +) logger = logging.getLogger(__name__) -_VALID_STRATEGIES = {"shallow", "deep", "firstWins", "lastWins", "errorOnConflict"} - - -def _shallow_merge(branches: List[Tuple[int, Any]]) -> Tuple[Dict[str, Any], List[str]]: - merged: Dict[str, Any] = {} - conflicts: List[str] = [] - for _, val in branches: - if not isinstance(val, dict): - continue - for k, v in val.items(): - if k in merged and merged[k] != v: - conflicts.append(k) - merged[k] = v - return merged, conflicts - - def _deep_merge(target: Dict[str, Any], source: Dict[str, Any], conflicts: List[str], path: str = "") -> None: for k, v in source.items(): full = f"{path}.{k}" if path else k @@ -48,80 +33,202 @@ def _deep_merge(target: Dict[str, Any], source: Dict[str, Any], conflicts: List[ existing = target[k] if isinstance(existing, dict) and isinstance(v, dict): _deep_merge(existing, v, conflicts, full) + elif isinstance(existing, list) and isinstance(v, list): + target[k] = existing + v else: if existing != v: conflicts.append(full) target[k] = copy.deepcopy(v) if isinstance(v, (dict, list)) else v -def _strategy_first_or_last_wins( - branches: List[Tuple[int, Any]], last: bool -) -> Tuple[Dict[str, Any], List[str]]: - iterator = list(reversed(branches)) if not last else list(branches) - merged: Dict[str, Any] = {} - conflicts: List[str] = [] - for _, val in iterator: - if not isinstance(val, dict): +def _coerce_to_list(value: Any) -> List[Any]: + """Normalise ``value`` to a list of items to merge.""" + if isinstance(value, list): + return value + if value is None: + return [] + return [value] + + +def _strip_document_data(doc: Any) -> Any: + """Keep document metadata but drop the raw blob so deep-merge stays small.""" + if not isinstance(doc, dict): + return doc + out = dict(doc) + out["documentData"] = None + return out + + +def _merge_payload(item: Any) -> Optional[Dict[str, Any]]: + """Return the dict to deep-merge for this item, or ``None`` to skip. + + ``documents[n].documentData`` is nulled before merging so large blobs + (e.g. ~3–4 MB handover-JSON per extractContent iteration) don't accumulate. + ``imageDocumentsOnly`` is left intact — ``_deep_merge`` list-concats it + across iterations, giving downstream nodes all images from all iterations. 
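+
+    Example (hypothetical loop-body result; only the shape matters):
+
+        item = {"success": True, "data": {"response": "x"},
+                "documents": [{"documentName": "a.json", "documentData": "<large blob>"}]}
+        _merge_payload(item)
+        # -> shallow copy of item with documents[0]["documentData"] set to None;
+        #    items with success=False (or non-dict items) return None and are skipped.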
+ """ + if not isinstance(item, dict): + return None + if item.get("success") is False: + return None + out = dict(item) + if isinstance(out.get("documents"), list): + out["documents"] = [_strip_document_data(d) for d in out["documents"]] + return out + + +def _primary_text_from_item(it: Any) -> str: + """Same sources as ``actionNodeExecutor`` / ``context.extractContent`` for primary text.""" + if not isinstance(it, dict): + return "" + r = it.get("response") + if r is not None and str(r).strip(): + return str(r).strip() + inner = it.get("data") + if isinstance(inner, dict): + r = inner.get("response") + if r is not None and str(r).strip(): + return str(r).strip() + docs = it.get("documents") + if not isinstance(docs, list) or not docs: + return "" + doc0 = docs[0] + raw: Any = None + if isinstance(doc0, dict): + raw = doc0.get("documentData") + elif hasattr(doc0, "documentData"): + raw = getattr(doc0, "documentData", None) + if isinstance(raw, bytes): + try: + return raw.decode("utf-8").strip() + except (UnicodeDecodeError, ValueError): + return "" + if isinstance(raw, dict): + return (_joined_text_from_handover_payload(raw) or "").strip() + if isinstance(raw, str) and raw.strip(): + s = raw.strip() + if s.startswith("{") and s.endswith("}"): + try: + parsed = json.loads(s) + if isinstance(parsed, dict): + return (_joined_text_from_handover_payload(parsed) or "").strip() + except (json.JSONDecodeError, TypeError): + pass + return s + return "" + + +def _sanitize_heading_title(name: str) -> str: + t = " ".join(name.replace("\r", " ").replace("\n", " ").split()).strip() + return t[:160] if len(t) > 160 else t + + +def _iteration_heading_from_item(it: Any) -> Optional[str]: + if not isinstance(it, dict): + return None + docs = it.get("documents") + if not isinstance(docs, list) or not docs: + return None + d0 = docs[0] + if not isinstance(d0, dict): + return None + name = d0.get("documentName") + if isinstance(name, str) and name.strip(): + return _sanitize_heading_title(name.strip()) + return None + + +def _synthesize_primary_response(merged: Dict[str, Any], inputs: List[Any]) -> str: + """Flat text for ``ActionResult.response`` / file.create. + + Prefer concatenating each input's primary text (loop bodyResults) so no + iteration is dropped — ``deep_merge`` overwrites scalar ``response`` with + the last item only; that merged value is a fallback when no per-item text + is found. + + When several inputs are merged, prefix each chunk with a markdown ``###`` + heading from ``documents[0].documentName`` so ``file.create`` renders clear + sections (CSV vs PDF vs …). 
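+
+    Example (two hypothetical loop results):
+
+        inputs = [
+            {"data": {"response": "alpha"}, "documents": [{"documentName": "a.csv"}]},
+            {"data": {"response": "beta"}, "documents": [{"documentName": "b.pdf"}]},
+        ]
+        # -> "### a.csv\n\nalpha\n\n### b.pdf\n\nbeta"
+        # With a single input no heading is prefixed and the text is returned as-is.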
+ """ + chunks: List[str] = [] + multi = len(inputs) > 1 + for it in inputs: + t = _primary_text_from_item(it) + if not t: continue - for k, v in val.items(): - if k in merged and merged[k] != v: - conflicts.append(k) - if last or k not in merged: - merged[k] = v - return merged, conflicts + if multi: + h = _iteration_heading_from_item(it) + if h: + chunks.append(f"### {h}\n\n{t}") + continue + chunks.append(t) + if chunks: + return "\n\n".join(chunks) + + if isinstance(merged, dict): + r = merged.get("response") + if r is not None and str(r).strip(): + return str(r).strip() + + if isinstance(merged, dict) and merged: + try: + return json.dumps(merged, ensure_ascii=False, indent=2, default=str) + except Exception: + return str(merged) + return "" async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult: try: - strategy = str(parameters.get("strategy") or "deep") - if strategy not in _VALID_STRATEGIES: - return ActionResult.isFailure( - error=f"Invalid strategy '{strategy}', expected one of {sorted(_VALID_STRATEGIES)}" - ) + if "dataSource" not in parameters: + raise ValueError("dataSource is required (set a DataRef on the merge node)") + raw = parameters["dataSource"] + if isinstance(raw, str) and not raw.strip(): + raw = None + if raw is None: + return ActionResult.isFailure(error="dataSource ist erforderlich (DataRef auf die Quelle setzen).") + if isinstance(raw, list) and len(raw) == 0: + return ActionResult.isFailure(error="Keine Datenquelle angegeben oder Datenquelle ist leer.") - wait_for = int(parameters.get("waitFor") or 0) - - raw_inputs = parameters.get("_branchInputs") or {} - if not isinstance(raw_inputs, dict): - return ActionResult.isFailure(error="No branch inputs available — connect at least two upstream nodes") - - items: List[Tuple[int, Any]] = sorted( - ((int(k), v) for k, v in raw_inputs.items()), - key=lambda kv: kv[0], - ) - if wait_for > 0: - items = items[:wait_for] + items = _coerce_to_list(raw) if not items: - return ActionResult.isFailure(error="No branch inputs available") + return ActionResult.isFailure(error="Keine Datenquelle angegeben oder Datenquelle ist leer.") - first_value = items[0][1] if items else None + merged: Dict[str, Any] = {} conflicts: List[str] = [] + inputs: List[Any] = [] - if strategy == "shallow": - merged, conflicts = _shallow_merge(items) - elif strategy == "firstWins": - merged, conflicts = _strategy_first_or_last_wins(items, last=False) - elif strategy == "lastWins": - merged, conflicts = _strategy_first_or_last_wins(items, last=True) - elif strategy == "errorOnConflict": - merged, conflicts = _shallow_merge(items) - if conflicts: - return ActionResult.isFailure( - error=f"Conflicting keys: {sorted(set(conflicts))}", - ) - else: # deep (default) - merged = {} - for _, val in items: - if isinstance(val, dict): - _deep_merge(merged, val, conflicts) + for item in items: + if item is None: + continue + inputs.append(item) + payload = _merge_payload(item) + if payload: + _deep_merge(merged, payload, conflicts) + if not inputs: + return ActionResult.isFailure(error="Alle Einträge in der Datenquelle sind leer.") + + primary = _synthesize_primary_response(merged, inputs) + merged["response"] = primary + + _ps = primary if isinstance(primary, str) else repr(primary) + logger.info( + "mergeContext: inputs=%d merged_keys=%s primary_len=%d primary_preview=%r conflicts=%d", + len(inputs), + list(merged.keys())[:20], + len(_ps or ""), + (_ps[:200] + "…") if len(_ps) > 200 else _ps, + len(conflicts), + ) data: Dict[str, Any] = { - 
"inputs": {idx: val for idx, val in items}, - "first": first_value, "merged": merged, - "strategy": strategy, + "inputs": inputs, + "first": inputs[0] if inputs else None, + "count": len(inputs), "conflicts": sorted(set(conflicts)) if conflicts else [], + "response": primary, } return ActionResult.isSuccess(data=data) except Exception as exc: diff --git a/modules/workflows/methods/methodContext/methodContext.py b/modules/workflows/methods/methodContext/methodContext.py index 1f7b9180..b2e7220b 100644 --- a/modules/workflows/methods/methodContext/methodContext.py +++ b/modules/workflows/methods/methodContext/methodContext.py @@ -151,31 +151,20 @@ class MethodContext(MethodBase): "mergeContext": WorkflowActionDefinition( actionId="context.mergeContext", description=( - "Merge data arriving from multiple parallel branches into a single " - "MergeResult. Strategies: shallow, deep, firstWins, lastWins, " - "errorOnConflict. The execution engine waits for all connected " - "predecessors before invoking this action (waitsForAllPredecessors=True)." + "Führt eine Liste von Schrittergebnissen (z. B. ``bodyResults`` einer " + "``flow.loop``) zu einem zusammengeführten Dict zusammen." ), - outputType="MergeResult", + outputType="ActionResult", parameters={ - "strategy": WorkflowActionParameter( - name="strategy", type="str", required=False, - frontendType=FrontendType.SELECT, - frontendOptions=["shallow", "deep", "firstWins", "lastWins", "errorOnConflict"], - default="deep", - description="Conflict resolution strategy for keys present in several branches", - ), - "waitFor": WorkflowActionParameter( - name="waitFor", type="int", required=False, - frontendType=FrontendType.NUMBER, - default=0, - description="Number of branches to consume (0 = all). Used together with timeoutMs.", - ), - "timeoutMs": WorkflowActionParameter( - name="timeoutMs", type="int", required=False, - frontendType=FrontendType.NUMBER, - default=30000, - description="Maximum wait time in milliseconds before continuing with available inputs", + "dataSource": WorkflowActionParameter( + name="dataSource", + type="Any", + frontendType=FrontendType.CONTEXT_BUILDER, + required=False, + description=( + "Datenquelle (DataRef), meist Schleife → Alle Schleifen-Ergebnisse. " + "Optional wenn der Knoten per Kabel am Schleifen-„Fertig“-Ausgang hängt." 
+ ), ), }, execute=mergeContext.__get__(self, self.__class__), diff --git a/modules/workflows/methods/methodFile/actions/create.py b/modules/workflows/methods/methodFile/actions/create.py index 791d0903..e7ef569c 100644 --- a/modules/workflows/methods/methodFile/actions/create.py +++ b/modules/workflows/methods/methodFile/actions/create.py @@ -3,6 +3,7 @@ from typing import Any, Dict, List, Optional +import asyncio import base64 import binascii import io @@ -11,7 +12,10 @@ import logging import re from modules.datamodels.datamodelChat import ActionResult, ActionDocument -from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import markdownToDocumentJson +from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import ( + enhancePlainTextWithMarkdownTables, + markdownToDocumentJson, +) from modules.shared.i18nRegistry import normalizePrimaryLanguageTag from modules.workflows.automation2.executors.actionNodeExecutor import _coerce_document_data_to_bytes from modules.workflows.methods.methodAi._common import is_image_action_document_list, serialize_context @@ -21,6 +25,78 @@ logger = logging.getLogger(__name__) _SAFE_FILENAME = re.compile(r'[^\w\-.\(\)\s\[\]%@+]') +_HEAVY_CONTEXT_KEYS = frozenset({"imageDocumentsOnly", "documents", "inputs"}) + + +def _collect_image_documents_only(raw: Any) -> List[Any]: + """Resolve ``imageDocumentsOnly`` whether the context is merged, nested, or surfaced.""" + if not isinstance(raw, dict): + return [] + paths = ( + ("imageDocumentsOnly",), + ("merged", "imageDocumentsOnly"), + ("data", "merged", "imageDocumentsOnly"), + ("data", "imageDocumentsOnly"), + ) + for path in paths: + cur: Any = raw + ok = True + for p in path: + if not isinstance(cur, dict): + ok = False + break + cur = cur.get(p) + if ok and isinstance(cur, list) and cur: + return cur + return [] + + +def _context_string_for_report(raw: Any, output_format: str) -> str: + """Build one narrative string for ``markdownToDocumentJson`` / render. + + Prefer plain ``response`` text (merge node surfaces it; nested ``merged.response`` + too). Never dump ``inputs`` / binary lists into the PDF body — that produced giant + JSON + base64 "hash" paragraphs after merge + ``contextBuilder``. 
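+
+    Example (hypothetical, heavily abbreviated merge-node output):
+
+        raw = {"merged": {"response": "Report text"},
+               "inputs": {"0": {"documentData": "<base64>"}}}
+        _context_string_for_report(raw, "docx")  # -> "Report text"
+        _context_string_for_report(raw, "json")  # -> full JSON dump via serialize_context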
+ """ + of = (output_format or "docx").strip().lower().lstrip(".") + if of == "json": + return serialize_context(raw, prefer_handover_primary=False) + if isinstance(raw, str): + return raw.strip().lstrip("\ufeff") + if isinstance(raw, dict): + for path in ( + ("response",), + ("merged", "response"), + ("data", "response"), + ("data", "merged", "response"), + ): + cur: Any = raw + ok = True + for k in path: + if not isinstance(cur, dict): + ok = False + break + cur = cur.get(k) + if ok and cur is not None and str(cur).strip(): + return str(cur).strip().lstrip("\ufeff") + lean = {k: v for k, v in raw.items() if k not in _HEAVY_CONTEXT_KEYS} + try: + return json.dumps(lean, ensure_ascii=False, indent=2, default=str) + except Exception: + return serialize_context(lean, prefer_handover_primary=False) + return serialize_context(raw, prefer_handover_primary=False) + + +def _raw_context_preview_for_log(raw: Any, max_len: int = 500) -> str: + if raw is None: + return "None" + s = raw if isinstance(raw, str) else repr(raw) + s = s.replace("\r", "\\r").replace("\n", "\\n") + if len(s) <= max_len: + return s + return s[:max_len] + f"...<{len(s)} chars>" + + def _persistDocumentsToUserFiles( action_documents: list, services, @@ -139,6 +215,98 @@ def _load_image_bytes_from_action_doc(doc: dict, services) -> Optional[bytes]: return None +# Images larger than this threshold (decoded bytes) are resized before embedding +# to avoid multi-minute PDF rendering of high-res raster scans. +_MAX_IMAGE_EMBED_BYTES = 300_000 # 300 KB decoded ≈ ~400 KB base64 +_IMAGE_MAX_DIMENSION = 1200 # longest edge in pixels after resize + + +def _resize_image_for_document(image_bytes: bytes) -> bytes: + """Resize image to at most ``_IMAGE_MAX_DIMENSION`` px on the longest edge + and re-encode as JPEG. Falls back to the original bytes on any error.""" + try: + from PIL import Image as PILImage + import io as _io + + img = PILImage.open(_io.BytesIO(image_bytes)) + + # Flatten transparency / palette modes to RGB (required for JPEG) + if img.mode in ("RGBA", "LA"): + bg = PILImage.new("RGB", img.size, (255, 255, 255)) + bg.paste(img, mask=img.split()[-1]) + img = bg + elif img.mode == "P": + img = img.convert("RGBA") + bg = PILImage.new("RGB", img.size, (255, 255, 255)) + bg.paste(img, mask=img.split()[-1]) + img = bg + elif img.mode != "RGB": + img = img.convert("RGB") + + w, h = img.size + if max(w, h) > _IMAGE_MAX_DIMENSION: + # thumbnail() is optimised for downscaling: it uses an intermediate + # box-filter step before the final filter, making it 3-5× faster + # than resize() on large images. BILINEAR is fast and sufficient + # for document thumbnails. + img.thumbnail((_IMAGE_MAX_DIMENSION, _IMAGE_MAX_DIMENSION), PILImage.BILINEAR) + + out = _io.BytesIO() + img.save(out, format="JPEG", quality=85, optimize=True) + return out.getvalue() + except Exception as e: + logger.warning("file.create: image resize failed (%s) — using original bytes", e) + return image_bytes + + +def _append_images_to_content(structured_content: dict, image_docs: list, services=None) -> dict: + """Append images from imageDocumentsOnly as native image elements to the structured JSON. + + Each image becomes an ``image`` element with ``base64Data`` in a trailing + "Bilder" section of the first document. Images larger than + ``_MAX_IMAGE_EMBED_BYTES`` are automatically resized/compressed so the + synchronous PDF renderer does not block for minutes on high-res scans. + The renderers (DOCX / PDF) handle ``content.base64Data`` natively. 
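+
+    Appended element shape (sketch; "scan_p1.png" is an example name taken
+    from the image document's documentName):
+
+        {"type": "image",
+         "content": {"base64Data": "<base64>", "alt": "scan_p1.png"}}
+
+    collected in a trailing section ``{"heading": "Bilder", "elements": [...]}``
+    of the first document in ``structured_content``.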
+ """ + elements = [] + for doc in image_docs: + b = _load_image_bytes_from_action_doc(doc, services) + if not b: + raw = doc.get("documentData") if isinstance(doc, dict) else None + if isinstance(raw, str): + try: + b = base64.b64decode(raw) + except Exception: + pass + if not b: + continue + + if len(b) > _MAX_IMAGE_EMBED_BYTES: + logger.info( + "file.create: image %s is %d bytes — resizing to max %dpx for embedding", + (doc.get("documentName") if isinstance(doc, dict) else "?") or "?", + len(b), + _IMAGE_MAX_DIMENSION, + ) + b = _resize_image_for_document(b) + + elements.append({ + "type": "image", + "content": { + "base64Data": base64.b64encode(b).decode("ascii"), + "alt": (doc.get("documentName") if isinstance(doc, dict) else None) or "image", + }, + }) + + if not elements: + return structured_content + + docs = structured_content.get("documents") + if isinstance(docs, list) and docs: + docs[0].setdefault("sections", []).append({"heading": "Bilder", "elements": elements}) + return structured_content + + def _images_list_to_pdf(image_bytes_list: List[bytes]) -> bytes: """One PDF page per image; embedded raster data via PyMuPDF.""" import fitz @@ -239,21 +407,24 @@ async def create(self, parameters: Dict[str, Any]) -> ActionResult: Create a file from context (text/markdown from upstream AI node). Uses GenerationService.renderReport to produce docx, pdf, txt, md, html, xlsx, etc. """ - raw_context = parameters.get("context", "") or parameters.get("text", "") or "" + raw_context = parameters.get("context", "") if isinstance(raw_context, list) and is_image_action_document_list(raw_context): return await _create_merged_image_documents(self, parameters, raw_context) - context = serialize_context(raw_context) + outputFormat = (parameters.get("outputFormat") or "docx").strip().lower().lstrip(".") + context = _context_string_for_report(raw_context, outputFormat) if not context: logger.warning( - "file.create: context empty after resolve — check DataRefs (e.g. Antworttext / " - "documents[0].documentData from the AI step)." + "file.create: context empty after resolve — raw_context type=%s raw_summary=%r " + "serialized_len=%s (check ActionNodeExecutor \"file.create context resolution\" log for DataRef / upstream).", + type(raw_context).__name__, + _raw_context_preview_for_log(raw_context), + len(context or ""), ) return ActionResult.isFailure(error="context is required (connect an AI node or provide text)") - outputFormat = (parameters.get("outputFormat") or "docx").strip().lower().lstrip(".") title = (parameters.get("title") or "Document").strip() templateName = parameters.get("templateName") language = normalizePrimaryLanguageTag( @@ -267,10 +438,26 @@ async def create(self, parameters: Dict[str, Any]) -> ActionResult: folder_id = str(raw_folder).strip() try: + if outputFormat != "json": + context = enhancePlainTextWithMarkdownTables(context) structured_content = markdownToDocumentJson(context, title, language) if templateName: structured_content.setdefault("metadata", {})["templateName"] = templateName + img_docs = _collect_image_documents_only(raw_context) + if img_docs: + # Image decoding and PIL resizing are CPU-bound; run them in a + # thread pool so the event loop is not blocked while processing + # high-res raster images (e.g. 3+ MB PNGs from PDF extraction). 
+ loop = asyncio.get_event_loop() + structured_content = await loop.run_in_executor( + None, + _append_images_to_content, + structured_content, + img_docs, + self.services, + ) + generation = getattr(self.services, "generation", None) if not generation: return ActionResult.isFailure(error="Generation service not available") diff --git a/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py b/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py index 428fcd25..751de6d4 100644 --- a/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py +++ b/tests/integration/workflows/test_execute_graph_loop_aggregate_consolidate.py @@ -28,15 +28,14 @@ async def test_execute_graph_loop_and_aggregate_collects_items(): "type": "flow.loop", "parameters": { "items": {"type": "ref", "nodeId": "t1", "path": ["payload", "items"]}, - "level": "auto", "concurrency": 1, }, }, {"id": "agg1", "type": "data.aggregate", "parameters": {"mode": "collect"}}, ], "connections": [ - {"source": "t1", "target": "loop1"}, - {"source": "loop1", "target": "agg1"}, + {"source": "t1", "target": "loop1", "targetInput": 0}, + {"source": "loop1", "target": "agg1", "sourceOutput": 0, "targetInput": 0}, ], } run_envelope = default_run_envelope( @@ -72,15 +71,14 @@ async def test_data_consolidate_after_aggregate_same_context_as_post_loop(): "type": "flow.loop", "parameters": { "items": {"type": "ref", "nodeId": "t1", "path": ["payload", "items"]}, - "level": "auto", "concurrency": 1, }, }, {"id": "agg1", "type": "data.aggregate", "parameters": {"mode": "collect"}}, ], "connections": [ - {"source": "t1", "target": "loop1"}, - {"source": "loop1", "target": "agg1"}, + {"source": "t1", "target": "loop1", "targetInput": 0}, + {"source": "loop1", "target": "agg1", "sourceOutput": 0, "targetInput": 0}, ], } run_envelope = default_run_envelope( @@ -121,3 +119,43 @@ async def test_data_consolidate_after_aggregate_same_context_as_post_loop(): assert len(result["rows"]) == 2 assert result["rows"][0].get("currentItem", {}).get("a") == 1 assert result["rows"][1].get("currentItem", {}).get("b") == "y" + + +@pytest.mark.asyncio +async def test_loop_done_branch_runs_once_after_body(): + """Loop output 1 (Fertig) runs downstream once; body stays on output 0.""" + graph = { + "nodes": [ + {"id": "t1", "type": "trigger.manual", "parameters": {}}, + { + "id": "loop1", + "type": "flow.loop", + "parameters": { + "items": {"type": "ref", "nodeId": "t1", "path": ["payload", "items"]}, + "concurrency": 1, + }, + }, + {"id": "agg1", "type": "data.aggregate", "parameters": {"mode": "collect"}}, + {"id": "c1", "type": "data.consolidate", "parameters": {"mode": "table"}}, + ], + "connections": [ + {"source": "t1", "target": "loop1", "targetInput": 0}, + {"source": "loop1", "target": "agg1", "sourceOutput": 0, "targetInput": 0}, + {"source": "loop1", "target": "c1", "sourceOutput": 1, "targetInput": 0}, + ], + } + run_envelope = default_run_envelope( + "manual", + payload={"items": [{"a": 1}, {"a": 2}]}, + ) + res = await executeGraph( + graph, + services=_minimal_services(), + run_envelope=run_envelope, + userId="test-user", + ) + assert res.get("success") is True, res + out = res["nodeOutputs"] + assert out["agg1"]["count"] == 2 + assert out["c1"]["count"] == 2 + assert out["c1"]["mode"] == "table" diff --git a/tests/unit/workflow/test_extract_content_handover.py b/tests/unit/workflow/test_extract_content_handover.py index 506c3230..f393c0ea 100644 --- 
a/tests/unit/workflow/test_extract_content_handover.py +++ b/tests/unit/workflow/test_extract_content_handover.py @@ -2,12 +2,17 @@ import base64 -from modules.workflows.methods.methodContext.actions import extractContent as ec +from modules.workflows.methods.methodContext.actions.extractContent import ( + HANDOVER_KIND, + _apply_content_filter, + _joined_text_from_handover_payload, + _split_images_to_sidecar_documents, +) -def test_joined_text_from_handover_orders_text_parts_only(): +def test_joined_text_orders_text_table_and_skips_container(): payload = { - "kind": ec.HANDOVER_KIND, + "kind": HANDOVER_KIND, "fileOrder": ["f1"], "files": { "f1": { @@ -19,14 +24,28 @@ def test_joined_text_from_handover_orders_text_parts_only(): } }, } - assert ec._joined_text_from_handover_payload(payload) == "A\n\nB" + assert _joined_text_from_handover_payload(payload) == "A\n\nB" + + +def test_joined_text_includes_csv_table_parts(): + payload = { + "fileOrder": ["f1"], + "files": { + "f1": { + "parts": [ + {"typeGroup": "table", "mimeType": "text/csv", "data": "a,b\n1,2", "id": "t"}, + ] + } + }, + } + assert _joined_text_from_handover_payload(payload) == "a,b\n1,2" def test_split_images_moves_pixels_to_blob_docs(): raw = b"fake-binary-image" b64 = base64.b64encode(raw).decode("ascii") payload = { - "kind": ec.HANDOVER_KIND, + "kind": HANDOVER_KIND, "schemaVersion": 1, "fileOrder": ["f1"], "files": { @@ -44,7 +63,7 @@ def test_split_images_moves_pixels_to_blob_docs(): } }, } - stripped, blobs = ec._split_images_to_sidecar_documents(payload, document_name_stem="abc") + stripped, blobs = _split_images_to_sidecar_documents(payload, document_name_stem="abc") assert len(blobs) == 1 assert blobs[0].mimeType == "image/png" assert blobs[0].documentData == raw @@ -61,3 +80,65 @@ def test_split_images_moves_pixels_to_blob_docs(): assert img_parts[0]["data"] == "" assert img_parts[0]["handoverMediaDocumentName"] == blobs[0].documentName assert "image" in stripped["files"]["f1"]["byTypeGroup"] + + +def _mixed_payload(): + return { + "kind": HANDOVER_KIND, + "schemaVersion": 1, + "fileOrder": ["f1"], + "files": { + "f1": { + "parts": [ + {"typeGroup": "text", "data": "hello", "id": "t1"}, + {"typeGroup": "table", "mimeType": "text/csv", "data": "a,b", "id": "tb1"}, + {"typeGroup": "image", "mimeType": "image/png", "data": "abc=", "id": "i1"}, + {"typeGroup": "structure", "mimeType": "text/html", "data": "
", "id": "s1"}, + ], + } + }, + } + + +def test_content_filter_all_is_noop(): + payload = _mixed_payload() + result = _apply_content_filter(payload, "all") + assert result is payload # same object, no copy + + +def test_content_filter_text_only_keeps_text_table_structure(): + result = _apply_content_filter(_mixed_payload(), "textOnly") + parts = result["files"]["f1"]["parts"] + type_groups = {p["typeGroup"] for p in parts} + assert type_groups == {"text", "table", "structure"} + assert "image" not in type_groups + + +def test_content_filter_images_only(): + result = _apply_content_filter(_mixed_payload(), "imagesOnly") + parts = result["files"]["f1"]["parts"] + assert all(p["typeGroup"] == "image" for p in parts) + assert len(parts) == 1 + + +def test_content_filter_no_images_removes_only_images(): + result = _apply_content_filter(_mixed_payload(), "noImages") + parts = result["files"]["f1"]["parts"] + type_groups = {p["typeGroup"] for p in parts} + assert "image" not in type_groups + # text, table, structure all remain + assert {"text", "table", "structure"} == type_groups + + +def test_content_filter_text_only_joined_text_has_no_image_data(): + result = _apply_content_filter(_mixed_payload(), "textOnly") + text = _joined_text_from_handover_payload(result) + assert "hello" in text + assert "abc=" not in text # base64 image data must not appear + + +def test_content_filter_text_only_no_sidecars(): + """textOnly: no image parts → _split produces zero sidecars.""" + result = _apply_content_filter(_mixed_payload(), "textOnly") + stripped, blobs = _split_images_to_sidecar_documents(result, document_name_stem="test") + assert blobs == [] diff --git a/tests/unit/workflow/test_merge_context_handover.py b/tests/unit/workflow/test_merge_context_handover.py new file mode 100644 index 00000000..c89de1e3 --- /dev/null +++ b/tests/unit/workflow/test_merge_context_handover.py @@ -0,0 +1,178 @@ +# Unit tests: context.mergeContext primary text from extract handover (documents[0]). 
+ +import json + +import pytest + +from modules.workflows.methods.methodContext.actions.extractContent import HANDOVER_KIND +from modules.workflows.methods.methodContext.actions.mergeContext import mergeContext + + +def _handover(text: str) -> dict: + return { + "kind": HANDOVER_KIND, + "fileOrder": ["f1"], + "files": { + "f1": { + "parts": [ + {"typeGroup": "text", "data": text, "id": "t1"}, + ] + } + }, + } + + +@pytest.mark.asyncio +async def test_mergeContext_requires_dataSource(): + result = await mergeContext(object(), {}) + assert not result.success + err = result.error or "" + assert "dataSource" in err or "erforderlich" in err.lower() + + +@pytest.mark.asyncio +async def test_mergeContext_handover_only_in_documents_yields_data_response(): + item = { + "success": True, + "data": {}, + "documents": [ + { + "documentName": "handover.json", + "mimeType": "application/json", + "documentData": _handover("only-from-handover"), + } + ], + } + result = await mergeContext(object(), {"dataSource": [item]}) + assert result.success + assert result.data + assert result.data.get("response") == "only-from-handover" + + +@pytest.mark.asyncio +async def test_mergeContext_handover_json_string_in_documentData(): + payload = _handover("from-json-string") + item = { + "success": True, + "data": {}, + "documents": [ + { + "documentName": "handover.json", + "mimeType": "application/json", + "documentData": json.dumps(payload), + } + ], + } + result = await mergeContext(object(), {"dataSource": [item]}) + assert result.success + assert result.data.get("response") == "from-json-string" + + +@pytest.mark.asyncio +async def test_mergeContext_joins_multiple_handover_items(): + items = [ + { + "success": True, + "data": {}, + "documents": [{"documentData": _handover("alpha"), "documentName": "a.json"}], + }, + { + "success": True, + "data": {}, + "documents": [{"documentData": _handover("beta"), "documentName": "b.json"}], + }, + ] + result = await mergeContext(object(), {"dataSource": items}) + assert result.success + assert result.data.get("response") == "### a.json\n\nalpha\n\n### b.json\n\nbeta" + + +@pytest.mark.asyncio +async def test_mergeContext_merged_response_wins_over_handover_chunks(): + items = [ + { + "success": True, + "data": {"response": "merged-wins"}, + "documents": [{"documentData": _handover("ignored"), "documentName": "a.json"}], + }, + ] + result = await mergeContext(object(), {"dataSource": items}) + assert result.success + assert result.data.get("response") == "merged-wins" + + +@pytest.mark.asyncio +async def test_mergeContext_concatenates_each_iteration_data_response_not_only_last(): + """deep_merge overwrites ``response``; synthesis must still include every loop body result.""" + items = [ + {"success": True, "data": {"response": "chunk-aaa"}}, + {"success": True, "data": {"response": "chunk-bbb"}}, + {"success": True, "data": {"response": "chunk-ccc"}}, + ] + result = await mergeContext(object(), {"dataSource": items}) + assert result.success + r = result.data.get("response") or "" + assert "chunk-aaa" in r + assert "chunk-bbb" in r + assert "chunk-ccc" in r + assert r == "chunk-aaa\n\nchunk-bbb\n\nchunk-ccc" + assert result.data["merged"]["response"] == r + + +@pytest.mark.asyncio +async def test_mergeContext_primary_serializes_as_plain_text_for_file_create(): + from modules.workflows.methods.methodAi._common import serialize_context + + items = [ + {"success": True, "data": {"response": "section-one"}}, + {"success": True, "data": {"response": "section-two"}}, + ] + result = 
await mergeContext(object(), {"dataSource": items}) + primary = result.data.get("response") + assert isinstance(primary, str) + assert serialize_context(primary) == primary + + +@pytest.mark.asyncio +async def test_mergeContext_strips_document_data_from_merged_documents(): + """documentData must be None in merged.documents — blobs must not accumulate.""" + big_blob = "x" * 100_000 + items = [ + { + "success": True, + "data": {"response": "a"}, + "documents": [ + {"documentName": "a.json", "mimeType": "application/json", "documentData": big_blob}, + ], + }, + { + "success": True, + "data": {"response": "b"}, + "documents": [ + {"documentName": "b.json", "mimeType": "application/json", "documentData": big_blob}, + ], + }, + ] + result = await mergeContext(object(), {"dataSource": items}) + assert result.success + merged_docs = result.data["merged"].get("documents") or [] + assert len(merged_docs) >= 1 + for doc in merged_docs: + assert doc.get("documentData") is None, "documentData must be stripped before deep-merge" + + +@pytest.mark.asyncio +async def test_mergeContext_accumulates_image_documents_only_across_iterations(): + """imageDocumentsOnly from every iteration must be list-concat in merged.""" + img_a = {"documentName": "img_a.png", "mimeType": "image/png", "documentData": "aaa="} + img_b = {"documentName": "img_b.png", "mimeType": "image/png", "documentData": "bbb="} + items = [ + {"success": True, "data": {"response": "a"}, "imageDocumentsOnly": [img_a]}, + {"success": True, "data": {"response": "b"}, "imageDocumentsOnly": [img_b]}, + ] + result = await mergeContext(object(), {"dataSource": items}) + assert result.success + imgs = result.data["merged"].get("imageDocumentsOnly") or [] + names = [d.get("documentName") for d in imgs] + assert "img_a.png" in names + assert "img_b.png" in names diff --git a/tests/unit/workflow/test_phase3_context_node.py b/tests/unit/workflow/test_phase3_context_node.py index bd104c0c..3f055ca3 100644 --- a/tests/unit/workflow/test_phase3_context_node.py +++ b/tests/unit/workflow/test_phase3_context_node.py @@ -25,7 +25,7 @@ def test_context_extractContent_node_shape(): assert "DocumentList" in node["inputPorts"][0]["accepts"] assert "LoopItem" in node["inputPorts"][0]["accepts"] names = [p["name"] for p in node["parameters"]] - assert names == ["documentList"] + assert names == ["documentList", "contentFilter"] def test_udm_port_types_registered(): @@ -65,3 +65,9 @@ def test_getExecutor_dispatches_context(): from modules.workflows.automation2.executors import ActionNodeExecutor executor = _getExecutor("context.extractContent", None) assert isinstance(executor, ActionNodeExecutor) + + +def test_context_mergeContext_surfaces_data_pick_paths_match_node_outputs(): + """DataPicker uses paths like ``merged``; executor must surface ``data.*`` to top level.""" + node = next(n for n in STATIC_NODE_TYPES if n["id"] == "context.mergeContext") + assert node.get("surfaceDataAsTopLevel") is True diff --git a/tests/unit/workflow/test_phase4_workflow_nodes.py b/tests/unit/workflow/test_phase4_workflow_nodes.py index c24a485b..55272413 100644 --- a/tests/unit/workflow/test_phase4_workflow_nodes.py +++ b/tests/unit/workflow/test_phase4_workflow_nodes.py @@ -21,16 +21,19 @@ class TestNodeDefinitions: assert node["_action"] == "consolidate" assert node["outputPorts"][0]["schema"] == "ConsolidateResult" - def test_flow_loop_has_level_and_concurrency(self): + def test_flow_loop_has_iteration_mode_and_two_outputs(self): node = next(n for n in STATIC_NODE_TYPES if n["id"] == 
"flow.loop") paramNames = [p["name"] for p in node["parameters"]] - assert "level" in paramNames + assert "iterationMode" in paramNames + assert "iterationStride" in paramNames assert "concurrency" in paramNames - levelParam = next(p for p in node["parameters"] if p["name"] == "level") - assert "structuralNodes" in levelParam["frontendOptions"]["options"] - assert "contentBlocks" in levelParam["frontendOptions"]["options"] + assert "level" not in paramNames + modeParam = next(p for p in node["parameters"] if p["name"] == "iterationMode") + assert "every_nth" in modeParam["frontendOptions"]["options"] concParam = next(p for p in node["parameters"] if p["name"] == "concurrency") assert concParam["default"] == 1 + assert node["inputs"] == 1 + assert node["outputs"] == 2 def test_flow_loop_accepts_udm(self): node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.loop") @@ -146,13 +149,27 @@ class TestFlowLoopUdmLevel: ex = FlowExecutor() udm = {"id": "d1", "role": "document", "children": [{"id": "p1"}, {"id": "p2"}]} node = {"type": "flow.loop", "id": "loop1", - "parameters": {"items": "direct", "level": "auto"}} + "parameters": {"items": "direct"}} ctx = {"nodeOutputs": {"loop1": udm, "direct": udm}, "connectionMap": {}, "inputSources": {"loop1": {0: ("direct", 0)}}} from unittest.mock import patch with patch("modules.workflows.automation2.graphUtils.resolveParameterReferences", return_value=udm): result = await ex.execute(node, ctx) assert result["count"] == 2 + @pytest.mark.asyncio + async def test_loop_every_nth_stride(self): + from modules.workflows.automation2.executors.flowExecutor import FlowExecutor + ex = FlowExecutor() + node = {"type": "flow.loop", "id": "loop1", "parameters": { + "items": {"type": "value", "value": [10, 20, 30, 40, 50]}, + "iterationMode": "every_nth", + "iterationStride": 2, + }} + ctx = {"nodeOutputs": {}, "connectionMap": {}, "inputSources": {"loop1": {}}} + result = await ex.execute(node, ctx) + assert result["count"] == 3 + assert result["items"] == [10, 30, 50] + @pytest.mark.asyncio class TestDataFilterUdm: diff --git a/tests/unit/workflow/test_serialize_context_and_file_create_context.py b/tests/unit/workflow/test_serialize_context_and_file_create_context.py new file mode 100644 index 00000000..57ae3823 --- /dev/null +++ b/tests/unit/workflow/test_serialize_context_and_file_create_context.py @@ -0,0 +1,98 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. 
+ +import json + +from modules.workflows.methods.methodAi._common import serialize_context +from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import ( + enhancePlainTextWithMarkdownTables, + markdownToDocumentJson, +) +from modules.workflows.methods.methodFile.actions.create import ( + _collect_image_documents_only, + _context_string_for_report, +) + + +def test_serialize_context_nonserializable_embeds_via_default_str(): + class _Ns: + def __str__(self): + return "ns" + + s = serialize_context({"x": _Ns(), "n": 1}) + parsed = json.loads(s) + assert parsed["n"] == 1 + assert "ns" in parsed["x"] + + +def test_serialize_context_strips_bom_on_plain_string(): + assert serialize_context("\ufeffhello") == "hello" + + +def test_context_string_docx_prefers_response_over_full_dict(): + body = "Datum;Mandant\n2026-01-01;acme" + ctx = {"response": "\ufeff" + body, "data": {"foo": 1}} + assert _context_string_for_report(ctx, "docx") == body + + +def test_context_string_json_serializes_full_structure(): + ctx = {"response": "hi", "data": {"foo": 1}} + out = _context_string_for_report(ctx, "json") + assert json.loads(out)["data"]["foo"] == 1 + + +def test_serialize_context_prefers_response_when_json_fails(): + d: dict = {"response": "plain", "n": 1} + d["_loop"] = d # circular — json.dumps fails + assert serialize_context(d).strip() == "plain" + + +def test_serialize_context_prefer_handover_primary_skips_metadata(): + blob = {"response": "LINE", "data": {"nested": {"x" * 200}}, "extra": {"y": 2}} + s = serialize_context(blob, prefer_handover_primary=True) + assert s == "LINE" + + +def test_context_string_plain_str_passthrough_docx(): + assert _context_string_for_report(" hello ", "docx") == "hello" + + +def test_collect_image_documents_nested_paths(): + imgs = [{"documentName": "m.png", "mimeType": "image/png"}] + assert _collect_image_documents_only({"merged": {"imageDocumentsOnly": imgs}}) == imgs + assert _collect_image_documents_only({"data": {"merged": {"imageDocumentsOnly": imgs}}}) == imgs + + +def test_context_string_prefers_merged_response_over_inputs_noise(): + raw = {"merged": {"response": "from-merged"}, "inputs": {"0": {"documentData": "X" * 10000}}} + assert _context_string_for_report(raw, "docx") == "from-merged" + + +def test_context_string_fallback_json_strips_heavy_keys(): + raw = {"foo": 1, "inputs": {"nasty": True}, "imageDocumentsOnly": [{"documentName": "x"}]} + out = _context_string_for_report(raw, "docx") + parsed = json.loads(out) + assert "inputs" not in parsed + assert "imageDocumentsOnly" not in parsed + assert parsed["foo"] == 1 + + +def test_enhance_plain_csv_semicolon_to_markdown_table(): + body = "Datum;Betrag\n2026-01-01;12.50\n2026-01-02;3.00" + out = enhancePlainTextWithMarkdownTables(body) + assert "| Datum |" in out + assert "| Betrag |" in out + assert "---" in out + + +def test_enhance_preserves_normal_paragraphs(): + body = "Ein Absatz ohne Raster.\n\nZweiter Gedanke." 
+ assert enhancePlainTextWithMarkdownTables(body) == body + + +def test_enhance_then_markdown_json_contains_table_section(): + body = "Datum;Betrag\n2026-01-01;12\n2026-01-02;3" + enhanced = enhancePlainTextWithMarkdownTables(body) + doc = markdownToDocumentJson(enhanced, "Report", "de") + sections = doc["documents"][0]["sections"] + assert any(s.get("content_type") == "table" for s in sections) diff --git a/tests/unit/workflows/test_automation2_graphUtils.py b/tests/unit/workflows/test_automation2_graphUtils.py index 5ea7126a..d5c88acf 100644 --- a/tests/unit/workflows/test_automation2_graphUtils.py +++ b/tests/unit/workflows/test_automation2_graphUtils.py @@ -175,3 +175,37 @@ class TestPathContainsWildcard: def test_literal_star_in_int_segment_does_not_match(self): from modules.workflows.automation2.graphUtils import _pathContainsWildcard assert _pathContainsWildcard([1, 2, 3]) is False + + +class TestLoopBodyAndDoneReachability: + """flow.loop: body only from output 0; done branch from output 1 (engine helpers).""" + + def test_body_only_output_0_not_done_chain(self): + from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds + + conns = [ + {"source": "tr", "target": "loop", "targetInput": 0}, + {"source": "loop", "target": "a", "sourceOutput": 0, "targetInput": 0}, + {"source": "loop", "target": "d", "sourceOutput": 1, "targetInput": 0}, + {"source": "a", "target": "b"}, + ] + cm = buildConnectionMap(conns) + assert getLoopBodyNodeIds("loop", cm) == {"a", "b"} + assert getLoopDoneNodeIds("loop", cm) == {"d"} + + def test_primary_input_prefers_outside_body(self): + from modules.workflows.automation2.graphUtils import ( + buildConnectionMap, + getLoopBodyNodeIds, + getLoopPrimaryInputSource, + ) + + conns = [ + {"source": "tr", "target": "loop", "targetInput": 0}, + {"source": "a", "target": "loop", "targetInput": 0}, + {"source": "loop", "target": "a", "sourceOutput": 0, "targetInput": 0}, + ] + cm = buildConnectionMap(conns) + body = getLoopBodyNodeIds("loop", cm) + assert body == {"a"} + assert getLoopPrimaryInputSource("loop", cm, body) == ("tr", 0)