Continuous work on graphical editor

Ida 2026-05-13 06:18:42 +02:00
parent f6229b517e
commit 823afa78bb
23 changed files with 1691 additions and 408 deletions

View file

@@ -38,7 +38,6 @@
"title": "Pro Scan-Dokument",
"parameters": {
"items": {"type": "ref", "nodeId": "n2", "path": ["files"]},
"level": "auto",
"concurrency": 1
}
},

View file

@@ -4,6 +4,8 @@
from modules.shared.i18nRegistry import t
from modules.features.graphicalEditor.nodeDefinitions.flow import CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS
_CONTEXT_INPUT_SCHEMAS = [
"Transit",
"ActionResult",
@@ -19,38 +21,6 @@ _CONTEXT_INPUT_SCHEMAS = [
]
_MERGE_RESULT_DATA_PICK_OPTIONS = [
{
"path": ["merged"],
"pickerLabel": t("Zusammengeführt"),
"detail": t("Zusammengeführtes Objekt nach gewählter Strategie."),
"recommended": True,
"type": "Dict",
},
{
"path": ["first"],
"pickerLabel": t("Erster Zweig"),
"detail": t("Daten vom ersten verbundenen Eingang."),
"recommended": False,
"type": "Any",
},
{
"path": ["inputs"],
"pickerLabel": t("Alle Eingänge"),
"detail": t("Dict der Eingabeobjekte nach Port-Index."),
"recommended": False,
"type": "Dict[int,Any]",
},
{
"path": ["conflicts"],
"pickerLabel": t("Konflikte"),
"detail": t("Liste der Schlüssel mit Konflikt (nur bei errorOnConflict)."),
"recommended": False,
"type": "List[str]",
},
]
CONTEXT_NODES = [
{
"id": "context.extractContent",
@@ -66,6 +36,29 @@ CONTEXT_NODES = [
{"name": "documentList", "type": "str", "required": True, "frontendType": "hidden",
"description": t("Dokumentenliste (via Wire oder DataRef)"), "default": "",
"graphInherit": {"port": 0, "kind": "documentListWire"}},
{
"name": "contentFilter",
"type": "str",
"required": False,
"frontendType": "select",
"frontendOptions": {
"options": [
{"value": "all", "label": t("Alles (Text, Tabellen, Bilder)")},
{"value": "textOnly", "label": t("Nur Text und Tabellen")},
{"value": "imagesOnly", "label": t("Nur Bilder")},
{"value": "noImages", "label": t("Alles ausser Bilder")},
]
},
"default": "all",
"description": t(
"Welche Parts im Handover behalten werden. "
"all = alle Typgruppen inkl. Bilder; "
"textOnly = ausschliesslich Text-, Tabellen- und Struktur-Parts; "
"imagesOnly = ausschliesslich Bild-Parts; "
"noImages = alle Parts ausser Bildern (weiter als textOnly: "
"auch kuenftige Nicht-Bild-Typen bleiben erhalten)."
),
},
],
"inputs": 1,
"outputs": 1,
@@ -120,186 +113,40 @@ CONTEXT_NODES = [
"_method": "context",
"_action": "extractContent",
},
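The four ``contentFilter`` values reduce to a predicate over a part's ``typeGroup``. A minimal sketch, assuming parts carry a ``typeGroup`` string as in ``_apply_content_filter`` later in this commit:

```python
def kept(type_group: str, content_filter: str) -> bool:
    """True if a part with this typeGroup survives the given filter."""
    if content_filter == "textOnly":
        return type_group in ("text", "table", "structure")
    if content_filter == "imagesOnly":
        return type_group == "image"
    if content_filter == "noImages":
        return type_group != "image"
    return True  # "all"

# noImages is wider than textOnly: a hypothetical future "chart" typeGroup survives.
assert kept("chart", "noImages") and not kept("chart", "textOnly")
assert not kept("image", "noImages")
```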
{
"id": "context.setContext",
"category": "context",
"label": t("Kontext setzen"),
"description": t(
"Schreibt in den Workflow-Kontext. Pro Zeile: Ziel-Schlüssel, dann entweder einen "
"festen Wert, eine Datenquelle aus dem Graph (Kontext-Picker wie bei anderen Nodes), "
"oder eine Aufgabe für einen Benutzer (Human Task) zum Setzen des Werts."
),
"parameters": [
{
"name": "scope",
"type": "str",
"required": False,
"frontendType": "select",
"frontendOptions": {"options": ["local", "global", "session"]},
"default": "local",
"description": t("Speicherbereich"),
},
{
"name": "assignments",
"type": "list",
"required": True,
"frontendType": "contextAssignments",
"default": [],
"description": t(
"Zuweisungen: Ziel-Schlüssel, Quelle (Picker / fester Wert / Human Task), "
"Modus (set, setIfEmpty, append, increment). Optionaler Experten-Pfad `sourcePath` unter der "
"gewählten Datenquelle (z. B. payload.status)."
),
"graphInherit": {"port": 0, "kind": "primaryTextRef"},
},
],
"inputs": 1,
"outputs": 1,
"inputPorts": {0: {"accepts": _CONTEXT_INPUT_SCHEMAS}},
"outputPorts": {
0: {
"schema": "Transit",
"dynamic": True,
"deriveFrom": "assignments",
"deriveNameField": "contextKey",
}
},
"injectUpstreamPayload": True,
"injectRunContext": True,
"surfaceDataAsTopLevel": True,
"meta": {"icon": "mdi-database-edit-outline", "color": "#5C6BC0", "usesAi": False},
"_method": "context",
"_action": "setContext",
},
{
"id": "context.mergeContext",
"category": "context",
"label": t("Kontext zusammenführen"),
"description": t(
"Wartet auf alle verbundenen eingehenden Branches und führt deren "
"Kontext-Daten zu einem einheitlichen MergeResult zusammen. "
"Strategien: 'shallow' (oberste Ebene), 'deep' (rekursiv), "
"'firstWins' / 'lastWins' bei Konflikten, "
"'errorOnConflict' (bricht ab und listet Konflikte). "
"Der Node blockiert bis alle erwarteten Inputs eingetroffen sind."
"Führt eine Liste von Ergebnissen zu einem einzigen Kontext zusammen. "
"Wähle als Datenquelle die Option Alle Schleifen-Ergebnisse einer Schleife, "
"um alle Iterationsergebnisse in einem Datensatz zu vereinen."
),
"parameters": [
{
"name": "strategy",
"type": "str",
"required": False,
"frontendType": "select",
"frontendOptions": {
"options": ["shallow", "deep", "firstWins", "lastWins", "errorOnConflict"]
},
"default": "deep",
"description": t("Strategie bei gleichnamigen Keys aus verschiedenen Branches"),
},
{
"name": "waitFor",
"type": "int",
"required": False,
"frontendType": "number",
"default": 0,
"description": t(
"Anzahl Inputs abwarten (0 = alle verbundenen Branches). "
"Hilfreich für optionale Branches mit Timeout."
),
},
{
"name": "timeoutMs",
"type": "int",
"required": False,
"frontendType": "number",
"default": 30000,
"description": t(
"Maximale Wartezeit in ms — danach wird mit den vorhandenen Inputs fortgesetzt"
),
},
],
"inputs": 5,
"outputs": 1,
"inputPorts": {
0: {"accepts": _CONTEXT_INPUT_SCHEMAS},
1: {"accepts": _CONTEXT_INPUT_SCHEMAS},
2: {"accepts": _CONTEXT_INPUT_SCHEMAS},
3: {"accepts": _CONTEXT_INPUT_SCHEMAS},
4: {"accepts": _CONTEXT_INPUT_SCHEMAS},
},
"outputPorts": {
0: {"schema": "MergeResult", "dataPickOptions": _MERGE_RESULT_DATA_PICK_OPTIONS}
},
"waitsForAllPredecessors": True,
"injectBranchInputs": True,
"meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False},
"_method": "context",
"_action": "mergeContext",
},
{
"id": "context.filterContext",
"category": "context",
"label": t("Kontext filtern"),
"description": t(
"Gibt nur bestimmte Felder des eingehenden Datenstroms weiter. "
"Modus 'allow': nur diese Keys passieren. "
"Modus 'block': diese Keys werden entfernt, alles andere bleibt. "
"Unterstützt Pfadausdrücke (z.B. 'user.*', '*.id') und tiefe Pfade ('address.city'). "
"Fehlende Keys werden je nach 'missingKeyBehavior' ignoriert, mit null befüllt oder als Fehler behandelt."
),
"parameters": [
{
"name": "mode",
"type": "str",
"required": False,
"frontendType": "select",
"frontendOptions": {"options": ["allow", "block"]},
"default": "allow",
"description": t("Allowlist (nur diese durch) oder Blocklist (diese entfernen)"),
},
{
"name": "keys",
"type": "list",
"name": "dataSource",
"type": "Any",
"required": True,
"frontendType": "stringList",
"default": [],
"frontendType": "dataRef",
"description": t(
"Key-Pfade oder Wildcard-Muster. "
"Beispiele: 'response', 'user.*', '*.id', 'address.city'."
"Datenquelle: Liste von Einträgen zum Zusammenführen "
"(z. B. Schleife → Alle Schleifen-Ergebnisse)"
),
},
{
"name": "missingKeyBehavior",
"type": "str",
"required": False,
"frontendType": "select",
"frontendOptions": {"options": ["skip", "nullFill", "error"]},
"default": "skip",
"description": t("Verhalten wenn ein erlaubter Key im Input fehlt"),
},
{
"name": "preserveMeta",
"type": "bool",
"required": False,
"frontendType": "checkbox",
"default": True,
"description": t("Interne Meta-Felder (_success, _error, _transit) immer durchlassen"),
},
],
"inputs": 1,
"outputs": 1,
"inputPorts": {0: {"accepts": _CONTEXT_INPUT_SCHEMAS}},
"outputPorts": {
0: {
"schema": "Transit",
"dynamic": True,
"deriveFrom": "keys",
}
0: {"schema": "ActionResult", "dataPickOptions": CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS}
},
"injectUpstreamPayload": True,
# Same contract as transformContext: picker paths like ``merged`` / ``first`` must match
# ``nodeOutputs`` (see actionNodeExecutor ``surfaceDataAsTopLevel``); merge payloads live in ``data``.
"surfaceDataAsTopLevel": True,
"meta": {"icon": "mdi-filter-outline", "color": "#00838F", "usesAi": False},
"meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False},
"_method": "context",
"_action": "filterContext",
"_action": "mergeContext",
},
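A hypothetical node instance showing how the reworked ``dataSource`` parameter is wired; node ids are placeholders, and the ref shape follows the system templates elsewhere in this commit:

```python
merge_node = {
    "id": "n4",
    "type": "context.mergeContext",
    "title": "Ergebnisse zusammenführen",
    "parameters": {
        # all per-iteration loop outputs of loop node "n3"
        "dataSource": {"type": "ref", "nodeId": "n3", "path": ["bodyResults"]},
    },
}
```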
{
"id": "context.transformContext",

View file

@@ -3,6 +3,35 @@
from modules.shared.i18nRegistry import t
LOOP_DONE_DATA_PICK_OPTIONS = [
{
"path": ["bodyResults"],
"pickerLabel": t("Alle Schleifen-Ergebnisse"),
"detail": t(
"Ausgabe des letzten Schrittes im Schleifen-Rumpf pro Iteration als Liste, "
"ein Eintrag pro Durchlauf. Ideal als Eingabe fuer Kontext zusammenfuehren."
),
"recommended": True,
"type": "List[Any]",
},
{
"path": ["items"],
"pickerLabel": t("Iterierte Elemente"),
"detail": t(
"Liste der Schleifen-Elemente nach gewähltem Iterationsmodus (Kopie der Eingabeliste, gefiltert)."
),
"recommended": False,
"type": "List[Any]",
},
{
"path": ["count"],
"pickerLabel": t("Anzahl Durchläufe"),
"detail": t("Wie viele Iterationen die Schleife ausgeführt hat."),
"recommended": False,
"type": "int",
},
]
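For orientation, the Done-port payload these picker paths address could look like the following sketch (field values invented; the shape mirrors the loop output assembled in the execution engine later in this commit):

```python
done_payload = {
    "items": ["a.pdf", "b.pdf"],    # iterated elements after iterationMode filtering
    "count": 2,                     # number of iterations
    "bodyResults": [                # one entry per iteration (last body step's output)
        {"response": "Ergebnis A"},
        {"response": "Ergebnis B"},
    ],
}
assert len(done_payload["bodyResults"]) == done_payload["count"]
```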
LOOP_ITEM_DATA_PICK_OPTIONS = [
{
"path": ["currentItem"],
@@ -58,6 +87,19 @@ MERGE_RESULT_DATA_PICK_OPTIONS = [
},
]
# Extended picker for ``context.mergeContext`` (ActionResult + ``surfaceDataAsTopLevel``): same
# merge keys as ``flow.merge`` plus ``count`` from the action payload.
CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS = [
*MERGE_RESULT_DATA_PICK_OPTIONS,
{
"path": ["count"],
"pickerLabel": t("Anzahl Einträge"),
"detail": t("Wie viele Einträge zusammengeführt wurden."),
"recommended": False,
"type": "int",
},
]
# Ports that pass through typical step outputs (not just an empty Transit).
_FLOW_INPUT_SCHEMAS = [
"Transit",
@@ -138,8 +180,10 @@ FLOW_NODES = [
"category": "flow",
"label": t("Schleife / Für jedes"),
"description": t(
"Iteriert über ein Array aus einem vorherigen Schritt (z. B. documente, Zeilen, Listeneinträge). "
"Optional: UDM-Ebene für strukturierte Dokumente."
"Zwei Ausgänge: „Schleife“ verbindet den Rumpf (pro Element); optional führt der Rumpf "
"mit einem Rücklauf-Pfeil wieder zum **gleichen Eingang** wie der vorherige Schritt (wie in n8n). "
"„Fertig“ führt genau einmal fort, wenn alle Iterationen beendet sind. "
"Die zu durchlaufende Liste wählen Sie wie bisher; UDM-/Strukturdaten werden automatisch sinnvoll in Elemente aufgelöst."
),
"parameters": [
{
@@ -150,13 +194,27 @@ FLOW_NODES = [
"description": t("Liste oder Sammlung zum Durchlaufen (im Data Picker wählen)"),
},
{
"name": "level",
"name": "iterationMode",
"type": "str",
"required": False,
"frontendType": "select",
"frontendOptions": {"options": ["auto", "documents", "structuralNodes", "contentBlocks"]},
"description": t("Nur bei UDM-Daten: welche Strukturebene als Elemente verwendet wird"),
"default": "auto",
"frontendOptions": {
"options": ["all", "first", "last", "every_second", "every_third", "every_nth"],
},
"description": t(
"Welche Elemente die Schleife besucht: alle, nur das erste/letzte, jedes zweite/dritte "
"oder jedes n-te (Schritt dann unter „Schrittweite“)."
),
"default": "all",
},
{
"name": "iterationStride",
"type": "int",
"required": False,
"frontendType": "number",
"frontendOptions": {"min": 2, "max": 100},
"description": t("Nur bei „jedes n-te“: Schrittweite (z. B. 5 = jedes 5. Element ab Index 0)."),
"default": 2,
},
{
"name": "concurrency",
@@ -169,12 +227,18 @@
},
],
"inputs": 1,
"outputs": 1,
"inputPorts": {0: {"accepts": [
"outputs": 2,
"outputLabels": [t("Schleife"), t("Fertig")],
"inputPorts": {
0: {"accepts": [
"Transit", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList",
"ActionResult", "AiResult", "QueryResult", "FormPayload",
]}},
"outputPorts": {0: {"schema": "LoopItem", "dataPickOptions": LOOP_ITEM_DATA_PICK_OPTIONS}},
"ActionResult", "AiResult", "QueryResult", "FormPayload", "LoopItem",
]},
},
"outputPorts": {
0: {"schema": "LoopItem", "dataPickOptions": LOOP_ITEM_DATA_PICK_OPTIONS},
1: {"schema": "Transit", "dataPickOptions": LOOP_DONE_DATA_PICK_OPTIONS},
},
"executor": "flow",
"meta": {"icon": "mdi-repeat", "color": "#FF9800", "usesAi": False},
},
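A hypothetical ``flow.loop`` instance using the new parameters (ids are placeholders; the ref shape matches the system templates in this commit):

```python
loop_node = {
    "id": "n2",
    "type": "flow.loop",
    "title": "Pro Dokument",
    "parameters": {
        "items": {"type": "ref", "nodeId": "n1", "path": ["files"]},
        "iterationMode": "every_nth",
        "iterationStride": 5,   # visits elements 0, 5, 10, ...
        "concurrency": 1,
    },
}
```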

View file

@@ -26,7 +26,7 @@ from modules.workflows.automation2.runEnvelope import (
normalize_run_envelope,
)
from modules.features.graphicalEditor.entryPoints import find_invocation
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths
from modules.features.graphicalEditor.upstreamPathsService import compute_upstream_paths, compute_graph_data_sources
from modules.shared.i18nRegistry import apiRouteContext, resolveText
routeApiMsg = apiRouteContext("routeFeatureGraphicalEditor")
@@ -192,6 +192,34 @@ def post_upstream_paths(
return {"paths": paths}
@router.post("/{instanceId}/graph-data-sources")
@limiter.limit("120/minute")
def post_graph_data_sources(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
body: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext),
) -> dict:
"""Scope-aware data sources for the DataPicker.
Takes ``{ nodeId, graph: { nodes, connections } }`` and returns::
{
"availableSourceIds": [...], # ancestors minus loop-body nodes on Done branch
"portIndexOverrides": {nodeId: n}, # use outputPorts[n] instead of 0
"loopBodyContextIds": [...], # loops whose body the node is in
}
All loop scope logic lives here so the frontend has zero topology knowledge.
"""
_validateInstanceAccess(instanceId, context)
graph = body.get("graph")
node_id = body.get("nodeId")
if not isinstance(graph, dict) or not node_id:
raise HTTPException(status_code=400, detail=routeApiMsg("graph and nodeId are required"))
return compute_graph_data_sources(graph, str(node_id))
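A sketch of calling the new endpoint; the base URL, instance id, and omitted authentication are assumptions:

```python
import requests

BASE_URL = "http://localhost:8000"  # assumed API root

resp = requests.post(
    f"{BASE_URL}/inst-123/graph-data-sources",
    json={"nodeId": "n5", "graph": {"nodes": [], "connections": []}},
)
print(resp.json())
# An unknown nodeId yields the empty structure:
# {"availableSourceIds": [], "portIndexOverrides": {}, "loopBodyContextIds": []}
```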
@router.get("/{instanceId}/upstream-paths/{node_id}")
@limiter.limit("60/minute")
def get_upstream_paths_saved(
@@ -1724,6 +1752,51 @@ async def complete_task(
)
@router.post("/{instanceId}/tasks/{taskId}/cancel")
@limiter.limit("30/minute")
def cancel_pending_task_stop_run(
request: Request,
instanceId: str = Path(..., description="Feature instance ID"),
taskId: str = Path(..., description="Human task ID"),
context: RequestContext = Depends(getRequestContext),
) -> dict:
"""Cancel a pending human task and stop the workflow run behind it."""
mandateId = _validateInstanceAccess(instanceId, context)
iface = getGraphicalEditorInterface(context.user, mandateId, instanceId)
task = iface.getTask(taskId)
if not task:
raise HTTPException(status_code=404, detail=routeApiMsg("Task not found"))
wf_ids = {w.get("id") for w in iface.getWorkflows() if w.get("id")}
if task.get("workflowId") not in wf_ids:
raise HTTPException(status_code=404, detail=routeApiMsg("Task not found"))
if task.get("status") != "pending":
raise HTTPException(status_code=400, detail=routeApiMsg("Task already completed"))
run_id = task.get("runId")
from modules.workflows.automation2.executionEngine import requestRunStop
if run_id:
requestRunStop(run_id)
db_run = iface.getRun(run_id)
if db_run:
current = db_run.get("status") or ""
if current not in ("completed", "failed", "cancelled"):
iface.updateRun(run_id, status="cancelled")
pending = iface.getTasks(runId=run_id, status="pending")
for t in pending:
tid = t.get("id")
if tid:
iface.updateTask(tid, status="cancelled")
else:
iface.updateTask(taskId, status="cancelled")
return {"success": True, "runId": run_id, "taskId": taskId}
# -------------------------------------------------------------------------
# Monitoring / Metrics
# -------------------------------------------------------------------------

View file

@@ -6,7 +6,7 @@ from typing import Any, Dict, List, Set
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG, PortSchema, parse_graph_defined_output_schema
from modules.workflows.automation2.graphUtils import buildConnectionMap
from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds
_NODE_BY_TYPE = {n["id"]: n for n in STATIC_NODE_TYPES}
@@ -129,10 +129,13 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D
entry["producerLabel"] = (anode.get("title") or "").strip() or aid
paths.append(entry)
# Lexical loop hints (flow.loop): any loop node in ancestors adds synthetic paths
# Lexical loop hints (flow.loop): only for nodes inside the loop body
for aid in ancestors:
anode = node_by_id.get(aid) or {}
if anode.get("type") == "flow.loop":
if anode.get("type") != "flow.loop":
continue
body_ids = getLoopBodyNodeIds(aid, conn_map)
if target_node_id in body_ids:
paths.extend(
[
{
@@ -160,3 +163,80 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D
)
return paths
def compute_graph_data_sources(graph: Dict[str, Any], target_node_id: str) -> Dict[str, Any]:
"""Return scope-aware data sources for the DataPicker.
Determines which ancestor nodes are valid sources for ``target_node_id``,
taking loop scoping into account:
- If ``target_node_id`` is on the *Done* branch of a ``flow.loop``, the
loop body nodes are excluded from ``availableSourceIds`` and the loop
node itself is mapped to its *Fertig* output port (index 1) via
``portIndexOverrides``.
- If ``target_node_id`` is *inside* the loop body, the loop node id is
included in ``loopBodyContextIds`` so the frontend can show the lexical
loop variables (currentItem, currentIndex, count).
Returns::
{
"availableSourceIds": [...], # ordered list
"portIndexOverrides": {nodeId: n}, # non-zero port indices
"loopBodyContextIds": [...], # loops whose body this node is in
}
"""
nodes = graph.get("nodes") or []
connections = graph.get("connections") or []
node_by_id: Dict[str, Any] = {n["id"]: n for n in nodes if n.get("id")}
if target_node_id not in node_by_id:
return {"availableSourceIds": [], "portIndexOverrides": {}, "loopBodyContextIds": []}
conn_map = buildConnectionMap(connections)
# Collect all ancestors via backward BFS
preds: Dict[str, Set[str]] = {}
for tgt, pairs in conn_map.items():
for src, _, _ in pairs:
preds.setdefault(tgt, set()).add(src)
seen: Set[str] = set()
stack = [target_node_id]
ancestors: Set[str] = set()
while stack:
cur = stack.pop()
for p in preds.get(cur, ()):
if p not in seen:
seen.add(p)
ancestors.add(p)
stack.append(p)
body_nodes_to_exclude: Set[str] = set()
port_index_overrides: Dict[str, int] = {}
loop_body_context_ids: List[str] = []
for aid in ancestors:
anode = node_by_id.get(aid) or {}
if anode.get("type") != "flow.loop":
continue
body_ids = getLoopBodyNodeIds(aid, conn_map)
done_ids = getLoopDoneNodeIds(aid, conn_map)
if target_node_id in body_ids:
loop_body_context_ids.append(aid)
elif target_node_id in done_ids:
body_nodes_to_exclude.update(body_ids)
port_index_overrides[aid] = 1
available_source_ids = [
aid for aid in sorted(ancestors)
if aid not in body_nodes_to_exclude
]
return {
"availableSourceIds": available_source_ids,
"portIndexOverrides": port_index_overrides,
"loopBodyContextIds": loop_body_context_ids,
}
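A worked example, assuming raw connections use ``source`` / ``target`` / ``sourceOutput`` / ``targetInput`` keys (the shape ``buildConnectionMap`` appears to consume) and placeholder node types everywhere except ``flow.loop``:

```python
from modules.features.graphicalEditor.upstreamPathsService import compute_graph_data_sources

graph = {
    "nodes": [
        {"id": "start", "type": "input.manual"},
        {"id": "loop", "type": "flow.loop"},
        {"id": "body", "type": "ai.prompt"},
        {"id": "after", "type": "context.mergeContext"},
    ],
    "connections": [
        {"source": "start", "target": "loop", "sourceOutput": 0, "targetInput": 0},
        {"source": "loop", "target": "body", "sourceOutput": 0, "targetInput": 0},   # loop body
        {"source": "loop", "target": "after", "sourceOutput": 1, "targetInput": 0},  # Done branch
    ],
}
print(compute_graph_data_sources(graph, "after"))
# "body" is filtered out and the loop is read from its Done port:
# {"availableSourceIds": ["loop", "start"], "portIndexOverrides": {"loop": 1}, "loopBodyContextIds": []}
```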

View file

@@ -308,7 +308,6 @@ def _buildSystemTemplates():
"title": "Pro E-Mail",
"parameters": {
"items": {"type": "ref", "nodeId": "n2", "path": ["emails"]},
"level": "auto",
"concurrency": 1,
},
},
@@ -348,7 +347,6 @@ def _buildSystemTemplates():
"title": "Pro Dokument",
"parameters": {
"items": {"type": "ref", "nodeId": "n2", "path": ["files"]},
"level": "auto",
"concurrency": 1,
},
},

View file

@@ -4,10 +4,76 @@ import json
import logging
import os
import re
from typing import Any, Dict
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
_MAX_AUTO_TABLE_COLS = 64
_MAX_AUTO_TABLE_ROWS = 5000
_MAX_AUTO_CELL_CHARS = 8000
def _sanitize_cell_for_pipe_table(cell: str) -> str:
"""Single-line cell safe for markdown pipe tables (no raw ``|``)."""
s = str(cell).replace("\r\n", "\n").replace("\r", "\n")
s = " ".join(line.strip() for line in s.split("\n") if line.strip()).strip()
return s.replace("|", "·")
def _try_delimited_block_as_markdown_table(block: str) -> Optional[str]:
"""If ``block`` is a uniform tab- or semicolon-separated grid, return a pipe markdown table."""
lines = [ln.strip() for ln in block.replace("\r\n", "\n").replace("\r", "\n").split("\n")]
lines = [ln for ln in lines if ln]
if len(lines) < 2:
return None
for sep in ("\t", ";"):
rows: List[List[str]] = []
bad = False
for ln in lines:
cells = [c.strip() for c in ln.split(sep)]
if len(cells) < 2:
bad = True
break
rows.append(cells)
if bad:
continue
ncols = len(rows[0])
if ncols > _MAX_AUTO_TABLE_COLS or len(rows) > _MAX_AUTO_TABLE_ROWS:
continue
if any(len(r) != ncols for r in rows):
continue
if any(len(_sanitize_cell_for_pipe_table(c)) > _MAX_AUTO_CELL_CHARS for r in rows for c in r):
continue
def _row_md(r: List[str]) -> str:
return "| " + " | ".join(_sanitize_cell_for_pipe_table(c) for c in r) + " |"
header = _row_md(rows[0])
divider = "| " + " | ".join(["---"] * ncols) + " |"
body = "\n".join(_row_md(r) for r in rows[1:])
return "\n".join([header, divider, body])
return None
def enhancePlainTextWithMarkdownTables(body: str) -> str:
"""Detect delimiter-separated grids in plain paragraphs and convert them to markdown pipe tables.
Extractors often emit CSV-like blocks (``;`` or TAB) without markdown markers; passing those
straight into ``markdownToDocumentJson`` produced one giant paragraph. This pass runs per
blank-line-separated block, so normal prose stays unchanged.
"""
if not isinstance(body, str) or not body.strip():
return body if isinstance(body, str) else ""
chunks = re.split(r"\n\s*\n", body.strip())
out_parts: List[str] = []
for ch in chunks:
ch = ch.strip()
if not ch:
continue
md_table = _try_delimited_block_as_markdown_table(ch)
out_parts.append(md_table if md_table else ch)
return "\n\n".join(out_parts)
def _parseInlineRuns(text: str) -> list:
"""

View file

@@ -15,6 +15,8 @@ from modules.workflows.automation2.graphUtils import (
topoSort,
getInputSources,
getLoopBodyNodeIds,
getLoopDoneNodeIds,
getLoopPrimaryInputSource,
)
from modules.workflows.automation2.executors import (
@@ -26,7 +28,7 @@ from modules.workflows.automation2.executors import (
PauseForHumanTaskError,
PauseForEmailWaitError,
)
from modules.features.graphicalEditor.portTypes import normalizeToSchema
from modules.features.graphicalEditor.portTypes import normalizeToSchema, wrapTransit, unwrapTransit
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException as _SubscriptionInactiveException
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError as _BillingContextError
@@ -341,6 +343,98 @@ def _substituteFeatureInstancePlaceholders(
return _json.loads(replaced)
async def _run_post_loop_done_nodes(
*,
loop_node_id: str,
body_ids: Set[str],
items: List[Any],
ordered: List[Dict],
connectionMap: Dict[str, List],
nodeOutputs: Dict[str, Any],
context: Dict[str, Any],
services: Any,
automation2_interface: Optional[Any],
runId: Optional[str],
processed_in_loop: Set[str],
) -> Optional[Dict[str, Any]]:
"""After all loop iterations: merge upstream into loop output and run the Done (output 1) branch once."""
_prim_in = getLoopPrimaryInputSource(loop_node_id, connectionMap, body_ids)
_upstream_loop = nodeOutputs.get(_prim_in[0]) if _prim_in else None
_base_raw = unwrapTransit(_upstream_loop) if isinstance(_upstream_loop, dict) and _upstream_loop.get("_transit") else _upstream_loop
_prev_loop_out = nodeOutputs.get(loop_node_id)
# ``bodyResults`` lives on the plain iteration-state dict; after resume / edge
# cases the loop slot may still be wrapped in Transit — unwrap before read.
_prev_plain = _prev_loop_out
if isinstance(_prev_loop_out, dict) and _prev_loop_out.get("_transit"):
_prev_plain = unwrapTransit(_prev_loop_out)
_body_results = (
_prev_plain.get("bodyResults") if isinstance(_prev_plain, dict) else None
)
if not isinstance(_base_raw, dict):
raise RuntimeError(
f"flow.loop {loop_node_id}: primary upstream output must be a dict (JSON handover / node output); "
f"got {type(_base_raw).__name__}"
)
_merged_loop = {**_base_raw, "items": items, "count": len(items)}
if _body_results is not None:
_merged_loop["bodyResults"] = _body_results
nodeOutputs[loop_node_id] = wrapTransit(_merged_loop, {"loopCompleted": True, "loopNodeId": loop_node_id})
_done_all = getLoopDoneNodeIds(loop_node_id, connectionMap)
_done_only = _done_all - body_ids
_done_ordered = [n for n in ordered if n.get("id") in _done_only]
for _dn in _done_ordered:
_dnid = _dn.get("id")
if not _dnid or context.get("_stopped"):
break
if not _is_node_on_active_path(_dnid, connectionMap, nodeOutputs):
_skipSnap = {"_skipReason": "inactive_branch"}
for _sSrc, _, _ in connectionMap.get(_dnid, []):
if _sSrc in nodeOutputs:
_skipSnap[_sSrc] = nodeOutputs[_sSrc]
_skId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), status="skipped", inputSnapshot=_skipSnap)
if _skId:
_updateStepLog(automation2_interface, _skId, "skipped")
continue
_dexec = _getExecutor(_dn.get("type", ""), services, automation2_interface)
if not _dexec:
nodeOutputs[_dnid] = None
continue
_dStart = time.time()
_dIn = {}
for _src, _, _ in connectionMap.get(_dnid, []):
if _src in nodeOutputs:
_dIn[_src] = nodeOutputs[_src]
_dStepId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), "running", _dIn)
try:
_dres, _dRetry = await _executeWithRetry(_dexec, _dn, context)
_dres = _normalizeResult(_dres, _dn.get("type", ""))
nodeOutputs[_dnid] = _dres
_dDur = int((time.time() - _dStart) * 1000)
_dTok = _dres.get("tokensUsed", 0) if isinstance(_dres, dict) else 0
_updateStepLog(automation2_interface, _dStepId, "completed",
output=_dres if isinstance(_dres, dict) else {"value": _dres},
durationMs=_dDur, tokensUsed=_dTok, retryCount=_dRetry)
except PauseForHumanTaskError:
_updateStepLog(automation2_interface, _dStepId, "completed",
durationMs=int((time.time() - _dStart) * 1000))
raise
except PauseForEmailWaitError:
_updateStepLog(automation2_interface, _dStepId, "completed",
durationMs=int((time.time() - _dStart) * 1000))
raise
except (_SubscriptionInactiveException, _BillingContextError):
_updateStepLog(automation2_interface, _dStepId, "failed",
error="Subscription/Billing error", durationMs=int((time.time() - _dStart) * 1000))
raise
except Exception as _dex:
_updateStepLog(automation2_interface, _dStepId, "failed",
error=str(_dex), durationMs=int((time.time() - _dStart) * 1000))
raise
processed_in_loop.update(_done_only)
return None
async def executeGraph(
graph: Dict[str, Any],
services: Any,
@@ -510,6 +604,14 @@ async def executeGraph(
body_ids = getLoopBodyNodeIds(loop_node_id, connectionMap) if loop_node_id else set()
body_ordered = [n for n in ordered if n.get("id") in body_ids]
processed_in_loop = set(body_ids) | {loop_node_id} if loop_node_id else set()
_resume_feedback_body_node_id = None
for _fb_src, _fb_so, _fb_ti in (connectionMap.get(loop_node_id) or []):
if _fb_src in body_ids and _fb_ti == 0:
_resume_feedback_body_node_id = _fb_src
break
if not _resume_feedback_body_node_id and body_ordered:
_resume_feedback_body_node_id = body_ordered[-1].get("id")
_resume_body_results: List[Any] = []
while next_index < len(items) and loop_node_id:
nodeOutputs[loop_node_id] = {
"items": items,
@ -547,6 +649,8 @@ async def executeGraph(
output=result if isinstance(result, dict) else {"value": result},
durationMs=_rDur, retryCount=_rRetry)
logger.info("executeGraph loop resume body node %s done (iter %d, retries=%d)", bnid, next_index, _rRetry)
if _resume_feedback_body_node_id and bnid == _resume_feedback_body_node_id:
_resume_body_results.append(result)
except PauseForHumanTaskError as e:
_updateStepLog(automation2_interface, _rStepId, "completed",
durationMs=int((time.time() - _rStepStart) * 1000))
@@ -575,11 +679,27 @@
return {"success": False, "error": str(ex), "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": bnid, "runId": runId}
next_index += 1
if loop_node_id:
nodeOutputs[loop_node_id] = {"items": items, "count": len(items)}
for aggId, accItems in _aggregateAccumulators.items():
nodeOutputs[aggId] = {"items": accItems, "count": len(accItems), "_success": True}
_aggregateAccumulators.clear()
processed_in_loop = set(body_ids) | {loop_node_id}
if _resume_body_results:
_rlo = nodeOutputs.get(loop_node_id)
if isinstance(_rlo, dict):
_rlo["bodyResults"] = _resume_body_results
nodeOutputs[loop_node_id] = _rlo
await _run_post_loop_done_nodes(
loop_node_id=loop_node_id,
body_ids=body_ids,
items=items,
ordered=ordered,
connectionMap=connectionMap,
nodeOutputs=nodeOutputs,
context=context,
services=services,
automation2_interface=automation2_interface,
runId=runId,
processed_in_loop=processed_in_loop,
)
for i, node in enumerate(ordered):
if skip_until_passed:
@@ -593,7 +713,20 @@
break
nodeId = node.get("id")
nodeType = node.get("type", "")
if not _is_node_on_active_path(nodeId, connectionMap, nodeOutputs):
# flow.loop: the feedback edge (body → loop input 0) hasn't run yet on the first
# pass → would make _is_node_on_active_path return False. Only check the
# *primary* predecessor (the one outside the loop body).
if nodeType == "flow.loop":
_loop_body_ids = getLoopBodyNodeIds(nodeId, connectionMap)
_loop_primary = getLoopPrimaryInputSource(nodeId, connectionMap, _loop_body_ids)
_loop_check_map = (
{nodeId: [(_loop_primary[0], _loop_primary[1], 0)]}
if _loop_primary else connectionMap
)
_loop_active = _is_node_on_active_path(nodeId, _loop_check_map, nodeOutputs)
else:
_loop_active = _is_node_on_active_path(nodeId, connectionMap, nodeOutputs)
if not _loop_active:
logger.info("executeGraph step %d/%d: nodeId=%s SKIP (inactive branch)", i + 1, len(ordered), nodeId)
_skipInputSnap = {"_skipReason": "inactive_branch"}
for _sSrc, _, _ in connectionMap.get(nodeId, []):
@@ -635,6 +768,17 @@
_loopConcurrency = max(1, min(_loopConcurrency, 20))
_batchMode = len(items) > STEPLOG_BATCH_THRESHOLD
_aggLock = asyncio.Lock()
# Prefer the *last* body node wired to loop input 0 (feedback /
# pipeline end) — first matching inbound edge can be a shallow node.
_feedback_candidates = [
_fb_src
for _fb_src, _fb_so, _fb_ti in (connectionMap.get(nodeId) or [])
if _fb_src in body_ids and _fb_ti == 0
]
_feedback_body_node_id = _feedback_candidates[-1] if _feedback_candidates else None
if not _feedback_body_node_id and body_ordered:
_feedback_body_node_id = body_ordered[-1].get("id")
_bodyResultsPerIter: List[Any] = [None] * len(items)
async def _runLoopIteration(_idx: int, _item: Any) -> Optional[Dict]:
"""Execute all body nodes for one iteration. Returns error dict or None."""
@@ -712,6 +856,10 @@
logger.exception("executeGraph loop body node %s FAILED (iter %d): %s", bnid, _idx, ex)
return {"_error": str(ex), "failedNode": bnid}
if _feedback_body_node_id:
async with _aggLock:
if _idx < len(_bodyResultsPerIter):
_bodyResultsPerIter[_idx] = _activeOutputs.get(_feedback_body_node_id)
if _batchMode and _idx > 0 and _idx % STEPLOG_BATCH_THRESHOLD == 0 and runId:
_emitStepEvent(runId, {"type": "loop_progress", "nodeId": nodeId, "iteration": _idx, "total": len(items)})
return None
@@ -755,7 +903,6 @@
_activeRunContexts.pop(runId, None)
return {"success": False, "error": _rval["_error"], "nodeOutputs": _serializableOutputs(nodeOutputs), "failedNode": _rval.get("failedNode"), "runId": runId}
nodeOutputs[nodeId] = {"items": items, "count": len(items)}
for aggId, accItems in _aggregateAccumulators.items():
allChunks = _aggregateTempChunks.pop(aggId, [])
finalItems = []
@@ -764,6 +911,29 @@
finalItems.extend(accItems)
nodeOutputs[aggId] = {"items": finalItems, "count": len(finalItems), "_success": True}
_aggregateAccumulators.clear()
# Always attach ``bodyResults`` (list per iteration, possibly None
# placeholders) so DataRefs to ``bodyResults`` resolve and
# ``context.mergeContext`` can fall back to the wired loop output.
_lo = nodeOutputs.get(nodeId)
if isinstance(_lo, dict):
_lo["bodyResults"] = _bodyResultsPerIter
nodeOutputs[nodeId] = _lo
await _run_post_loop_done_nodes(
loop_node_id=nodeId,
body_ids=body_ids,
items=items,
ordered=ordered,
connectionMap=connectionMap,
nodeOutputs=nodeOutputs,
context=context,
services=services,
automation2_interface=automation2_interface,
runId=runId,
processed_in_loop=processed_in_loop,
)
_updateStepLog(automation2_interface, _stepId, "completed",
output={"iterationCount": len(items), "items": len(items), "concurrency": _loopConcurrency, "batchMode": _batchMode},
durationMs=int((time.time() - _stepStartMs) * 1000))

View file

@@ -24,6 +24,74 @@ from modules.workflows.automation2.executors.inputExecutor import PauseForHumanT
logger = logging.getLogger(__name__)
_FILE_CREATE_CTX_LOG_MAX = 500
def _truncate_for_log(val: Any, max_len: int = _FILE_CREATE_CTX_LOG_MAX) -> str:
s = val if isinstance(val, str) else repr(val)
s = s.replace("\r", "\\r").replace("\n", "\\n")
if len(s) <= max_len:
return s
return s[:max_len] + f"...<{len(s)} chars>"
def _log_file_create_context_resolution(
node_id: str,
raw_params: Dict[str, Any],
resolved_params: Dict[str, Any],
exec_context: Dict[str, Any],
) -> None:
"""Debug ``file.create`` when ``context`` resolves empty — trace refs and upstream output."""
raw_c = raw_params.get("context")
res_c = resolved_params.get("context")
node_outputs = exec_context.get("nodeOutputs") or {}
input_sources = (exec_context.get("inputSources") or {}).get(node_id) or {}
src_entry = input_sources.get(0)
src_id = src_entry[0] if src_entry else None
upstream = node_outputs.get(src_id) if src_id else None
up_summary = "missing"
up_resp_len = -1
up_transit = False
if isinstance(upstream, dict):
up_transit = bool(upstream.get("_transit"))
inner = upstream.get("data") if up_transit else upstream
up_keys = sorted(k for k in upstream.keys() if not str(k).startswith("_") or k in ("_transit", "_success"))
up_resp_len = len(str((inner if isinstance(inner, dict) else upstream).get("response") or ""))
up_summary = "keys=%s transit=%s response_len=%s _success=%s" % (
up_keys[:25],
up_transit,
up_resp_len,
upstream.get("_success"),
)
def _shape(name: str, v: Any) -> str:
if v is None:
return f"{name}=None"
if isinstance(v, dict) and v.get("type") == "ref":
return f"{name}=ref(nodeId={v.get('nodeId')!r}, path={v.get('path')!r})"
if isinstance(v, list):
if v and all(isinstance(x, dict) and x.get("type") == "ref" for x in v):
bits = [
f"ref({x.get('nodeId')!r},{x.get('path')!r})"
for x in v[:5]
]
return f"{name}=contextBuilder[{len(v)} refs: {', '.join(bits)}{'' if len(v) > 5 else ''}]"
return f"{name}=list(len={len(v)}, elem0_type={type(v[0]).__name__})"
if isinstance(v, str):
return f"{name}=str(len={len(v)}, preview={_truncate_for_log(v, 240)!r})"
return f"{name}={type(v).__name__}({_truncate_for_log(v)!r})"
logger.info(
"file.create context resolution node=%s port0=%r upstream_node=%s upstream: %s | %s | %s",
node_id,
src_id,
src_id,
up_summary,
_shape("raw", raw_c),
_shape("resolved", res_c),
)
def _looks_like_ascii_base64_payload(s: str) -> bool:
"""Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars (#, *, -, …)."""
@@ -336,14 +404,36 @@ def _getOutputSchemaName(nodeDef: Dict) -> str:
def _resolveUpstreamPayload(nodeId: str, context: Dict[str, Any]) -> Any:
"""Return the unwrapped output of the node connected to input port 0, or None."""
"""Return the unwrapped output of the primary inbound wire to ``nodeId``.
Prefer logical input port 0. Some persisted graphs register the only edge
under a non-zero ``targetInput``; fall back to the sole inbound port or
the first ``connectionMap`` entry so ``injectUpstreamPayload`` (e.g.
``context.mergeContext`` after ``flow.loop``) still receives data.
"""
from modules.features.graphicalEditor.portTypes import unwrapTransit
nodeOutputs = context.get("nodeOutputs") or {}
connectionMap = context.get("connectionMap") or {}
src_map = (context.get("inputSources") or {}).get(nodeId) or {}
entry = src_map.get(0)
if not entry and src_map:
if len(src_map) == 1:
entry = next(iter(src_map.values()))
else:
mi = min(src_map.keys())
entry = src_map.get(mi)
if not entry and connectionMap.get(nodeId):
inc = connectionMap[nodeId]
if inc:
src_node_id, _so, _ti = inc[0]
entry = (src_node_id, _so)
if not entry:
return None
src_node_id, _ = entry
upstream = (context.get("nodeOutputs") or {}).get(src_node_id)
upstream = nodeOutputs.get(src_node_id)
return unwrapTransit(upstream) if isinstance(upstream, dict) else upstream
@@ -446,6 +536,9 @@ class ActionNodeExecutor:
# 4. Apply declarative paramMappers from the node definition
_applyParamMappers(nodeDef, resolvedParams)
if nodeType == "file.create":
_log_file_create_context_resolution(nodeId, params, resolvedParams, context)
# 5. email.checkEmail pause for email wait
if nodeType == "email.checkEmail":
runId = context.get("_runId")
@@ -533,18 +626,6 @@
rawData = getattr(d, "documentData", None) if hasattr(d, "documentData") else (dumped.get("documentData") if isinstance(dumped, dict) else None)
rawBytes = _coerce_document_data_to_bytes(rawData)
# Extracted page images are workflow intermediates — keep bytes as base64 on the
# ActionDocument only; do not create rows in the user's file library (Meine Dateien).
if isinstance(dumped, dict) and rawBytes:
_meta = dumped.get("validationMetadata") if isinstance(dumped.get("validationMetadata"), dict) else {}
if (
_meta.get("actionType") == "context.extractContent"
and _meta.get("handoverRole") == "extractedMedia"
):
dumped["documentData"] = base64.b64encode(rawBytes).decode("ascii")
dumped["_hasBinaryData"] = True
docsList.append(dumped)
continue
if isinstance(dumped, dict) and rawBytes:
try:
from modules.interfaces.interfaceDbManagement import getInterface as _getMgmtInterface
@@ -597,18 +678,10 @@
extractedContext = ""
rd_early = getattr(result, "data", None)
if isinstance(rd_early, dict) and rd_early.get("response") is not None:
extractedContext = str(rd_early.get("response")).strip()
elif result.documents:
doc = result.documents[0]
raw = getattr(doc, "documentData", None) if hasattr(doc, "documentData") else (doc.get("documentData") if isinstance(doc, dict) else None)
if isinstance(raw, bytes):
try:
extractedContext = raw.decode("utf-8").strip()
except (UnicodeDecodeError, ValueError):
extractedContext = ""
elif raw:
extractedContext = str(raw).strip()
if isinstance(rd_early, dict):
_r = rd_early.get("response")
if _r is not None and str(_r).strip():
extractedContext = str(_r).strip()
promptText = str(resolvedParams.get("aiPrompt") or resolvedParams.get("prompt") or "").strip()
resultData = getattr(result, "data", None)
@@ -657,7 +730,19 @@
if not rsp:
out["response"] = extractedContext or ""
if result.success:
out["imageDocumentsOnly"] = _image_documents_from_docs_list(docsList)
img_only = _image_documents_from_docs_list(docsList)
# mergeContext packs iterated payloads under ``data.merged`` only — ``documents``
# on the ActionResult is empty, so image sidecars live on ``merged.imageDocumentsOnly``.
if (
nodeType == "context.mergeContext"
and isinstance(result.data, dict)
):
merged_blob = result.data.get("merged")
if isinstance(merged_blob, dict):
merged_imgs = merged_blob.get("imageDocumentsOnly")
if isinstance(merged_imgs, list) and merged_imgs:
img_only = merged_imgs
out["imageDocumentsOnly"] = img_only
if outputSchema == "TaskResult" and result.success and docsList:
try:

View file

@@ -2,7 +2,7 @@
# Flow control node executor (ifElse, switch, loop, merge).
import logging
from typing import Any, Dict
from typing import Any, Dict, List
from modules.features.graphicalEditor.portTypes import wrapTransit, unwrapTransit
@@ -279,26 +279,50 @@ class FlowExecutor:
async def _loop(self, node: Dict, nodeOutputs: Dict, nodeId: str, inputSources: Dict) -> Any:
params = node.get("parameters") or {}
itemsPath = params.get("items", "[]")
level = params.get("level", "auto")
from modules.workflows.automation2.graphUtils import resolveParameterReferences
items = resolveParameterReferences(itemsPath, nodeOutputs)
if level != "auto" and isinstance(items, dict):
items = self._resolveUdmLevel(items, level)
elif isinstance(items, list):
pass
elif isinstance(items, dict):
children = items.get("children")
if isinstance(children, list) and children:
items = children
else:
items = [{"name": k, "value": v} for k, v in items.items()]
else:
items = [items] if items is not None else []
raw = resolveParameterReferences(itemsPath, nodeOutputs)
items = self._normalize_loop_items(raw)
mode = (params.get("iterationMode") or "all").strip().lower()
stride = params.get("iterationStride", 2)
try:
stride_int = int(stride)
except (TypeError, ValueError):
stride_int = 2
items = self._apply_iteration_mode(items, mode, stride_int)
return {"items": items, "count": len(items)}
def _normalize_loop_items(self, raw: Any) -> List[Any]:
"""Coerce resolved `items` into a list (lists, dict children, or scalars)."""
if isinstance(raw, list):
return raw
if isinstance(raw, dict):
children = raw.get("children")
if isinstance(children, list) and len(children) > 0:
return children
return [{"name": k, "value": v} for k, v in raw.items()]
return [raw] if raw is not None else []
def _apply_iteration_mode(self, items: List[Any], mode: str, stride: int) -> List[Any]:
"""Select which elements to iterate over (backend-defined modes)."""
if not items:
return []
m = (mode or "all").strip().lower()
if m == "first":
return items[:1]
if m == "last":
return items[-1:]
if m == "every_second":
return items[::2]
if m == "every_third":
return items[::3]
if m == "every_nth":
step = max(2, min(100, int(stride)))
return items[::step]
return list(items)
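The modes map directly onto list slicing; a quick check of the behaviour ``_apply_iteration_mode`` implements above:

```python
items = list(range(10))
assert items[:1] == [0]                  # "first"
assert items[-1:] == [9]                 # "last"
assert items[::2] == [0, 2, 4, 6, 8]     # "every_second"
assert items[::3] == [0, 3, 6, 9]        # "every_third"
assert items[::5] == [0, 5]              # "every_nth" with stride 5 (clamped to 2..100)
```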
def _resolveUdmLevel(self, udm: Dict, level: str) -> list:
"""Extract items from a UDM document/node at the requested structural level."""
"""Extract items from a UDM document/node at the requested structural level (test / tooling)."""
children = udm.get("children") or []
if level == "documents":
return [c for c in children if isinstance(c, dict) and c.get("role") in ("document", "archive")]

View file

@@ -48,26 +48,93 @@ def buildConnectionMap(connections: List[Dict]) -> Dict[str, List[Tuple[str, int
def getLoopBodyNodeIds(loopNodeId: str, connectionMap: Dict[str, List[Tuple[str, int, int]]]) -> Set[str]:
"""Nodes reachable from loop's output (BFS forward). Body = downstream nodes that receive from loop."""
"""Nodes reachable from flow.loop output port 0 only (loop body), BFS forward.
Edges from the body back into the loop node (re-entering the same input as the main flow)
end the expansion at the loop node; the loop node itself is never part of the body.
"""
from collections import deque
body = set()
# connectionMap: target -> [(source, sourceOutput, targetInput)]
rev: Dict[str, List[str]] = {} # source -> [targets]
body: Set[str] = set()
rev: Dict[str, List[Tuple[str, int, int]]] = {}
for tgt, pairs in connectionMap.items():
for src, _, _ in pairs:
if src not in rev:
rev[src] = []
rev[src].append(tgt)
q = deque([loopNodeId])
for src, so, ti in pairs:
rev.setdefault(src, []).append((tgt, so, ti))
q: deque = deque()
for tgt, so, ti in rev.get(loopNodeId, []):
if so != 0:
continue
if tgt == loopNodeId:
continue
q.append(tgt)
while q:
nid = q.popleft()
for tgt in rev.get(nid, []):
if nid == loopNodeId:
continue
if nid not in body:
body.add(nid)
for tgt, _so, _ti in rev.get(nid, []):
if tgt == loopNodeId:
continue
if tgt not in body:
body.add(tgt)
q.append(tgt)
return body
def getLoopPrimaryInputSource(
loop_node_id: str,
connectionMap: Dict[str, List[Tuple[str, int, int]]],
body_ids: Set[str],
) -> Optional[Tuple[str, int]]:
"""Pick the inbound edge for ``flow.loop`` when several wires hit the same input (0).
The feedback edge from the loop body and the regular predecessor end on the same port;
for merging data (Done output, logs), only the predecessor **outside** the body counts.
"""
incoming = connectionMap.get(loop_node_id, [])
candidates = [(src, so) for src, so, ti in incoming if ti == 0]
if not candidates:
return None
outside = [(src, so) for src, so in candidates if src not in body_ids]
if outside:
return outside[0]
return candidates[0]
def getLoopDoneNodeIds(loopNodeId: str, connectionMap: Dict[str, List[Tuple[str, int, int]]]) -> Set[str]:
"""Nodes reachable from flow.loop output port 1 (runs once after all iterations)."""
from collections import deque
done: Set[str] = set()
rev: Dict[str, List[Tuple[str, int, int]]] = {}
for tgt, pairs in connectionMap.items():
for src, so, ti in pairs:
rev.setdefault(src, []).append((tgt, so, ti))
q: deque = deque()
for tgt, so, ti in rev.get(loopNodeId, []):
if so != 1:
continue
if tgt == loopNodeId:
continue
q.append(tgt)
while q:
nid = q.popleft()
if nid == loopNodeId:
continue
if nid not in done:
done.add(nid)
for tgt, _so, _ti in rev.get(nid, []):
if tgt == loopNodeId:
continue
if tgt not in done:
q.append(tgt)
return done
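Usage sketch for the two traversals, assuming the raw connection dict shape consumed by ``buildConnectionMap``:

```python
from modules.workflows.automation2.graphUtils import (
    buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds,
)

connections = [
    {"source": "start", "target": "loop", "sourceOutput": 0, "targetInput": 0},
    {"source": "loop", "target": "body", "sourceOutput": 0, "targetInput": 0},
    {"source": "body", "target": "loop", "sourceOutput": 0, "targetInput": 0},  # feedback edge
    {"source": "loop", "target": "done", "sourceOutput": 1, "targetInput": 0},
]
cmap = buildConnectionMap(connections)
assert getLoopBodyNodeIds("loop", cmap) == {"body"}   # feedback edge stops at the loop node
assert getLoopDoneNodeIds("loop", cmap) == {"done"}
```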
def getInputSources(nodeId: str, connectionMap: Dict[str, List[Tuple[str, int, int]]]) -> Dict[int, Tuple[str, int]]:
"""
For a node, return targetInput -> (sourceNodeId, sourceOutput).
@@ -417,7 +484,7 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
resolved_parts = [resolveParameterReferences(v, nodeOutputs) for v in value]
if len(resolved_parts) == 1:
return resolved_parts[0]
parts = [serialize_context(p) for p in resolved_parts]
parts = [serialize_context(p, prefer_handover_primary=True) for p in resolved_parts]
return "\n\n".join(p for p in parts if p)
return [resolveParameterReferences(v, nodeOutputs) for v in value]
return value

View file

@@ -4,7 +4,7 @@
"""Shared helpers for AI workflow actions."""
import json
from typing import Any
from typing import Any, Optional
def is_image_action_document_list(val: Any) -> bool:
@@ -20,24 +20,42 @@ def is_image_action_document_list(val: Any) -> bool:
return True
def serialize_context(val: Any) -> str:
def _handover_response_plain(val: Any) -> Optional[str]:
"""If ``val`` is a dict with a non-empty ``response`` string, return it (BOM-stripped)."""
if not isinstance(val, dict):
return None
r = val.get("response")
if r is None or not str(r).strip():
return None
return str(r).strip().lstrip("\ufeff")
def serialize_context(val: Any, *, prefer_handover_primary: bool = False) -> str:
"""Convert any context value to a readable string for use in AI prompts.
- None / empty string → ""
- empty dict (no keys) → "" (avoids literal "{}" in file.create / prompts)
- str → as-is
- dict / list → pretty-printed JSON
- dict / list → pretty-printed JSON (unless ``prefer_handover_primary`` and dict has ``response``)
- if JSON encoding fails (cycles, etc.) but dict has ``response``, return that text instead of ``str(dict)``
- anything else → str()
"""
if val is None or val == "" or val == []:
return ""
if isinstance(val, dict) and len(val) == 0:
return ""
if prefer_handover_primary:
got = _handover_response_plain(val)
if got is not None:
return got
if isinstance(val, str):
return val.strip()
return val.strip().lstrip("\ufeff")
try:
return json.dumps(val, ensure_ascii=False, indent=2)
return json.dumps(val, ensure_ascii=False, indent=2, default=str)
except Exception:
got = _handover_response_plain(val)
if got is not None:
return got
return str(val)
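Behaviour sketch for the new flag, using ``serialize_context`` as defined above:

```python
handover = {"response": "Hallo Welt", "count": 2}
assert serialize_context(handover, prefer_handover_primary=True) == "Hallo Welt"
assert serialize_context(handover) == '{\n  "response": "Hallo Welt",\n  "count": 2\n}'
assert serialize_context({}) == ""   # empty dict stays "", not "{}"
```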

View file

@@ -30,6 +30,38 @@ _UNSAFE_FILE_KEY = re.compile(r"[^\w\-.\(\)\[\]%@+]")
HANDOVER_KIND = "context.extractContent.handover.v1"
_CONTENT_FILTER_OPTIONS = ("all", "textOnly", "imagesOnly", "noImages")
def _apply_content_filter(payload: Dict[str, Any], content_filter: str) -> Dict[str, Any]:
"""Filter parts in the handover payload by content_filter.
Semantics:
- all: keep every part (no-op).
- textOnly: whitelist only typeGroup in (text, table, structure).
- imagesOnly: whitelist only typeGroup == image.
- noImages: blacklist only typeGroup == image; every other typeGroup is kept
(wider than textOnly; future non-image types are retained).
"""
import copy
if content_filter == "all":
return payload
result = copy.deepcopy(payload)
for bucket in (result.get("files") or {}).values():
if not isinstance(bucket, dict):
continue
parts = bucket.get("parts") or []
if content_filter == "textOnly":
parts = [p for p in parts if isinstance(p, dict) and (p.get("typeGroup") or "") in ("text", "table", "structure")]
elif content_filter == "imagesOnly":
parts = [p for p in parts if isinstance(p, dict) and (p.get("typeGroup") or "") == "image"]
elif content_filter == "noImages":
parts = [p for p in parts if isinstance(p, dict) and (p.get("typeGroup") or "") != "image"]
bucket["parts"] = parts
bucket["byTypeGroup"] = _rebuild_by_type_group(parts)
return result
def _default_extraction_options() -> ExtractionOptions:
"""No merge — keep all parts for downstream JSON selection."""
@@ -72,6 +104,19 @@ def _rebuild_by_type_group(parts_ser: List[Dict[str, Any]]) -> Dict[str, List[Di
return by_type
def _part_carries_plain_text(p: dict) -> bool:
"""Whether a serialized extraction part contributes to a flat ``response`` string."""
if not isinstance(p, dict):
return False
tg = (p.get("typeGroup") or "").strip()
if tg in ("text", "table"):
return True
mime = (p.get("mimeType") or "").strip().lower()
if tg == "structure" and mime in ("text/plain", "text/html", "text/markdown"):
return True
return False
def _joined_text_from_handover_payload(payload: Dict[str, Any]) -> str:
"""Concatenate text parts across fileOrder for AiResult-compatible ``response``."""
files_section = payload.get("files") or {}
@@ -85,7 +130,7 @@ def _joined_text_from_handover_payload(payload: Dict[str, Any]) -> str:
for p in bucket.get("parts") or []:
if not isinstance(p, dict):
continue
if (p.get("typeGroup") or "").strip() != "text":
if not _part_carries_plain_text(p):
continue
raw = p.get("data")
if raw is None:
@@ -314,11 +359,23 @@ async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
self.services.chat.progressLogUpdate(operation_id, 0.9, "Building JSON")
content_filter = str(parameters.get("contentFilter") or "all").strip().lower()
if content_filter not in _CONTENT_FILTER_OPTIONS:
content_filter = "all"
payload = _apply_content_filter(payload, content_filter)
stem = f"{wf}_{int(time.time())}"
# Only split image sidecars when the filtered payload can still contain image parts.
if content_filter in ("all", "imagesOnly"):
stripped_payload, media_docs = _split_images_to_sidecar_documents(
payload,
document_name_stem=stem,
)
else:
# textOnly / noImages: no image parts remain → skip the split entirely.
stripped_payload = payload
media_docs = []
joined_text = _joined_text_from_handover_payload(payload)
json_meta = {

View file

@@ -2,43 +2,28 @@
# All rights reserved.
"""Action ``context.mergeContext``.
Reads ``_branchInputs`` (injected by ``ActionNodeExecutor`` because the node
declaration sets ``injectBranchInputs: True``) and combines them according to
the selected strategy.
Receives a list of results (e.g. from ``flow.loop`` ``bodyResults``) via the
``dataSource`` DataRef parameter and deep-merges them into a single dict.
The barrier behaviour (waiting until every connected predecessor has produced
output) is handled by the execution engine via ``waitsForAllPredecessors`` on
the node definition; this action is invoked only after all (or ``waitFor``)
inputs are present.
``dataSource`` must be set explicitly (resolved DataRef). There is no implicit
fallback to ``_upstreamPayload`` or loop payloads.
"""
from __future__ import annotations
import copy
import json
import logging
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional
from modules.datamodels.datamodelChat import ActionResult
from modules.workflows.methods.methodContext.actions.extractContent import (
_joined_text_from_handover_payload,
)
logger = logging.getLogger(__name__)
_VALID_STRATEGIES = {"shallow", "deep", "firstWins", "lastWins", "errorOnConflict"}
def _shallow_merge(branches: List[Tuple[int, Any]]) -> Tuple[Dict[str, Any], List[str]]:
merged: Dict[str, Any] = {}
conflicts: List[str] = []
for _, val in branches:
if not isinstance(val, dict):
continue
for k, v in val.items():
if k in merged and merged[k] != v:
conflicts.append(k)
merged[k] = v
return merged, conflicts
def _deep_merge(target: Dict[str, Any], source: Dict[str, Any], conflicts: List[str], path: str = "") -> None:
for k, v in source.items():
full = f"{path}.{k}" if path else k
@@ -48,80 +33,202 @@ def _deep_merge(target: Dict[str, Any], source: Dict[str, Any], conflicts: List[
existing = target[k]
if isinstance(existing, dict) and isinstance(v, dict):
_deep_merge(existing, v, conflicts, full)
elif isinstance(existing, list) and isinstance(v, list):
target[k] = existing + v
else:
if existing != v:
conflicts.append(full)
target[k] = copy.deepcopy(v) if isinstance(v, (dict, list)) else v
def _strategy_first_or_last_wins(
branches: List[Tuple[int, Any]], last: bool
) -> Tuple[Dict[str, Any], List[str]]:
iterator = list(reversed(branches)) if not last else list(branches)
merged: Dict[str, Any] = {}
conflicts: List[str] = []
for _, val in iterator:
if not isinstance(val, dict):
def _coerce_to_list(value: Any) -> List[Any]:
"""Normalise ``value`` to a list of items to merge."""
if isinstance(value, list):
return value
if value is None:
return []
return [value]
def _strip_document_data(doc: Any) -> Any:
"""Keep document metadata but drop the raw blob so deep-merge stays small."""
if not isinstance(doc, dict):
return doc
out = dict(doc)
out["documentData"] = None
return out
def _merge_payload(item: Any) -> Optional[Dict[str, Any]]:
"""Return the dict to deep-merge for this item, or ``None`` to skip.
``documents[n].documentData`` is nulled before merging so large blobs
(e.g. ~34 MB handover-JSON per extractContent iteration) don't accumulate.
``imageDocumentsOnly`` is left intact; ``_deep_merge`` list-concats it
across iterations, giving downstream nodes all images from all iterations.
"""
if not isinstance(item, dict):
return None
if item.get("success") is False:
return None
out = dict(item)
if isinstance(out.get("documents"), list):
out["documents"] = [_strip_document_data(d) for d in out["documents"]]
return out
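A small check of the list-concat behaviour the docstring relies on, assuming the hunk-elided branch of ``_deep_merge`` simply copies keys that are not yet present in the target:

```python
merged, conflicts = {}, []
_deep_merge(merged, {"imageDocumentsOnly": ["img1"], "response": "A"}, conflicts)
_deep_merge(merged, {"imageDocumentsOnly": ["img2"], "response": "B"}, conflicts)
assert merged["imageDocumentsOnly"] == ["img1", "img2"]  # lists concatenate
assert merged["response"] == "B" and conflicts == ["response"]
```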
def _primary_text_from_item(it: Any) -> str:
"""Same sources as ``actionNodeExecutor`` / ``context.extractContent`` for primary text."""
if not isinstance(it, dict):
return ""
r = it.get("response")
if r is not None and str(r).strip():
return str(r).strip()
inner = it.get("data")
if isinstance(inner, dict):
r = inner.get("response")
if r is not None and str(r).strip():
return str(r).strip()
docs = it.get("documents")
if not isinstance(docs, list) or not docs:
return ""
doc0 = docs[0]
raw: Any = None
if isinstance(doc0, dict):
raw = doc0.get("documentData")
elif hasattr(doc0, "documentData"):
raw = getattr(doc0, "documentData", None)
if isinstance(raw, bytes):
try:
return raw.decode("utf-8").strip()
except (UnicodeDecodeError, ValueError):
return ""
if isinstance(raw, dict):
return (_joined_text_from_handover_payload(raw) or "").strip()
if isinstance(raw, str) and raw.strip():
s = raw.strip()
if s.startswith("{") and s.endswith("}"):
try:
parsed = json.loads(s)
if isinstance(parsed, dict):
return (_joined_text_from_handover_payload(parsed) or "").strip()
except (json.JSONDecodeError, TypeError):
pass
return s
return ""
def _sanitize_heading_title(name: str) -> str:
t = " ".join(name.replace("\r", " ").replace("\n", " ").split()).strip()
return t[:160] if len(t) > 160 else t
def _iteration_heading_from_item(it: Any) -> Optional[str]:
if not isinstance(it, dict):
return None
docs = it.get("documents")
if not isinstance(docs, list) or not docs:
return None
d0 = docs[0]
if not isinstance(d0, dict):
return None
name = d0.get("documentName")
if isinstance(name, str) and name.strip():
return _sanitize_heading_title(name.strip())
return None
def _synthesize_primary_response(merged: Dict[str, Any], inputs: List[Any]) -> str:
"""Flat text for ``ActionResult.response`` / file.create.
Prefer concatenating each input's primary text (loop bodyResults) so no
iteration is dropped; ``_deep_merge`` overwrites scalar ``response`` with
the last item only, so that merged value is just a fallback when no
per-item text is found.
When several inputs are merged, prefix each chunk with a markdown ``###``
heading from ``documents[0].documentName`` so ``file.create`` renders clear
sections (CSV vs. PDF vs. …).
"""
chunks: List[str] = []
multi = len(inputs) > 1
for it in inputs:
t = _primary_text_from_item(it)
if not t:
continue
for k, v in val.items():
if k in merged and merged[k] != v:
conflicts.append(k)
if last or k not in merged:
merged[k] = v
return merged, conflicts
if multi:
h = _iteration_heading_from_item(it)
if h:
chunks.append(f"### {h}\n\n{t}")
continue
chunks.append(t)
if chunks:
return "\n\n".join(chunks)
if isinstance(merged, dict):
r = merged.get("response")
if r is not None and str(r).strip():
return str(r).strip()
if isinstance(merged, dict) and merged:
try:
return json.dumps(merged, ensure_ascii=False, indent=2, default=str)
except Exception:
return str(merged)
return ""
async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult:
try:
if "dataSource" not in parameters:
raise ValueError("dataSource is required (set a DataRef on the merge node)")
raw = parameters["dataSource"]
if isinstance(raw, str) and not raw.strip():
raw = None
if raw is None:
return ActionResult.isFailure(error="dataSource ist erforderlich (DataRef auf die Quelle setzen).")
if isinstance(raw, list) and len(raw) == 0:
return ActionResult.isFailure(error="Keine Datenquelle angegeben oder Datenquelle ist leer.")
items = _coerce_to_list(raw)
if not items:
return ActionResult.isFailure(error="No branch inputs available")
return ActionResult.isFailure(error="Keine Datenquelle angegeben oder Datenquelle ist leer.")
merged: Dict[str, Any] = {}
conflicts: List[str] = []
inputs: List[Any] = []
for item in items:
if item is None:
continue
inputs.append(item)
payload = _merge_payload(item)
if payload:
_deep_merge(merged, payload, conflicts)
if not inputs:
return ActionResult.isFailure(error="Alle Einträge in der Datenquelle sind leer.")
primary = _synthesize_primary_response(merged, inputs)
merged["response"] = primary
_ps = primary if isinstance(primary, str) else repr(primary)
logger.info(
"mergeContext: inputs=%d merged_keys=%s primary_len=%d primary_preview=%r conflicts=%d",
len(inputs),
list(merged.keys())[:20],
len(_ps or ""),
(_ps[:200] + "…") if len(_ps) > 200 else _ps,
len(conflicts),
)
data: Dict[str, Any] = {
"merged": merged,
"inputs": inputs,
"first": inputs[0] if inputs else None,
"count": len(inputs),
"conflicts": sorted(set(conflicts)) if conflicts else [],
"response": primary,
}
return ActionResult.isSuccess(data=data)
except Exception as exc:

View file

@ -151,31 +151,20 @@ class MethodContext(MethodBase):
"mergeContext": WorkflowActionDefinition(
actionId="context.mergeContext",
description=(
"Merge data arriving from multiple parallel branches into a single "
"MergeResult. Strategies: shallow, deep, firstWins, lastWins, "
"errorOnConflict. The execution engine waits for all connected "
"predecessors before invoking this action (waitsForAllPredecessors=True)."
"Führt eine Liste von Schrittergebnissen (z. B. ``bodyResults`` einer "
"``flow.loop``) zu einem zusammengeführten Dict zusammen."
),
outputType="MergeResult",
outputType="ActionResult",
parameters={
"strategy": WorkflowActionParameter(
name="strategy", type="str", required=False,
frontendType=FrontendType.SELECT,
frontendOptions=["shallow", "deep", "firstWins", "lastWins", "errorOnConflict"],
default="deep",
description="Conflict resolution strategy for keys present in several branches",
"dataSource": WorkflowActionParameter(
name="dataSource",
type="Any",
frontendType=FrontendType.CONTEXT_BUILDER,
required=False,
description=(
"Datenquelle (DataRef), meist Schleife → Alle Schleifen-Ergebnisse. "
"Optional wenn der Knoten per Kabel am Schleifen-„Fertig“-Ausgang hängt."
),
"waitFor": WorkflowActionParameter(
name="waitFor", type="int", required=False,
frontendType=FrontendType.NUMBER,
default=0,
description="Number of branches to consume (0 = all). Used together with timeoutMs.",
),
"timeoutMs": WorkflowActionParameter(
name="timeoutMs", type="int", required=False,
frontendType=FrontendType.NUMBER,
default=30000,
description="Maximum wait time in milliseconds before continuing with available inputs",
),
},
execute=mergeContext.__get__(self, self.__class__),

View file

@ -3,6 +3,7 @@
from typing import Any, Dict, List, Optional
import asyncio
import base64
import binascii
import io
@ -11,7 +12,10 @@ import logging
import re
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import (
enhancePlainTextWithMarkdownTables,
markdownToDocumentJson,
)
from modules.shared.i18nRegistry import normalizePrimaryLanguageTag
from modules.workflows.automation2.executors.actionNodeExecutor import _coerce_document_data_to_bytes
from modules.workflows.methods.methodAi._common import is_image_action_document_list, serialize_context
@ -21,6 +25,78 @@ logger = logging.getLogger(__name__)
_SAFE_FILENAME = re.compile(r'[^\w\-.\(\)\s\[\]%@+]')
_HEAVY_CONTEXT_KEYS = frozenset({"imageDocumentsOnly", "documents", "inputs"})
def _collect_image_documents_only(raw: Any) -> List[Any]:
"""Resolve ``imageDocumentsOnly`` whether the context is merged, nested, or surfaced."""
if not isinstance(raw, dict):
return []
paths = (
("imageDocumentsOnly",),
("merged", "imageDocumentsOnly"),
("data", "merged", "imageDocumentsOnly"),
("data", "imageDocumentsOnly"),
)
for path in paths:
cur: Any = raw
ok = True
for p in path:
if not isinstance(cur, dict):
ok = False
break
cur = cur.get(p)
if ok and isinstance(cur, list) and cur:
return cur
return []
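# Illustration: merged, nested, and surfaced contexts all resolve to the same list.
#
#   imgs = [{"documentName": "m.png", "mimeType": "image/png"}]
#   _collect_image_documents_only({"imageDocumentsOnly": imgs})                       # -> imgs
#   _collect_image_documents_only({"merged": {"imageDocumentsOnly": imgs}})           # -> imgs
#   _collect_image_documents_only({"data": {"merged": {"imageDocumentsOnly": imgs}}}) # -> imgs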
def _context_string_for_report(raw: Any, output_format: str) -> str:
"""Build one narrative string for ``markdownToDocumentJson`` / render.
Prefer plain ``response`` text (merge node surfaces it; nested ``merged.response``
too). Never dump ``inputs`` / binary lists into the PDF body; that previously
produced giant JSON + base64 "hash" paragraphs after merge + ``contextBuilder``.
"""
of = (output_format or "docx").strip().lower().lstrip(".")
if of == "json":
return serialize_context(raw, prefer_handover_primary=False)
if isinstance(raw, str):
return raw.strip().lstrip("\ufeff")
if isinstance(raw, dict):
for path in (
("response",),
("merged", "response"),
("data", "response"),
("data", "merged", "response"),
):
cur: Any = raw
ok = True
for k in path:
if not isinstance(cur, dict):
ok = False
break
cur = cur.get(k)
if ok and cur is not None and str(cur).strip():
return str(cur).strip().lstrip("\ufeff")
lean = {k: v for k, v in raw.items() if k not in _HEAVY_CONTEXT_KEYS}
try:
return json.dumps(lean, ensure_ascii=False, indent=2, default=str)
except Exception:
return serialize_context(lean, prefer_handover_primary=False)
return serialize_context(raw, prefer_handover_primary=False)
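# Illustration: document formats get the plain ``response`` text, JSON keeps
# the full structure, and the fallback dump drops the heavy keys.
#
#   _context_string_for_report({"response": "hi", "inputs": {"0": "x" * 10_000}}, "docx")
#   # -> "hi" (``inputs`` never reaches the rendered body)
#   _context_string_for_report({"response": "hi", "data": {"foo": 1}}, "json")
#   # -> full JSON including ``data``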
def _raw_context_preview_for_log(raw: Any, max_len: int = 500) -> str:
if raw is None:
return "None"
s = raw if isinstance(raw, str) else repr(raw)
s = s.replace("\r", "\\r").replace("\n", "\\n")
if len(s) <= max_len:
return s
return s[:max_len] + f"...<{len(s)} chars>"
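# Illustration: previews stay on one line and are capped with a length tag.
#
#   _raw_context_preview_for_log("a\nb")     # -> "a\\nb"
#   _raw_context_preview_for_log("x" * 600)  # -> first 500 chars + "...<600 chars>"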
def _persistDocumentsToUserFiles(
action_documents: list,
services,
@ -139,6 +215,98 @@ def _load_image_bytes_from_action_doc(doc: dict, services) -> Optional[bytes]:
return None
# Images larger than this threshold (decoded bytes) are resized before embedding
# to avoid multi-minute PDF rendering of high-res raster scans.
_MAX_IMAGE_EMBED_BYTES = 300_000 # 300 KB decoded ≈ ~400 KB base64
_IMAGE_MAX_DIMENSION = 1200 # longest edge in pixels after resize
def _resize_image_for_document(image_bytes: bytes) -> bytes:
"""Resize image to at most ``_IMAGE_MAX_DIMENSION`` px on the longest edge
and re-encode as JPEG. Falls back to the original bytes on any error."""
try:
from PIL import Image as PILImage
import io as _io
img = PILImage.open(_io.BytesIO(image_bytes))
# Flatten transparency / palette modes to RGB (required for JPEG)
if img.mode in ("RGBA", "LA"):
bg = PILImage.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode == "P":
img = img.convert("RGBA")
bg = PILImage.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode != "RGB":
img = img.convert("RGB")
w, h = img.size
if max(w, h) > _IMAGE_MAX_DIMENSION:
# thumbnail() is optimised for downscaling: it uses an intermediate
# box-filter step before the final filter, making it 3-5× faster
# than resize() on large images. BILINEAR is fast and sufficient
# for document thumbnails.
img.thumbnail((_IMAGE_MAX_DIMENSION, _IMAGE_MAX_DIMENSION), PILImage.BILINEAR)
out = _io.BytesIO()
img.save(out, format="JPEG", quality=85, optimize=True)
return out.getvalue()
except Exception as e:
logger.warning("file.create: image resize failed (%s) — using original bytes", e)
return image_bytes
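# Usage sketch (as wired up in _append_images_to_content below): only decoded
# blobs above the embed threshold are re-encoded.
#
#   if len(b) > _MAX_IMAGE_EMBED_BYTES:    # 300 KB decoded
#       b = _resize_image_for_document(b)  # JPEG, longest edge <= 1200 px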
def _append_images_to_content(structured_content: dict, image_docs: list, services=None) -> dict:
"""Append images from imageDocumentsOnly as native image elements to the structured JSON.
Each image becomes an ``image`` element with ``base64Data`` in a trailing
"Bilder" section of the first document. Images larger than
``_MAX_IMAGE_EMBED_BYTES`` are automatically resized/compressed so the
synchronous PDF renderer does not block for minutes on high-res scans.
The renderers (DOCX / PDF) handle ``content.base64Data`` natively.
"""
elements = []
for doc in image_docs:
b = _load_image_bytes_from_action_doc(doc, services)
if not b:
raw = doc.get("documentData") if isinstance(doc, dict) else None
if isinstance(raw, str):
try:
b = base64.b64decode(raw)
except Exception:
pass
if not b:
continue
if len(b) > _MAX_IMAGE_EMBED_BYTES:
logger.info(
"file.create: image %s is %d bytes — resizing to max %dpx for embedding",
(doc.get("documentName") if isinstance(doc, dict) else "?") or "?",
len(b),
_IMAGE_MAX_DIMENSION,
)
b = _resize_image_for_document(b)
elements.append({
"type": "image",
"content": {
"base64Data": base64.b64encode(b).decode("ascii"),
"alt": (doc.get("documentName") if isinstance(doc, dict) else None) or "image",
},
})
if not elements:
return structured_content
docs = structured_content.get("documents")
if isinstance(docs, list) and docs:
docs[0].setdefault("sections", []).append({"heading": "Bilder", "elements": elements})
return structured_content
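# Resulting shape (illustration): one trailing section on the first document.
#
#   structured_content["documents"][0]["sections"][-1]
#   # -> {"heading": "Bilder",
#   #     "elements": [{"type": "image",
#   #                   "content": {"base64Data": "...", "alt": "m.png"}}]}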
def _images_list_to_pdf(image_bytes_list: List[bytes]) -> bytes:
"""One PDF page per image; embedded raster data via PyMuPDF."""
import fitz
@ -239,21 +407,24 @@ async def create(self, parameters: Dict[str, Any]) -> ActionResult:
Create a file from context (text/markdown from upstream AI node).
Uses GenerationService.renderReport to produce docx, pdf, txt, md, html, xlsx, etc.
"""
raw_context = parameters.get("context", "")
if isinstance(raw_context, list) and is_image_action_document_list(raw_context):
return await _create_merged_image_documents(self, parameters, raw_context)
outputFormat = (parameters.get("outputFormat") or "docx").strip().lower().lstrip(".")
context = _context_string_for_report(raw_context, outputFormat)
if not context:
logger.warning(
"file.create: context empty after resolve — check DataRefs (e.g. Antworttext / "
"documents[0].documentData from the AI step)."
"file.create: context empty after resolve — raw_context type=%s raw_summary=%r "
"serialized_len=%s (check ActionNodeExecutor \"file.create context resolution\" log for DataRef / upstream).",
type(raw_context).__name__,
_raw_context_preview_for_log(raw_context),
len(context or ""),
)
return ActionResult.isFailure(error="context is required (connect an AI node or provide text)")
title = (parameters.get("title") or "Document").strip()
templateName = parameters.get("templateName")
language = normalizePrimaryLanguageTag(
@ -267,10 +438,26 @@ async def create(self, parameters: Dict[str, Any]) -> ActionResult:
folder_id = str(raw_folder).strip()
try:
if outputFormat != "json":
context = enhancePlainTextWithMarkdownTables(context)
structured_content = markdownToDocumentJson(context, title, language)
if templateName:
structured_content.setdefault("metadata", {})["templateName"] = templateName
img_docs = _collect_image_documents_only(raw_context)
if img_docs:
# Image decoding and PIL resizing are CPU-bound; run them in a
# thread pool so the event loop is not blocked while processing
# high-res raster images (e.g. 3+ MB PNGs from PDF extraction).
loop = asyncio.get_event_loop()
structured_content = await loop.run_in_executor(
None,
_append_images_to_content,
structured_content,
img_docs,
self.services,
)
generation = getattr(self.services, "generation", None)
if not generation:
return ActionResult.isFailure(error="Generation service not available")

View file

@ -28,15 +28,14 @@ async def test_execute_graph_loop_and_aggregate_collects_items():
"type": "flow.loop",
"parameters": {
"items": {"type": "ref", "nodeId": "t1", "path": ["payload", "items"]},
"level": "auto",
"concurrency": 1,
},
},
{"id": "agg1", "type": "data.aggregate", "parameters": {"mode": "collect"}},
],
"connections": [
{"source": "t1", "target": "loop1"},
{"source": "loop1", "target": "agg1"},
{"source": "t1", "target": "loop1", "targetInput": 0},
{"source": "loop1", "target": "agg1", "sourceOutput": 0, "targetInput": 0},
],
}
run_envelope = default_run_envelope(
@ -72,15 +71,14 @@ async def test_data_consolidate_after_aggregate_same_context_as_post_loop():
"type": "flow.loop",
"parameters": {
"items": {"type": "ref", "nodeId": "t1", "path": ["payload", "items"]},
"level": "auto",
"concurrency": 1,
},
},
{"id": "agg1", "type": "data.aggregate", "parameters": {"mode": "collect"}},
],
"connections": [
{"source": "t1", "target": "loop1"},
{"source": "loop1", "target": "agg1"},
{"source": "t1", "target": "loop1", "targetInput": 0},
{"source": "loop1", "target": "agg1", "sourceOutput": 0, "targetInput": 0},
],
}
run_envelope = default_run_envelope(
@ -121,3 +119,43 @@ async def test_data_consolidate_after_aggregate_same_context_as_post_loop():
assert len(result["rows"]) == 2
assert result["rows"][0].get("currentItem", {}).get("a") == 1
assert result["rows"][1].get("currentItem", {}).get("b") == "y"
@pytest.mark.asyncio
async def test_loop_done_branch_runs_once_after_body():
"""Loop output 1 (Fertig) runs downstream once; body stays on output 0."""
graph = {
"nodes": [
{"id": "t1", "type": "trigger.manual", "parameters": {}},
{
"id": "loop1",
"type": "flow.loop",
"parameters": {
"items": {"type": "ref", "nodeId": "t1", "path": ["payload", "items"]},
"concurrency": 1,
},
},
{"id": "agg1", "type": "data.aggregate", "parameters": {"mode": "collect"}},
{"id": "c1", "type": "data.consolidate", "parameters": {"mode": "table"}},
],
"connections": [
{"source": "t1", "target": "loop1", "targetInput": 0},
{"source": "loop1", "target": "agg1", "sourceOutput": 0, "targetInput": 0},
{"source": "loop1", "target": "c1", "sourceOutput": 1, "targetInput": 0},
],
}
run_envelope = default_run_envelope(
"manual",
payload={"items": [{"a": 1}, {"a": 2}]},
)
res = await executeGraph(
graph,
services=_minimal_services(),
run_envelope=run_envelope,
userId="test-user",
)
assert res.get("success") is True, res
out = res["nodeOutputs"]
assert out["agg1"]["count"] == 2
assert out["c1"]["count"] == 2
assert out["c1"]["mode"] == "table"

View file

@ -2,12 +2,17 @@
import base64
from modules.workflows.methods.methodContext.actions import extractContent as ec
from modules.workflows.methods.methodContext.actions.extractContent import (
HANDOVER_KIND,
_apply_content_filter,
_joined_text_from_handover_payload,
_split_images_to_sidecar_documents,
)
def test_joined_text_orders_text_table_and_skips_container():
payload = {
"kind": ec.HANDOVER_KIND,
"kind": HANDOVER_KIND,
"fileOrder": ["f1"],
"files": {
"f1": {
@ -19,14 +24,28 @@ def test_joined_text_from_handover_orders_text_parts_only():
}
},
}
assert _joined_text_from_handover_payload(payload) == "A\n\nB"
def test_joined_text_includes_csv_table_parts():
payload = {
"fileOrder": ["f1"],
"files": {
"f1": {
"parts": [
{"typeGroup": "table", "mimeType": "text/csv", "data": "a,b\n1,2", "id": "t"},
]
}
},
}
assert _joined_text_from_handover_payload(payload) == "a,b\n1,2"
def test_split_images_moves_pixels_to_blob_docs():
raw = b"fake-binary-image"
b64 = base64.b64encode(raw).decode("ascii")
payload = {
"kind": ec.HANDOVER_KIND,
"kind": HANDOVER_KIND,
"schemaVersion": 1,
"fileOrder": ["f1"],
"files": {
@ -44,7 +63,7 @@ def test_split_images_moves_pixels_to_blob_docs():
}
},
}
stripped, blobs = _split_images_to_sidecar_documents(payload, document_name_stem="abc")
assert len(blobs) == 1
assert blobs[0].mimeType == "image/png"
assert blobs[0].documentData == raw
@ -61,3 +80,65 @@ def test_split_images_moves_pixels_to_blob_docs():
assert img_parts[0]["data"] == ""
assert img_parts[0]["handoverMediaDocumentName"] == blobs[0].documentName
assert "image" in stripped["files"]["f1"]["byTypeGroup"]
def _mixed_payload():
return {
"kind": HANDOVER_KIND,
"schemaVersion": 1,
"fileOrder": ["f1"],
"files": {
"f1": {
"parts": [
{"typeGroup": "text", "data": "hello", "id": "t1"},
{"typeGroup": "table", "mimeType": "text/csv", "data": "a,b", "id": "tb1"},
{"typeGroup": "image", "mimeType": "image/png", "data": "abc=", "id": "i1"},
{"typeGroup": "structure", "mimeType": "text/html", "data": "<p/>", "id": "s1"},
],
}
},
}
def test_content_filter_all_is_noop():
payload = _mixed_payload()
result = _apply_content_filter(payload, "all")
assert result is payload # same object, no copy
def test_content_filter_text_only_keeps_text_table_structure():
result = _apply_content_filter(_mixed_payload(), "textOnly")
parts = result["files"]["f1"]["parts"]
type_groups = {p["typeGroup"] for p in parts}
assert type_groups == {"text", "table", "structure"}
assert "image" not in type_groups
def test_content_filter_images_only():
result = _apply_content_filter(_mixed_payload(), "imagesOnly")
parts = result["files"]["f1"]["parts"]
assert all(p["typeGroup"] == "image" for p in parts)
assert len(parts) == 1
def test_content_filter_no_images_removes_only_images():
result = _apply_content_filter(_mixed_payload(), "noImages")
parts = result["files"]["f1"]["parts"]
type_groups = {p["typeGroup"] for p in parts}
assert "image" not in type_groups
# text, table, structure all remain
assert {"text", "table", "structure"} == type_groups
def test_content_filter_text_only_joined_text_has_no_image_data():
result = _apply_content_filter(_mixed_payload(), "textOnly")
text = _joined_text_from_handover_payload(result)
assert "hello" in text
assert "abc=" not in text # base64 image data must not appear
def test_content_filter_text_only_no_sidecars():
"""textOnly: no image parts → _split produces zero sidecars."""
result = _apply_content_filter(_mixed_payload(), "textOnly")
stripped, blobs = _split_images_to_sidecar_documents(result, document_name_stem="test")
assert blobs == []

View file

@ -0,0 +1,178 @@
# Unit tests: context.mergeContext primary text from extract handover (documents[0]).
import json
import pytest
from modules.workflows.methods.methodContext.actions.extractContent import HANDOVER_KIND
from modules.workflows.methods.methodContext.actions.mergeContext import mergeContext
def _handover(text: str) -> dict:
return {
"kind": HANDOVER_KIND,
"fileOrder": ["f1"],
"files": {
"f1": {
"parts": [
{"typeGroup": "text", "data": text, "id": "t1"},
]
}
},
}
@pytest.mark.asyncio
async def test_mergeContext_requires_dataSource():
result = await mergeContext(object(), {})
assert not result.success
err = result.error or ""
assert "dataSource" in err or "erforderlich" in err.lower()
@pytest.mark.asyncio
async def test_mergeContext_handover_only_in_documents_yields_data_response():
item = {
"success": True,
"data": {},
"documents": [
{
"documentName": "handover.json",
"mimeType": "application/json",
"documentData": _handover("only-from-handover"),
}
],
}
result = await mergeContext(object(), {"dataSource": [item]})
assert result.success
assert result.data
assert result.data.get("response") == "only-from-handover"
@pytest.mark.asyncio
async def test_mergeContext_handover_json_string_in_documentData():
payload = _handover("from-json-string")
item = {
"success": True,
"data": {},
"documents": [
{
"documentName": "handover.json",
"mimeType": "application/json",
"documentData": json.dumps(payload),
}
],
}
result = await mergeContext(object(), {"dataSource": [item]})
assert result.success
assert result.data.get("response") == "from-json-string"
@pytest.mark.asyncio
async def test_mergeContext_joins_multiple_handover_items():
items = [
{
"success": True,
"data": {},
"documents": [{"documentData": _handover("alpha"), "documentName": "a.json"}],
},
{
"success": True,
"data": {},
"documents": [{"documentData": _handover("beta"), "documentName": "b.json"}],
},
]
result = await mergeContext(object(), {"dataSource": items})
assert result.success
assert result.data.get("response") == "### a.json\n\nalpha\n\n### b.json\n\nbeta"
@pytest.mark.asyncio
async def test_mergeContext_merged_response_wins_over_handover_chunks():
items = [
{
"success": True,
"data": {"response": "merged-wins"},
"documents": [{"documentData": _handover("ignored"), "documentName": "a.json"}],
},
]
result = await mergeContext(object(), {"dataSource": items})
assert result.success
assert result.data.get("response") == "merged-wins"
@pytest.mark.asyncio
async def test_mergeContext_concatenates_each_iteration_data_response_not_only_last():
"""deep_merge overwrites ``response``; synthesis must still include every loop body result."""
items = [
{"success": True, "data": {"response": "chunk-aaa"}},
{"success": True, "data": {"response": "chunk-bbb"}},
{"success": True, "data": {"response": "chunk-ccc"}},
]
result = await mergeContext(object(), {"dataSource": items})
assert result.success
r = result.data.get("response") or ""
assert "chunk-aaa" in r
assert "chunk-bbb" in r
assert "chunk-ccc" in r
assert r == "chunk-aaa\n\nchunk-bbb\n\nchunk-ccc"
assert result.data["merged"]["response"] == r
@pytest.mark.asyncio
async def test_mergeContext_primary_serializes_as_plain_text_for_file_create():
from modules.workflows.methods.methodAi._common import serialize_context
items = [
{"success": True, "data": {"response": "section-one"}},
{"success": True, "data": {"response": "section-two"}},
]
result = await mergeContext(object(), {"dataSource": items})
primary = result.data.get("response")
assert isinstance(primary, str)
assert serialize_context(primary) == primary
@pytest.mark.asyncio
async def test_mergeContext_strips_document_data_from_merged_documents():
"""documentData must be None in merged.documents — blobs must not accumulate."""
big_blob = "x" * 100_000
items = [
{
"success": True,
"data": {"response": "a"},
"documents": [
{"documentName": "a.json", "mimeType": "application/json", "documentData": big_blob},
],
},
{
"success": True,
"data": {"response": "b"},
"documents": [
{"documentName": "b.json", "mimeType": "application/json", "documentData": big_blob},
],
},
]
result = await mergeContext(object(), {"dataSource": items})
assert result.success
merged_docs = result.data["merged"].get("documents") or []
assert len(merged_docs) >= 1
for doc in merged_docs:
assert doc.get("documentData") is None, "documentData must be stripped before deep-merge"
@pytest.mark.asyncio
async def test_mergeContext_accumulates_image_documents_only_across_iterations():
"""imageDocumentsOnly from every iteration must be list-concat in merged."""
img_a = {"documentName": "img_a.png", "mimeType": "image/png", "documentData": "aaa="}
img_b = {"documentName": "img_b.png", "mimeType": "image/png", "documentData": "bbb="}
items = [
{"success": True, "data": {"response": "a"}, "imageDocumentsOnly": [img_a]},
{"success": True, "data": {"response": "b"}, "imageDocumentsOnly": [img_b]},
]
result = await mergeContext(object(), {"dataSource": items})
assert result.success
imgs = result.data["merged"].get("imageDocumentsOnly") or []
names = [d.get("documentName") for d in imgs]
assert "img_a.png" in names
assert "img_b.png" in names

View file

@ -25,7 +25,7 @@ def test_context_extractContent_node_shape():
assert "DocumentList" in node["inputPorts"][0]["accepts"]
assert "LoopItem" in node["inputPorts"][0]["accepts"]
names = [p["name"] for p in node["parameters"]]
assert names == ["documentList"]
assert names == ["documentList", "contentFilter"]
def test_udm_port_types_registered():
@ -65,3 +65,9 @@ def test_getExecutor_dispatches_context():
from modules.workflows.automation2.executors import ActionNodeExecutor
executor = _getExecutor("context.extractContent", None)
assert isinstance(executor, ActionNodeExecutor)
def test_context_mergeContext_surfaces_data_pick_paths_match_node_outputs():
"""DataPicker uses paths like ``merged``; executor must surface ``data.*`` to top level."""
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "context.mergeContext")
assert node.get("surfaceDataAsTopLevel") is True

View file

@ -21,16 +21,19 @@ class TestNodeDefinitions:
assert node["_action"] == "consolidate"
assert node["outputPorts"][0]["schema"] == "ConsolidateResult"
def test_flow_loop_has_iteration_mode_and_two_outputs(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.loop")
paramNames = [p["name"] for p in node["parameters"]]
assert "level" in paramNames
assert "iterationMode" in paramNames
assert "iterationStride" in paramNames
assert "concurrency" in paramNames
assert "level" not in paramNames
modeParam = next(p for p in node["parameters"] if p["name"] == "iterationMode")
assert "every_nth" in modeParam["frontendOptions"]["options"]
concParam = next(p for p in node["parameters"] if p["name"] == "concurrency")
assert concParam["default"] == 1
assert node["inputs"] == 1
assert node["outputs"] == 2
def test_flow_loop_accepts_udm(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.loop")
@ -146,13 +149,27 @@ class TestFlowLoopUdmLevel:
ex = FlowExecutor()
udm = {"id": "d1", "role": "document", "children": [{"id": "p1"}, {"id": "p2"}]}
node = {"type": "flow.loop", "id": "loop1",
"parameters": {"items": "direct", "level": "auto"}}
"parameters": {"items": "direct"}}
ctx = {"nodeOutputs": {"loop1": udm, "direct": udm}, "connectionMap": {}, "inputSources": {"loop1": {0: ("direct", 0)}}}
from unittest.mock import patch
with patch("modules.workflows.automation2.graphUtils.resolveParameterReferences", return_value=udm):
result = await ex.execute(node, ctx)
assert result["count"] == 2
@pytest.mark.asyncio
async def test_loop_every_nth_stride(self):
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
ex = FlowExecutor()
node = {"type": "flow.loop", "id": "loop1", "parameters": {
"items": {"type": "value", "value": [10, 20, 30, 40, 50]},
"iterationMode": "every_nth",
"iterationStride": 2,
}}
ctx = {"nodeOutputs": {}, "connectionMap": {}, "inputSources": {"loop1": {}}}
result = await ex.execute(node, ctx)
assert result["count"] == 3
assert result["items"] == [10, 30, 50]
@pytest.mark.asyncio
class TestDataFilterUdm:

View file

@ -0,0 +1,98 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
from modules.workflows.methods.methodAi._common import serialize_context
from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import (
enhancePlainTextWithMarkdownTables,
markdownToDocumentJson,
)
from modules.workflows.methods.methodFile.actions.create import (
_collect_image_documents_only,
_context_string_for_report,
)
def test_serialize_context_nonserializable_embeds_via_default_str():
class _Ns:
def __str__(self):
return "ns"
s = serialize_context({"x": _Ns(), "n": 1})
parsed = json.loads(s)
assert parsed["n"] == 1
assert "ns" in parsed["x"]
def test_serialize_context_strips_bom_on_plain_string():
assert serialize_context("\ufeffhello") == "hello"
def test_context_string_docx_prefers_response_over_full_dict():
body = "Datum;Mandant\n2026-01-01;acme"
ctx = {"response": "\ufeff" + body, "data": {"foo": 1}}
assert _context_string_for_report(ctx, "docx") == body
def test_context_string_json_serializes_full_structure():
ctx = {"response": "hi", "data": {"foo": 1}}
out = _context_string_for_report(ctx, "json")
assert json.loads(out)["data"]["foo"] == 1
def test_serialize_context_prefers_response_when_json_fails():
d: dict = {"response": "plain", "n": 1}
d["_loop"] = d # circular — json.dumps fails
assert serialize_context(d).strip() == "plain"
def test_serialize_context_prefer_handover_primary_skips_metadata():
blob = {"response": "LINE", "data": {"nested": {"x" * 200}}, "extra": {"y": 2}}
s = serialize_context(blob, prefer_handover_primary=True)
assert s == "LINE"
def test_context_string_plain_str_passthrough_docx():
assert _context_string_for_report(" hello ", "docx") == "hello"
def test_collect_image_documents_nested_paths():
imgs = [{"documentName": "m.png", "mimeType": "image/png"}]
assert _collect_image_documents_only({"merged": {"imageDocumentsOnly": imgs}}) == imgs
assert _collect_image_documents_only({"data": {"merged": {"imageDocumentsOnly": imgs}}}) == imgs
def test_context_string_prefers_merged_response_over_inputs_noise():
raw = {"merged": {"response": "from-merged"}, "inputs": {"0": {"documentData": "X" * 10000}}}
assert _context_string_for_report(raw, "docx") == "from-merged"
def test_context_string_fallback_json_strips_heavy_keys():
raw = {"foo": 1, "inputs": {"nasty": True}, "imageDocumentsOnly": [{"documentName": "x"}]}
out = _context_string_for_report(raw, "docx")
parsed = json.loads(out)
assert "inputs" not in parsed
assert "imageDocumentsOnly" not in parsed
assert parsed["foo"] == 1
def test_enhance_plain_csv_semicolon_to_markdown_table():
body = "Datum;Betrag\n2026-01-01;12.50\n2026-01-02;3.00"
out = enhancePlainTextWithMarkdownTables(body)
assert "| Datum |" in out
assert "| Betrag |" in out
assert "---" in out
def test_enhance_preserves_normal_paragraphs():
body = "Ein Absatz ohne Raster.\n\nZweiter Gedanke."
assert enhancePlainTextWithMarkdownTables(body) == body
def test_enhance_then_markdown_json_contains_table_section():
body = "Datum;Betrag\n2026-01-01;12\n2026-01-02;3"
enhanced = enhancePlainTextWithMarkdownTables(body)
doc = markdownToDocumentJson(enhanced, "Report", "de")
sections = doc["documents"][0]["sections"]
assert any(s.get("content_type") == "table" for s in sections)

View file

@ -175,3 +175,37 @@ class TestPathContainsWildcard:
def test_literal_star_in_int_segment_does_not_match(self):
from modules.workflows.automation2.graphUtils import _pathContainsWildcard
assert _pathContainsWildcard([1, 2, 3]) is False
class TestLoopBodyAndDoneReachability:
"""flow.loop: body only from output 0; done branch from output 1 (engine helpers)."""
def test_body_only_output_0_not_done_chain(self):
from modules.workflows.automation2.graphUtils import buildConnectionMap, getLoopBodyNodeIds, getLoopDoneNodeIds
conns = [
{"source": "tr", "target": "loop", "targetInput": 0},
{"source": "loop", "target": "a", "sourceOutput": 0, "targetInput": 0},
{"source": "loop", "target": "d", "sourceOutput": 1, "targetInput": 0},
{"source": "a", "target": "b"},
]
cm = buildConnectionMap(conns)
assert getLoopBodyNodeIds("loop", cm) == {"a", "b"}
assert getLoopDoneNodeIds("loop", cm) == {"d"}
def test_primary_input_prefers_outside_body(self):
from modules.workflows.automation2.graphUtils import (
buildConnectionMap,
getLoopBodyNodeIds,
getLoopPrimaryInputSource,
)
conns = [
{"source": "tr", "target": "loop", "targetInput": 0},
{"source": "a", "target": "loop", "targetInput": 0},
{"source": "loop", "target": "a", "sourceOutput": 0, "targetInput": 0},
]
cm = buildConnectionMap(conns)
body = getLoopBodyNodeIds("loop", cm)
assert body == {"a"}
assert getLoopPrimaryInputSource("loop", cm, body) == ("tr", 0)