fix: handover nochmal zentralisiert

This commit is contained in:
Ida 2026-05-14 16:41:43 +02:00
parent 8eb094be16
commit 64b58802a4
29 changed files with 2439 additions and 715 deletions

View file

@ -6,6 +6,9 @@ from modules.shared.i18nRegistry import t
from modules.features.graphicalEditor.nodeDefinitions.contextPickerHelp import (
CONTEXT_BUILDER_PARAM_DESCRIPTION,
)
from modules.features.graphicalEditor.nodeDefinitions.flow import (
CONTEXT_ENVELOPE_DATA_PICK_OPTIONS,
)
# Shared authoritative DataPicker paths (same handover idea as ``context.extractContent`` outputPorts).
ACTION_RESULT_DATA_PICK_OPTIONS = [
@ -43,6 +46,7 @@ ACTION_RESULT_DATA_PICK_OPTIONS = [
]
AI_RESULT_DATA_PICK_OPTIONS = [
*CONTEXT_ENVELOPE_DATA_PICK_OPTIONS,
{
"path": ["documents", 0, "documentData"],
"pickerLabel": t("Gesamter Inhalt"),
@ -50,14 +54,14 @@ AI_RESULT_DATA_PICK_OPTIONS = [
"Hauptausgabedatei oder strukturierter Inhalt von ``documents[0]`` "
"(z. B. erzeugtes Dokument, JSON-Handover)."
),
"recommended": True,
"recommended": False,
"type": "Any",
},
{
"path": ["response"],
"pickerLabel": t("Nur Text"),
"detail": t("Modell-Antwort als reiner Fließtext (ohne eingebettete Bildbytes)."),
"recommended": True,
"recommended": False,
"type": "str",
},
{

View file

@ -4,7 +4,10 @@
from modules.shared.i18nRegistry import t
from modules.features.graphicalEditor.nodeDefinitions.flow import CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS
from modules.features.graphicalEditor.nodeDefinitions.flow import (
CONTEXT_ENVELOPE_DATA_PICK_OPTIONS,
CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS,
)
_CONTEXT_INPUT_SCHEMAS = [
"Transit",
@ -27,11 +30,12 @@ CONTEXT_NODES = [
"category": "context",
"label": t("Inhalt extrahieren"),
"description": t(
"Extrahiert Inhalt ohne KI. Ergebnis einheitlich wie KI-Schritte: `response` "
"(gesammelter Klartext), strukturierte JSON-Unterlage in `documents[0]`, "
"einzelne Bilder als eigene Dokumente `extract_media_*` (nur im Workflow, ohne Eintrag unter „Meine Dateien“) — "
"Auswahl im Daten-Picker wie bei `ai.process`."
"Extrahiert Inhalt ohne KI. ``data`` ist die gewählte **Presentation** (`fileOrder`, `files` je "
"Quelldatei, kanonisches `data` pro Bucket) plus ``_meta`` (Quellnamen, Operation, Persist). "
"``response`` für diesen Knoten bleibt leer — kein zusätzlicher Fließtext. "
"``imageDocumentsOnly`` enthält Bilder über persistierte Artefakte."
),
"injectRunContext": True,
"parameters": [
{"name": "documentList", "type": "str", "required": True, "frontendType": "hidden",
"description": t("Dokumentenliste (via Wire oder DataRef)"), "default": "",
@ -51,7 +55,7 @@ CONTEXT_NODES = [
},
"default": "all",
"description": t(
"Welche Parts im Handover behalten werden. "
"Welche extrahierten Parts weiterverwendet werden. "
"all = alle Typgruppen inkl. Bilder; "
"textOnly = ausschliesslich Text-, Tabellen- und Struktur-Parts; "
"imagesOnly = ausschliesslich Bild-Parts; "
@ -75,8 +79,7 @@ CONTEXT_NODES = [
},
"default": "lines",
"description": t(
"Wie die extrahierten Inhalte unter ``presentation`` strukturiert werden "
"(zusaetzlich zu den unveraenderten ``parts`` im Handover)."
"Wie das Ergebnis unter ``files`` strukturiert wird (``outputMode``: blob, lines, …)."
),
},
{
@ -238,10 +241,11 @@ CONTEXT_NODES = [
{"value": "all", "label": t("PDF/Parts: alle Typgruppen")},
]
},
"default": "text",
"default": "all",
"description": t(
"Filtert fuer die Presentation-Schicht nach typeGroup/MIME "
"(gilt fuer alle Dokumenttypen analog, nicht nur PDF)."
"(gilt fuer alle Dokumenttypen analog, nicht nur PDF). "
"Passt zum Inhaltsfilter „Alles“; „Text & Tabellen“ blendet Bild-Parts in der Presentation aus."
),
},
{
@ -271,51 +275,40 @@ CONTEXT_NODES = [
# Frontend uses only this list — no schema expansion merge for this port.
"dataPickOptions": [
{
"path": ["documents", 0, "documentData"],
"pickerLabel": t("Gesamter Inhalt"),
"path": ["data"],
"pickerLabel": t("Vollständiges data-Objekt"),
"detail": t(
"Strukturiertes Handover als JSON inklusive aller Textteile "
"und Verweisen auf ausgelagerte Bilder."
"Presentation-Envelope (``schemaVersion``, ``kind``, ``fileOrder``, ``files``) "
"plus ``_meta`` (``operationRef``, ``sourceFileNames``, Persist)."
),
"recommended": True,
"type": "Any",
},
{
"path": ["documents", 0, "documentData", "presentation"],
"pickerLabel": t("Presentation (strukturierte Sicht)"),
"detail": t(
"Nur die konfigurierte Ausgabe-Struktur (blob/lines/pages/chunks/structured); "
"unveraenderte Roh-Parts bleiben im umschliessenden Handover."
),
"path": ["data", "files"],
"pickerLabel": t("Alle Dateibuckets"),
"detail": t("Map Dateischlüssel → Bucket (Zeilenliste, Blob, CSV-Tabelle bei structured, …)."),
"recommended": False,
"type": "Any",
},
{
"path": ["response"],
"pickerLabel": t("Nur Text"),
"detail": t(
"Verketteter Klartext aus allen erkannten Textteilen."
),
"recommended": True,
"type": "str",
},
{
"path": ["imageDocumentsOnly"],
"pickerLabel": t("Nur Bilder"),
"detail": t(
"Nur die extrahierten Bilddokumente als Liste, ohne JSON-Handover."
"Nur die Bilder aus der Extraktion (persistierte Artefakte bzw. inline), "
"als Liste fuer nachgelagerte Schritte."
),
"recommended": False,
"type": "List[ActionDocument]",
},
{
"path": ["documents"],
"pickerLabel": t("Alle Dateitypen"),
"path": ["data", "_meta"],
"pickerLabel": t("Metadaten (_meta)"),
"detail": t(
"Alle Ausgabedokumente nacheinander: JSON-Handover und Bilder."
"``operationRef``, ``sourceFileNames``, Presentation-Parameter, Liste persistierter Bilder."
),
"recommended": False,
"type": "List[ActionDocument]",
"type": "Any",
},
],
}
@ -330,6 +323,8 @@ CONTEXT_NODES = [
"label": t("Kontext zusammenführen"),
"description": t(
"Führt eine Liste von Ergebnissen zu einem einzigen Kontext zusammen. "
"Ausgabe ``data``: versionierter Umschlag (``schemaVersion``, ``kind``), Felder wie "
"``merged`` / ``first`` / ``response`` sowie ``_meta``. "
"Wähle als Datenquelle die Option Alle Schleifen-Ergebnisse einer Schleife, "
"um alle Iterationsergebnisse in einem Datensatz zu vereinen."
),
@ -365,6 +360,8 @@ CONTEXT_NODES = [
"label": t("Kontext transformieren"),
"description": t(
"Verändert die Struktur des eingehenden Datenstroms. "
"Ausgabe ``data``: versionierter Umschlag (``schemaVersion``, ``kind``: transform), "
"konfigurierte Ausgabe-Felder und ``_meta``. "
"Operationen pro Mapping: 'rename' (Key umbenennen), 'cast' (Typ konvertieren), "
"'nest' (mehrere Felder unter neuem Objekt zusammenfassen), "
"'flatten' (verschachteltes Objekt auf oberste Ebene heben), "
@ -423,6 +420,7 @@ CONTEXT_NODES = [
"dynamic": True,
"deriveFrom": "mappings",
"deriveNameField": "outputField",
"dataPickOptions": CONTEXT_ENVELOPE_DATA_PICK_OPTIONS,
}
},
"injectUpstreamPayload": True,

View file

@ -14,9 +14,8 @@ FILE_NODES = [
"category": "file",
"label": t("Datei erstellen"),
"description": t(
"Erstellt eine Datei aus Kontext. Nach „Inhalt extrahieren“: „response“ für reinen Text; "
"„Nur Bilder“ liefert alle extrahierten Bilder — Datei erstellen fasst sie zu einer PDF oder DOCX "
"(Ausgabeformat pdf oder docx wählen)."
"Erstellt eine Datei aus der Presentation von „Inhalt extrahieren“ "
"(``data`` oder Schleifen-``bodyResults``). Ausgabe über den Generation-Service."
),
"parameters": [
{"name": "outputFormat", "type": "str", "required": True, "frontendType": "select",
@ -29,7 +28,7 @@ FILE_NODES = [
"default": ""},
{"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder",
"description": CONTEXT_BUILDER_PARAM_DESCRIPTION, "default": "",
"graphInherit": {"port": 0, "kind": "primaryTextRef"}},
"graphInherit": {"port": 0, "kind": "recommendedDataPickRef"}},
],
"inputs": 1,
"outputs": 1,

View file

@ -63,6 +63,28 @@ LOOP_ITEM_DATA_PICK_OPTIONS = [
},
]
# Base paths when ``ActionResult.data`` uses envelope + ``_meta`` (context.extractContent-style clarity).
CONTEXT_ENVELOPE_DATA_PICK_OPTIONS = [
{
"path": ["data"],
"pickerLabel": t("Vollständiges data-Objekt"),
"detail": t(
"Versionierter Kontext-Umschlag: ``schemaVersion``, ``kind``, Nutzdatenfelder, ``_meta``."
),
"recommended": True,
"type": "Dict",
},
{
"path": ["data", "_meta"],
"pickerLabel": t("Technische Metadaten (_meta)"),
"detail": t(
"`actionType`, Payload-Schema-Version; bei Transform/Merge keine großen Payloads."
),
"recommended": False,
"type": "Any",
},
]
MERGE_RESULT_DATA_PICK_OPTIONS = [
{
"path": ["merged"],
@ -90,6 +112,7 @@ MERGE_RESULT_DATA_PICK_OPTIONS = [
# Extended picker for ``context.mergeContext`` (ActionResult + ``surfaceDataAsTopLevel``): same
# merge keys as ``flow.merge`` plus ``count`` from the action payload.
CONTEXT_MERGE_ACTION_RESULT_DATA_PICK_OPTIONS = [
*CONTEXT_ENVELOPE_DATA_PICK_OPTIONS,
*MERGE_RESULT_DATA_PICK_OPTIONS,
{
"path": ["count"],

View file

@ -315,14 +315,18 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = {
# bindings like `processDocuments → documents → *` for syncToAccounting.
PortField(name="documents", type="List[ActionDocument]", required=False,
description=(
"Dokumentliste: Index 0 oft JSON-Handover oder Hauptdatei; Einträge mit "
"MIME image/* oder Namen extract_media_* sind ausgelagerte Bilder (documentData = Binär)."
"Dokumentliste für Actions mit echten Artefakt-Dokumenten. "
"Beim Knoten „Inhalt extrahieren“ fehlt dieses Feld in der Knotenausgabe."
),
picker_label=t("Alle Ausgabe-Dokumente"),
picker_item_label=t("je Dokument"),
),
PortField(name="data", type="Dict", required=False,
description="Ergebnisdaten",
description=(
"Strukturierter Inhalt. Bei **context.extractContent**: **Presentation**-Root "
"(`schemaVersion`, `kind`, `fileOrder`, `files`) plus **`_meta`** — ohne "
"zusätzliches `response`/`contentExtracted`-Duplikat."
),
picker_label=t("Technische Detaildaten (data)")),
# Mirror AiResult primary text fields so DataPicker / primaryTextRef behave the same
PortField(name="prompt", type="str", required=False,
@ -330,7 +334,8 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = {
picker_label=t("Auslöser / Prompt (falls vorhanden)")),
PortField(name="response", type="str", required=False,
description=(
"Primär nur Fließtext (z. B. nach Extraktion: alle Text-Parts verkettet, keine Bilder)."
"Fließtext wo die Action einen liefert. Bei **„Inhalt extrahieren“** absichtlich leer — "
"Inhalt liegt in ``data``.``files``."
),
recommended=True,
picker_label=t("Nur Fließtext (gesamt)")),
@ -339,12 +344,29 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = {
picker_label=t("Mitgegebener Kontext")),
PortField(name="imageDocumentsOnly", type="List[ActionDocument]", required=False,
description=(
"Nur Bildausgaben (ohne JSON-Handover), z. B. von context.extractContent."
"Nur Bild-bezogene Einträge. Bei „Inhalt extrahieren“: synthetische "
"Einträge mit ``fileId`` aus persistierten Extrakt-Bildern (kein separates JSON-Dokument)."
),
picker_label=t("Nur Bilder (Liste)")),
PortField(name="responseData", type="Dict", required=False,
description="Optional: strukturierte Zusatzdaten",
picker_label=t("Strukturierte Zusatzdaten")),
PortField(name="presentation", type="Dict", required=False,
description=(
"Selten: Top-Level-Spiegel von Präsentationsdaten andere Actions. "
"Bei „Inhalt extrahieren“ liegt alles direkt unter ``data`` (kein zusätzlicher Spiegel)."
),
picker_label=t("Presentation (Top-Level-Spiegel)")),
PortField(name="presentationSummary", type="Dict", required=False,
description=(
"Kompakte Metadaten zu ``presentation`` (Debugging / traces)."
),
picker_label=t("Presentation-Zusammenfassung")),
PortField(name="presentationConfig", type="Dict", required=False,
description=(
"Optional: Debugging-Konfiguration; bei Extract liegt die Primärquelle in ``validationMetadata`` des JSON-Dokuments."
),
picker_label=t("Presentation-Konfiguration")),
]),
"Transit": PortSchema(name="Transit", fields=[]),
"UdmDocument": PortSchema(name="UdmDocument", carriesConnectionProvenance=True, fields=[
@ -675,6 +697,8 @@ SYSTEM_VARIABLES: Dict[str, Dict[str, str]] = {
#
# When a parameter declares ``graphInherit.kind == "primaryTextRef"``, executeGraph
# inserts an explicit DataRef before run (see pickNotPushMigration.materializePrimaryTextHandover).
# ``recommendedDataPickRef`` uses upstream ``outputPorts.dataPickOptions`` where ``recommended: true``
# (see pickNotPushMigration.materializeRecommendedDataPickRef).
# Schema names are catalog output port types (e.g. AiResult).
PRIMARY_TEXT_HANDOVER_REF_PATH: Dict[str, List[Any]] = {

View file

@ -110,24 +110,29 @@ def compute_upstream_paths(graph: Dict[str, Any], target_node_id: str) -> List[D
out0 = (ndef.get("outputPorts") or {}).get(0, {})
out0 = out0 if isinstance(out0, dict) else {}
dpo = out0.get("dataPickOptions")
if isinstance(dpo, list) and len(dpo) > 0:
bases: List[Dict[str, Any]] = []
if isinstance(dpo, list):
bases = _paths_for_data_pick_options(dpo, aid)
derived = parse_graph_defined_output_schema(anode, out0)
derived_paths: List[Dict[str, Any]] = []
if derived:
derived_paths = _paths_for_port_schema(derived, aid)
merged_list = bases + derived_paths
if merged_list:
plab = (anode.get("title") or "").strip() or aid
for entry in _paths_for_data_pick_options(dpo, aid):
for entry in merged_list:
entry["producerLabel"] = plab
paths.append(entry)
continue
derived = parse_graph_defined_output_schema(anode, out0)
if derived:
for entry in _paths_for_port_schema(derived, aid):
entry["producerLabel"] = (anode.get("title") or "").strip() or aid
paths.append(entry)
else:
raw_schema = out0.get("schema") if isinstance(out0, dict) else None
schema_name = raw_schema if isinstance(raw_schema, str) and raw_schema else "ActionResult"
for entry in _paths_for_schema(schema_name, aid):
entry["producerLabel"] = (anode.get("title") or "").strip() or aid
paths.append(entry)
raw_schema = out0.get("schema") if isinstance(out0, dict) else None
schema_name = raw_schema if isinstance(raw_schema, str) and raw_schema else "ActionResult"
plab = (anode.get("title") or "").strip() or aid
for entry in _paths_for_schema(schema_name, aid):
entry["producerLabel"] = plab
paths.append(entry)
# Lexical loop hints (flow.loop): only for nodes inside the loop body
for aid in ancestors:

View file

@ -990,6 +990,10 @@ class ComponentObjects:
If pagination is provided: PaginatedResult with items and metadata
"""
def _convertFileItems(files):
from modules.workflows.automation2.workflowArtifactVisibility import (
suppress_workflow_file_in_workspace_ui,
)
fileItems = []
for file in files:
try:
@ -1002,6 +1006,8 @@ class ComponentObjects:
fileName = file.get("fileName")
if not fileName or fileName == "None":
continue
if suppress_workflow_file_in_workspace_ui(file):
continue
if file.get("scope") is None:
file["scope"] = "personal"

View file

@ -26,6 +26,7 @@ from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import (
AutoWorkflow,
)
from modules.features.graphicalEditor.interfaceFeatureGraphicalEditor import graphicalEditorDatabase
from modules.workflows.automation2.workflowArtifactVisibility import suppress_workflow_file_in_workspace_ui
from modules.shared.i18nRegistry import apiRouteContext
routeApiMsg = apiRouteContext("routeAutomationWorkspace")
@ -265,7 +266,8 @@ def getWorkspaceRunDetail(
logger.warning("getWorkspaceRunDetail: file lookup failed: %s", e)
def _resolveFileList(ids: set[str]) -> list[dict]:
return [fileMetaById[fid] for fid in ids if fid in fileMetaById]
rows = [dict(fileMetaById[fid]) for fid in ids if fid in fileMetaById]
return [m for m in rows if not suppress_workflow_file_in_workspace_ui(m)]
assignedFileIds: set[str] = set()
for step, (inputIds, outputIds) in zip(steps, perStepFileIds):

View file

@ -73,7 +73,30 @@ class PdfExtractor(Extractor):
))
return parts
# Extract text per page with PyMuPDF (same lib as in-place search - ensures extraction matches PDF text layer)
file_name = context.get("fileName", "document.pdf")
ordered_ok = False
try:
doc = fitz.open(stream=fileBytes, filetype="pdf")
for page_index in range(len(doc)):
page = doc[page_index]
page_parts = self._extract_page_blocks_in_reading_order(
page,
doc,
page_index=page_index,
root_id=rootId,
file_name=file_name,
)
if page_parts:
parts.extend(page_parts)
ordered_ok = True
doc.close()
except Exception:
ordered_ok = False
if ordered_ok and any(getattr(p, "typeGroup", "") in ("text", "image") for p in parts):
return parts
parts = [parts[0]] # keep container only; fall back below
try:
doc = fitz.open(stream=fileBytes, filetype="pdf")
for i in range(len(doc)):
@ -174,4 +197,196 @@ class PdfExtractor(Extractor):
return parts
@staticmethod
def _text_from_text_block(block: Dict[str, Any]) -> str:
lines_out: List[str] = []
for line in block.get("lines") or []:
if not isinstance(line, dict):
continue
spans = line.get("spans") or []
line_text = "".join(
str(span.get("text") or "")
for span in spans
if isinstance(span, dict)
)
lines_out.append(line_text)
return "\n".join(lines_out).strip()
@staticmethod
def _bbox_center(bbox: Any) -> tuple[float, float]:
if not isinstance(bbox, (list, tuple)) or len(bbox) < 4:
return 0.0, 0.0
x0, y0, x1, y1 = float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])
return (x0 + x1) / 2.0, (y0 + y1) / 2.0
@staticmethod
def _point_inside_bbox(x: float, y: float, bbox: Any) -> bool:
if not isinstance(bbox, (list, tuple)) or len(bbox) < 4:
return False
x0, y0, x1, y1 = float(bbox[0]), float(bbox[1]), float(bbox[2]), float(bbox[3])
return x0 <= x <= x1 and y0 <= y <= y1
def _extract_page_blocks_in_reading_order(
self,
page: Any,
doc: Any,
*,
page_index: int,
root_id: str,
file_name: str,
) -> List[ContentPart]:
"""Emit text/image/table parts in on-page reading order (top-to-bottom, left-to-right)."""
entries: List[tuple[float, float, str, Dict[str, Any]]] = []
table_bboxes: List[Any] = []
try:
table_finder = page.find_tables()
for ti, tab in enumerate(getattr(table_finder, "tables", []) or []):
try:
matrix = tab.extract()
except Exception:
matrix = None
if not matrix:
continue
csv_data = self._rows_to_csv_payload(matrix)
if not csv_data.strip():
continue
bbox = getattr(tab, "bbox", None)
if bbox is not None:
table_bboxes.append(bbox)
cy, cx = self._bbox_center(bbox)
entries.append((cy, cx, "table", {
"label": f"table_{page_index + 1}_{ti}",
"data": csv_data,
"table_index": ti,
}))
except Exception:
pass
try:
page_dict = page.get_text("dict", sort=True)
except Exception:
page_dict = None
blocks = page_dict.get("blocks") if isinstance(page_dict, dict) else None
if isinstance(blocks, list):
text_block_no = 0
image_no = 0
for block in blocks:
if not isinstance(block, dict):
continue
bbox = block.get("bbox")
cy, cx = self._bbox_center(bbox)
btype = block.get("type")
if btype == 0:
if any(self._point_inside_bbox(cx, cy, tb) for tb in table_bboxes):
continue
text = self._text_from_text_block(block)
if not text:
continue
label = f"page_{page_index + 1}" if text_block_no == 0 else f"page_{page_index + 1}_t{text_block_no}"
entries.append((cy, cx, "text", {
"label": label,
"data": text,
"text_block_no": text_block_no,
}))
text_block_no += 1
continue
if btype != 1:
continue
img_bytes = block.get("image")
ext = str(block.get("ext") or "png").lower()
mime = f"image/{ext}"
if not img_bytes:
xref = block.get("xref")
if xref is not None:
try:
extracted = doc.extract_image(int(xref))
img_bytes = extracted.get("image", b"")
ext = str(extracted.get("ext") or ext).lower()
mime = f"image/{ext}"
except Exception:
img_bytes = b""
if not img_bytes:
continue
entries.append((cy, cx, "image", {
"label": f"image_{page_index + 1}_{image_no}",
"mime": mime,
"bytes": img_bytes,
"image_no": image_no,
}))
image_no += 1
entries.sort(key=lambda item: (item[0], item[1]))
out: List[ContentPart] = []
for _y, _x, kind, payload in entries:
if kind == "text":
tbno = int(payload.get("text_block_no") or 0)
text = str(payload.get("data") or "")
out.append(ContentPart(
id=makeId(),
parentId=root_id,
label=str(payload.get("label") or f"page_{page_index + 1}"),
typeGroup="text",
mimeType="text/plain",
data=text,
metadata={
"pages": 1,
"pageIndex": page_index,
"size": len(text.encode("utf-8")),
"contextRef": {
"containerPath": file_name,
"location": f"page:{page_index + 1}/block:{tbno}",
"pageIndex": page_index,
},
},
))
elif kind == "table":
ti = int(payload.get("table_index") or 0)
csv_data = str(payload.get("data") or "")
out.append(ContentPart(
id=makeId(),
parentId=root_id,
label=str(payload.get("label") or f"table_{page_index + 1}_{ti}"),
typeGroup="table",
mimeType="text/csv",
data=csv_data,
metadata={
"pageIndex": page_index,
"size": len(csv_data.encode("utf-8")),
"contextRef": {
"containerPath": file_name,
"location": f"page:{page_index + 1}/table:{ti}",
"pageIndex": page_index,
},
},
))
elif kind == "image":
ino = int(payload.get("image_no") or 0)
img_bytes = payload.get("bytes") or b""
mime = str(payload.get("mime") or "image/png")
out.append(ContentPart(
id=makeId(),
parentId=root_id,
label=str(payload.get("label") or f"image_{page_index + 1}_{ino}"),
typeGroup="image",
mimeType=mime,
data=base64.b64encode(img_bytes).decode("utf-8"),
metadata={
"pageIndex": page_index,
"size": len(img_bytes),
"contextRef": {
"containerPath": file_name,
"location": f"page:{page_index + 1}/image:{ino}",
"pageIndex": page_index,
},
},
))
return out
@staticmethod
def _rows_to_csv_payload(rows: List[List[Any]]) -> str:
lines: List[str] = []
for row in rows:
cells = [str(c or "").replace('"', '""') for c in row]
lines.append(",".join(f'"{c}"' for c in cells))
return "\n".join(lines)

View file

@ -670,7 +670,7 @@ class RendererPdf(BaseRenderer):
runType = run.get("type", "text")
value = self._escapeReportlabXml(run.get("value", ""))
if runType == "text":
parts.append(value)
parts.append(value.replace("\n", "<br/>"))
elif runType == "bold":
parts.append(f"<b>{value}</b>")
elif runType == "italic":
@ -691,6 +691,7 @@ class RendererPdf(BaseRenderer):
if not text:
return ""
s = self._escapeReportlabXml(text)
s = s.replace("\n", "<br/>")
s = _re_pdf.sub(r"\*\*(.+?)\*\*", r"<b>\1</b>", s, flags=_re_pdf.DOTALL)
s = _re_pdf.sub(r"__(.+?)__", r"<b>\1</b>", s, flags=_re_pdf.DOTALL)
s = _re_pdf.sub(r"(?<!\*)\*([^*\n]+?)\*(?!\*)", r"<i>\1</i>", s)

View file

@ -217,6 +217,30 @@ def _serializableOutputs(nodeOutputs: Dict[str, Any]) -> Dict[str, Any]:
return _stripBinaryValues(cleaned)
def _merge_node_parameters_into_snap(
snap: Optional[Dict[str, Any]],
*,
node_id: Optional[str],
context: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
"""Copy wire snapshot and attach **nodeParameters** from the graph definition (by ``node_id``).
Uses ``context['graphNodesById']`` populated at executeGraph start stable even when
per-step node dict references differ. Field name is ``nodeParameters`` (no leading
underscore) so it survives consumers that hide ``_*`` keys."""
merged: Dict[str, Any] = dict(snap or {})
if not node_id or not isinstance(context, dict):
return merged
cmap = context.get("graphNodesById")
if not isinstance(cmap, dict):
return merged
gnode = cmap.get(node_id)
if not isinstance(gnode, dict):
return merged
merged["nodeParameters"] = dict(gnode.get("parameters") or {})
return merged
def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None:
"""Emit a step-log SSE event to any listening client for this run."""
try:
@ -319,18 +343,20 @@ async def _ge_log_node_finished(
loop_index: Optional[int] = None,
loop_node_id: Optional[str] = None,
loop_item: Optional[Any] = None,
exec_context: Optional[Dict[str, Any]] = None,
) -> None:
"""Append one execution line + one workflow-context snapshot (NDJSON)."""
if file_logger is None or not run_id:
return
ts = _ge_iso_timestamp()
snap = _merge_node_parameters_into_snap(input_snap, node_id=node_id, context=exec_context)
exec_rec: Dict[str, Any] = {
"timestamp": ts,
"runId": run_id,
"nodeId": node_id,
"nodeType": node_type,
"status": status,
"input": _stripBinaryValues(dict(input_snap or {})),
"input": _stripBinaryValues(snap),
}
if skip_reason:
exec_rec["skipReason"] = skip_reason
@ -470,6 +496,7 @@ async def _run_post_loop_done_nodes(
for _sSrc, _, _ in connectionMap.get(_dnid, []):
if _sSrc in nodeOutputs:
_skipSnap[_sSrc] = nodeOutputs[_sSrc]
_skipSnap = _merge_node_parameters_into_snap(_skipSnap, node_id=_dnid, context=context)
_skId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), status="skipped", inputSnapshot=_skipSnap)
if _skId:
_updateStepLog(automation2_interface, _skId, "skipped")
@ -478,6 +505,7 @@ async def _run_post_loop_done_nodes(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=_dnid,
node_type=_dn.get("type", ""),
status="skipped",
@ -494,6 +522,7 @@ async def _run_post_loop_done_nodes(
for _src, _, _ in connectionMap.get(_dnid, []):
if _src in nodeOutputs:
_dIn[_src] = nodeOutputs[_src]
_dIn = _merge_node_parameters_into_snap(_dIn, node_id=_dnid, context=context)
_dStepId = _createStepLog(automation2_interface, runId, _dnid, _dn.get("type", ""), "running", _dIn)
try:
_dres, _dRetry = await _executeWithRetry(_dexec, _dn, context)
@ -509,6 +538,7 @@ async def _run_post_loop_done_nodes(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=_dnid,
node_type=_dn.get("type", ""),
status="completed",
@ -525,6 +555,7 @@ async def _run_post_loop_done_nodes(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=_dnid,
node_type=_dn.get("type", ""),
status="completed",
@ -540,6 +571,7 @@ async def _run_post_loop_done_nodes(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=_dnid,
node_type=_dn.get("type", ""),
status="completed",
@ -556,6 +588,7 @@ async def _run_post_loop_done_nodes(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=_dnid,
node_type=_dn.get("type", ""),
status="failed",
@ -573,6 +606,7 @@ async def _run_post_loop_done_nodes(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=_dnid,
node_type=_dn.get("type", ""),
status="failed",
@ -622,6 +656,8 @@ async def executeGraph(
from modules.workflows.automation2.pickNotPushMigration import (
materializeConnectionRefs,
materializePrimaryTextHandover,
materializeRecommendedDataPickRef,
normalizeFileCreatePresentationRefs,
)
from modules.workflows.automation2.featureInstanceRefMigration import (
materializeFeatureInstanceRefs,
@ -635,6 +671,8 @@ async def executeGraph(
graph = materializeFeatureInstanceRefs(graph)
graph = materializeConnectionRefs(graph)
graph = materializePrimaryTextHandover(graph)
graph = materializeRecommendedDataPickRef(graph)
graph = normalizeFileCreatePresentationRefs(graph)
nodeTypeIds = _getNodeTypeIds(services)
logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds))
errors = validateGraph(graph, nodeTypeIds)
@ -720,6 +758,9 @@ async def executeGraph(
env_for_run = normalize_run_envelope(run_envelope, user_id=userId)
graph_nodes_by_id: Dict[str, Any] = {
str(n["id"]): n for n in nodes if n.get("id")
}
context = {
"workflowId": workflowId,
"instanceId": instanceId,
@ -732,6 +773,7 @@ async def executeGraph(
"_runId": runId,
"_orderedNodes": ordered,
"runEnvelope": env_for_run,
"graphNodesById": graph_nodes_by_id,
}
# Lets graph actions (e.g. ``context.setContext`` human-task mode) call
# ``createTask`` / ``updateRun`` without threading the interface through services.
@ -803,6 +845,7 @@ async def executeGraph(
for _rSrc, _, _ in connectionMap.get(bnid, []):
if _rSrc in nodeOutputs:
_rInputSnap[_rSrc] = nodeOutputs[_rSrc]
_rInputSnap = _merge_node_parameters_into_snap(_rInputSnap, node_id=bnid, context=context)
_rStepId = _createStepLog(automation2_interface, runId, bnid, body_node.get("type", ""), "running", _rInputSnap)
try:
result, _rRetry = await _executeWithRetry(executor, body_node, context)
@ -821,6 +864,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="completed",
@ -844,6 +888,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="completed",
@ -867,6 +912,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="completed",
@ -886,6 +932,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="failed",
@ -906,6 +953,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="failed",
@ -979,6 +1027,7 @@ async def executeGraph(
for _sSrc, _, _ in connectionMap.get(nodeId, []):
if _sSrc in nodeOutputs:
_skipInputSnap[_sSrc] = nodeOutputs[_sSrc]
_skipInputSnap = _merge_node_parameters_into_snap(_skipInputSnap, node_id=nodeId, context=context)
_skipStepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, status="skipped", inputSnapshot=_skipInputSnap)
if _skipStepId:
_updateStepLog(automation2_interface, _skipStepId, "skipped")
@ -987,6 +1036,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=nodeId,
node_type=nodeType,
status="skipped",
@ -1015,6 +1065,7 @@ async def executeGraph(
for _lSrc, _, _ in connectionMap.get(nodeId, []):
if _lSrc in nodeOutputs:
_loopInputSnap[_lSrc] = nodeOutputs[_lSrc]
_loopInputSnap = _merge_node_parameters_into_snap(_loopInputSnap, node_id=nodeId, context=context)
_stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _loopInputSnap)
result = await executor.execute(node, context)
items = result.get("items") or []
@ -1068,6 +1119,9 @@ async def executeGraph(
for _bSnapSrc, _, _ in connectionMap.get(bnid, []):
if _bSnapSrc in _activeOutputs:
_bInputSnapAlways[_bSnapSrc] = _activeOutputs[_bSnapSrc]
_bInputSnapAlways = _merge_node_parameters_into_snap(
_bInputSnapAlways, node_id=bnid, context=context
)
_bStepId = None
if not _batchMode or _idx == 0 or _idx == len(items) - 1:
_bStepId = _createStepLog(
@ -1100,6 +1154,7 @@ async def executeGraph(
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="completed",
@ -1123,6 +1178,7 @@ async def executeGraph(
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="completed",
@ -1148,6 +1204,7 @@ async def executeGraph(
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="completed",
@ -1168,6 +1225,7 @@ async def executeGraph(
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="failed",
@ -1189,6 +1247,7 @@ async def executeGraph(
run_id=runId,
node_outputs=_activeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=bnid,
node_type=body_node.get("type", ""),
status="failed",
@ -1296,6 +1355,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=nodeId,
node_type=nodeType,
status="completed",
@ -1314,6 +1374,7 @@ async def executeGraph(
for src, _, _ in connectionMap.get(nodeId, []):
if src in nodeOutputs:
_inputSnap[src] = nodeOutputs[src]
_inputSnap = _merge_node_parameters_into_snap(_inputSnap, node_id=nodeId, context=context)
_stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _inputSnap)
result, retryCount = await _executeWithRetry(executor, node, context)
result = _normalizeResult(result, nodeType)
@ -1328,6 +1389,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=nodeId,
node_type=nodeType,
status="completed",
@ -1342,6 +1404,7 @@ async def executeGraph(
for src, _, _ in connectionMap.get(nodeId, []):
if src in nodeOutputs:
_inputSnap[src] = nodeOutputs[src]
_inputSnap = _merge_node_parameters_into_snap(_inputSnap, node_id=nodeId, context=context)
_stepId = _createStepLog(automation2_interface, runId, nodeId, nodeType, "running", _inputSnap)
result, retryCount = await _executeWithRetry(executor, node, context)
result = _normalizeResult(result, nodeType)
@ -1356,6 +1419,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=nodeId,
node_type=nodeType,
status="completed",
@ -1384,6 +1448,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=nodeId,
node_type=nodeType,
status="completed",
@ -1411,6 +1476,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=nodeId,
node_type=nodeType,
status="completed",
@ -1471,6 +1537,7 @@ async def executeGraph(
run_id=runId,
node_outputs=nodeOutputs,
run_envelope=context.get("runEnvelope"),
exec_context=context,
node_id=nodeId,
node_type=nodeType,
status="failed",

View file

@ -21,10 +21,40 @@ from modules.features.graphicalEditor.portTypes import (
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException as _SubscriptionInactiveException
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError as _BillingContextError
from modules.workflows.automation2.executors.inputExecutor import PauseForHumanTaskError
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
build_presentation_envelope_from_plain_text,
presentation_dict_without_meta,
presentation_response_text,
)
logger = logging.getLogger(__name__)
_FILE_CREATE_CTX_LOG_MAX = 500
_SKIP_UNIFIED_PRESENTATION_NODES = frozenset({"context.extractContent"})
def _attach_unified_presentation_data(out: Dict[str, Any], *, node_type: str) -> None:
"""Ensure ``out[\"data\"]`` carries ``context.extractContent.presentation.v1`` for ``file.create``."""
if node_type in _SKIP_UNIFIED_PRESENTATION_NODES:
return
data = out.get("data")
if isinstance(data, dict) and data.get("kind") == PRESENTATION_KIND:
return
text = str(out.get("response") or "").strip()
if not text and isinstance(data, dict):
text = str(data.get("response") or "").strip()
if not text:
return
pres = build_presentation_envelope_from_plain_text(text, source_name=node_type or "content")
if not pres:
return
meta: Dict[str, Any] = {"actionType": node_type}
if isinstance(data, dict):
prev = data.get("_meta")
if isinstance(prev, dict):
meta = {**prev, **meta}
out["data"] = {**pres, "_meta": meta}
def _truncate_for_log(val: Any, max_len: int = _FILE_CREATE_CTX_LOG_MAX) -> str:
@ -147,6 +177,41 @@ def _image_documents_from_docs_list(docs_list: list) -> list:
]
def _image_refs_from_extract_node_data(extract_data: Any) -> list:
"""Synthetic image document dicts from ``context.extractContent`` ``_meta.persistedImageArtifacts``."""
if not isinstance(extract_data, dict):
return []
meta = extract_data.get("_meta")
if not isinstance(meta, dict):
return []
arts = meta.get("persistedImageArtifacts")
if not isinstance(arts, list):
return []
out: list = []
for a in arts:
if not isinstance(a, dict):
continue
fid = a.get("fileId")
if not fid:
continue
out.append(
{
"documentName": a.get("fileName") or f"extract_image_{fid}",
"mimeType": str(a.get("mimeType") or "application/octet-stream"),
"documentData": None,
"fileId": str(fid),
"_hasBinaryData": True,
"validationMetadata": {
"actionType": "context.extractContent",
"handoverRole": "extractedMedia",
"suppressInWorkflowFileLists": True,
"sourcePartId": a.get("sourcePartId"),
},
}
)
return out
_USER_CONNECTION_ID_RE = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
re.IGNORECASE,
@ -679,9 +744,12 @@ class ActionNodeExecutor:
extractedContext = ""
rd_early = getattr(result, "data", None)
if isinstance(rd_early, dict):
_r = rd_early.get("response")
if _r is not None and str(_r).strip():
extractedContext = str(_r).strip()
if rd_early.get("kind") == PRESENTATION_KIND:
extractedContext = presentation_response_text(presentation_dict_without_meta(rd_early)).strip()
else:
_r = rd_early.get("response")
if _r is not None and str(_r).strip():
extractedContext = str(_r).strip()
promptText = str(resolvedParams.get("aiPrompt") or resolvedParams.get("prompt") or "").strip()
resultData = getattr(result, "data", None)
@ -728,9 +796,17 @@ class ActionNodeExecutor:
out.setdefault("context", ctx_str if ctx_str else "")
rsp = str(out.get("response") or "").strip()
if not rsp:
out["response"] = extractedContext or ""
if nodeType != "context.extractContent":
out["response"] = extractedContext or ""
else:
out["response"] = ""
if result.success:
img_only = _image_documents_from_docs_list(docsList)
if (
nodeType == "context.extractContent"
and isinstance(result.data, dict)
):
img_only = list(img_only) + _image_refs_from_extract_node_data(result.data)
# mergeContext packs iterated payloads under ``data.merged`` only — ``documents``
# on the ActionResult is empty, so image sidecars live on ``merged.imageDocumentsOnly``.
if (
@ -766,6 +842,12 @@ class ActionNodeExecutor:
_attachConnectionProvenance(cr_out, resolvedParams, outputSchema, chatService, self.services)
return normalizeToSchema(cr_out, outputSchema)
if nodeType == "context.extractContent":
out.pop("documents", None)
if outputSchema in ("AiResult", "ActionResult") and result.success:
_attach_unified_presentation_data(out, node_type=nodeType)
_attachConnectionProvenance(out, resolvedParams, outputSchema, chatService, self.services)
# When the node declares ``surfaceDataAsTopLevel`` (typical for

View file

@ -295,14 +295,42 @@ class FlowExecutor:
def _normalize_loop_items(self, raw: Any) -> List[Any]:
"""Coerce resolved `items` into a list (lists, dict children, or scalars)."""
if isinstance(raw, list):
return raw
return self._expand_presentation_lines_loop_items(raw)
if isinstance(raw, dict):
children = raw.get("children")
if isinstance(children, list) and len(children) > 0:
return children
return [{"name": k, "value": v} for k, v in raw.items()]
return self._expand_presentation_lines_loop_items(children)
items = [{"name": k, "value": v} for k, v in raw.items()]
return self._expand_presentation_lines_loop_items(items)
return [raw] if raw is not None else []
def _expand_presentation_lines_loop_items(self, items: List[Any]) -> List[Any]:
"""When looping ``presentation.files`` in ``lines`` mode, iterate per slot (e.g. CSV row)."""
if not items:
return items
expanded: List[Any] = []
saw_lines_bucket = False
for it in items:
if not isinstance(it, dict):
expanded.append(it)
continue
val = it.get("value")
if not isinstance(val, dict) or val.get("outputMode") != "lines":
expanded.append(it)
continue
data = val.get("data")
if not isinstance(data, list) or len(data) <= 1:
expanded.append(it)
continue
saw_lines_bucket = True
base_name = str(it.get("name") or val.get("sourceFileName") or "line")
for idx, slot in enumerate(data):
if not isinstance(slot, dict):
continue
sid = str(slot.get("id") or slot.get("label") or idx)
expanded.append({"name": f"{base_name}:{sid}", "value": slot})
return expanded if saw_lines_bucket else items
def _apply_iteration_mode(self, items: List[Any], mode: str, stride: int) -> List[Any]:
"""Select which elements to iterate over (backend-defined modes)."""
if not items:

View file

@ -435,6 +435,13 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
data = data.get("data", data)
plist = list(path)
resolved = _get_by_path(data, plist)
if resolved is None:
from modules.workflows.automation2.pickNotPushMigration import (
remap_stale_presentation_ref_path,
)
alt_path = remap_stale_presentation_ref_path(plist)
if alt_path != plist:
resolved = _get_by_path(data, alt_path)
if resolved is None and isinstance(data, dict) and plist:
if plist[0] == "payload" and len(plist) > 1:
# Strip explicit "payload" prefix (legacy DataPicker paths)
@ -491,13 +498,10 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
# contextBuilder: list where every item is a `{"type":"ref",...}` envelope.
# Resolve each part; a single ref preserves the resolved type (str, list, dict).
if value and all(isinstance(v, dict) and v.get("type") == "ref" for v in value):
from modules.workflows.methods.methodAi._common import serialize_context
resolved_parts = [resolveParameterReferences(v, nodeOutputs) for v in value]
if len(resolved_parts) == 1:
return resolved_parts[0]
parts = [serialize_context(p, prefer_handover_primary=True) for p in resolved_parts]
return "\n\n".join(p for p in parts if p)
return resolved_parts
return [resolveParameterReferences(v, nodeOutputs) for v in value]
return value

View file

@ -5,6 +5,8 @@ Graph helpers for Pick-not-Push: materialize typed DataRefs before executeGraph
- ``materializeConnectionRefs``: empty ``connectionReference`` from upstream connection provenance.
- ``materializePrimaryTextHandover``: parameters whose static definition includes
``graphInherit.kind == "primaryTextRef"`` (canonical paths: ``PRIMARY_TEXT_HANDOVER_REF_PATH``).
- ``materializeRecommendedDataPickRef``: parameters with ``graphInherit.kind == "recommendedDataPickRef"``
use the upstream output port's ``dataPickOptions`` entry with ``recommended: true``.
Runtime: executeGraph deep-copies the version graph and applies these passes in order.
"""
@ -12,7 +14,7 @@ from __future__ import annotations
import copy
import logging
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.portTypes import (
@ -154,3 +156,133 @@ def materializePrimaryTextHandover(graph: Dict[str, Any]) -> Dict[str, Any]:
)
return g
def _recommended_data_pick_path(out_port: Dict[str, Any]) -> Optional[List[Any]]:
opts = out_port.get("dataPickOptions") if isinstance(out_port, dict) else None
if not isinstance(opts, list):
return None
for opt in opts:
if not isinstance(opt, dict):
continue
if opt.get("recommended") is True:
path = opt.get("path")
if isinstance(path, list) and path:
return list(path)
return None
def materializeRecommendedDataPickRef(graph: Dict[str, Any]) -> Dict[str, Any]:
"""Materialize empty parameters that declare ``graphInherit.kind == \"recommendedDataPickRef\"``."""
g = copy.deepcopy(graph)
nodes: List[Dict[str, Any]] = g.get("nodes") or []
connections = g.get("connections") or []
if not nodes:
return g
conn_map = buildConnectionMap(connections)
node_by_id = {n["id"]: n for n in nodes if n.get("id")}
for node in nodes:
nid = node.get("id")
ntype = node.get("type")
if not nid or not ntype:
continue
node_def = _NODE_DEF_BY_ID.get(ntype)
if not node_def:
continue
params = node.get("parameters")
if not isinstance(params, dict):
node["parameters"] = {}
params = node["parameters"]
for pdef in node_def.get("parameters") or []:
gi = pdef.get("graphInherit")
if not isinstance(gi, dict) or gi.get("kind") != "recommendedDataPickRef":
continue
pname = pdef.get("name")
if not pname:
continue
port_ix = int(gi.get("port", 0))
if not _slot_empty_for_primary_text_inherit(params.get(pname)):
continue
input_sources = getInputSources(nid, conn_map)
if port_ix not in input_sources:
continue
src_id, _ = input_sources[port_ix]
src_node = node_by_id.get(src_id) or {}
src_def = _NODE_DEF_BY_ID.get(src_node.get("type") or "")
if not src_def:
continue
out_port = (src_def.get("outputPorts") or {}).get(port_ix, {}) or {}
if not isinstance(out_port, dict):
out_port = (src_def.get("outputPorts") or {}).get(0, {}) or {}
ref_path = _recommended_data_pick_path(out_port if isinstance(out_port, dict) else {})
if not ref_path:
continue
ref = _data_ref(src_id, ref_path)
if pdef.get("frontendType") == "contextBuilder":
params[pname] = [ref]
else:
params[pname] = ref
logger.debug(
"materializeRecommendedDataPickRef: %s.%s -> ref %s path=%s",
nid,
pname,
src_id,
ref_path,
)
return g
_STALE_FILE_CREATE_CONTEXT_PATHS = frozenset({
("responseData",),
("response",),
("merged",),
("documents", 0, "documentData"),
})
def remap_stale_presentation_ref_path(path: List[Any]) -> List[Any]:
"""Map legacy text-handover paths to unified presentation ``data``."""
if tuple(path) in _STALE_FILE_CREATE_CONTEXT_PATHS:
return ["data"]
return list(path)
def _normalize_presentation_refs_in_value(val: Any) -> Any:
"""Rewrite stale ref paths inside ``contextBuilder`` lists or bare refs."""
if isinstance(val, dict) and val.get("type") == "ref":
path = val.get("path")
if isinstance(path, list) and path:
new_path = remap_stale_presentation_ref_path(path)
if new_path != path:
return {**val, "path": new_path}
return val
if isinstance(val, list):
return [_normalize_presentation_refs_in_value(item) for item in val]
return val
def normalizeFileCreatePresentationRefs(graph: Dict[str, Any]) -> Dict[str, Any]:
"""Remap legacy ``file.create`` context refs to unified presentation ``data``."""
g = copy.deepcopy(graph)
nodes: List[Dict[str, Any]] = g.get("nodes") or []
for node in nodes:
if node.get("type") != "file.create":
continue
params = node.get("parameters")
if not isinstance(params, dict):
continue
ctx = params.get("context")
if ctx in (None, "", []):
continue
normalized = _normalize_presentation_refs_in_value(ctx)
if normalized != ctx:
params["context"] = normalized
logger.debug(
"normalizeFileCreatePresentationRefs: %s.context remapped to presentation data ref",
node.get("id"),
)
return g

View file

@ -0,0 +1,32 @@
# Copyright (c) 2025 Patrick Motsch
"""Heuristics for hiding internal workflow artefacts from user-facing file lists."""
from __future__ import annotations
from typing import Any, Mapping, Optional
_WORKFLOW_INTERNAL_FILE_TAG = "_workflowInternal"
def suppress_workflow_file_in_workspace_ui(meta: Optional[Mapping[str, Any]]) -> bool:
"""True when a file row should not appear in user-facing file lists.
Used by Automation Workspace **and** ``/api/files/list`` (Meine Dateien).
Matches persisted JSON handovers from transient runs (``extracted_content_transient*``),
internal extract image files (``extract_media_*``), the ``_workflowInternal`` tag, and
optional explicit flags.
"""
if not isinstance(meta, Mapping):
return False
tags = meta.get("tags")
if isinstance(tags, list) and _WORKFLOW_INTERNAL_FILE_TAG in tags:
return True
fn = str(meta.get("fileName") or "").lower()
if "extracted_content_transient" in fn:
return True
if "extract_media_" in fn:
return True
if meta.get("suppressInWorkflowFileLists") is True:
return True
return False

View file

@ -30,6 +30,49 @@ def _handover_response_plain(val: Any) -> Optional[str]:
return str(r).strip().lstrip("\ufeff")
def primary_text_for_prompt_context(val: Any) -> str:
"""Flatten ActionResult / presentation / merge payloads to readable text.
Used when merging multiple context-builder refs so extract outputs are not
turned into giant JSON via ``serialize_context`` (empty ``response``).
"""
if val is None:
return ""
if isinstance(val, str):
s = val.strip().lstrip("\ufeff")
if not s:
return ""
if len(s) >= 2 and ((s.startswith("[") and s.endswith("]")) or (s.startswith("{") and s.endswith("}"))):
try:
return primary_text_for_prompt_context(json.loads(s))
except (json.JSONDecodeError, TypeError, ValueError):
pass
return s
if isinstance(val, list):
chunks = [primary_text_for_prompt_context(item) for item in val]
chunks = [c for c in chunks if c]
return "\n\n".join(chunks)
if isinstance(val, dict):
got = _handover_response_plain(val)
if got is not None:
return got
inner = val.get("data")
if isinstance(inner, dict):
from modules.workflows.methods.methodContext.actions.extractContent import (
joined_text_from_extract_node_data,
)
t = (joined_text_from_extract_node_data(inner) or "").strip()
if t:
return t
from modules.workflows.methods.methodContext.actions.extractContent import (
joined_text_from_extract_node_data,
)
return (joined_text_from_extract_node_data(val) or "").strip()
return str(val).strip() if str(val).strip() else ""
def serialize_context(val: Any, *, prefer_handover_primary: bool = False) -> str:
"""Convert any context value to a readable string for use in AI prompts.

View file

@ -202,7 +202,15 @@ class MethodBase:
validated = {}
# System parameters that should always be preserved, even if not in paramDefs
systemParams = ['parentOperationId', 'expectedDocumentFormats']
systemParams = [
'parentOperationId',
'expectedDocumentFormats',
# Injected by automation2 ActionNodeExecutor (graph node definitions)
'_runContext',
'_upstreamPayload',
'_branchInputs',
'_workflowNodeId',
]
for sysParam in systemParams:
if sysParam in parameters:
validated[sysParam] = parameters[sysParam]

View file

@ -18,8 +18,9 @@ from typing import Any, Dict, List, Optional
from modules.datamodels.datamodelChat import ActionResult
from modules.workflows.methods.methodContext.actions.extractContent import (
_joined_text_from_handover_payload,
joined_text_from_extract_node_data,
)
from modules.workflows.methods.methodContext.contextEnvelope import wrap_merge_context_data
logger = logging.getLogger(__name__)
@ -89,6 +90,9 @@ def _primary_text_from_item(it: Any) -> str:
r = inner.get("response")
if r is not None and str(r).strip():
return str(r).strip()
ce_text = joined_text_from_extract_node_data(inner)
if ce_text.strip():
return ce_text.strip()
docs = it.get("documents")
if not isinstance(docs, list) or not docs:
return ""
@ -104,14 +108,14 @@ def _primary_text_from_item(it: Any) -> str:
except (UnicodeDecodeError, ValueError):
return ""
if isinstance(raw, dict):
return (_joined_text_from_handover_payload(raw) or "").strip()
return (joined_text_from_extract_node_data(raw) or "").strip()
if isinstance(raw, str) and raw.strip():
s = raw.strip()
if s.startswith("{") and s.endswith("}"):
try:
parsed = json.loads(s)
if isinstance(parsed, dict):
return (_joined_text_from_handover_payload(parsed) or "").strip()
return (joined_text_from_extract_node_data(parsed) or "").strip()
except (json.JSONDecodeError, TypeError):
pass
return s
@ -126,6 +130,14 @@ def _sanitize_heading_title(name: str) -> str:
def _iteration_heading_from_item(it: Any) -> Optional[str]:
if not isinstance(it, dict):
return None
inner = it.get("data")
if isinstance(inner, dict):
meta = inner.get("_meta") if isinstance(inner.get("_meta"), dict) else {}
sf = inner.get("sourceFileNames") or meta.get("sourceFileNames")
if isinstance(sf, list) and sf:
first = sf[0]
if isinstance(first, str) and first.strip():
return _sanitize_heading_title(first.strip())
docs = it.get("documents")
if not isinstance(docs, list) or not docs:
return None
@ -222,7 +234,7 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult:
(_ps[:200] + "") if len(_ps) > 200 else _ps,
len(conflicts),
)
data: Dict[str, Any] = {
payload: Dict[str, Any] = {
"merged": merged,
"inputs": inputs,
"first": inputs[0] if inputs else None,
@ -230,7 +242,7 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult:
"conflicts": sorted(set(conflicts)) if conflicts else [],
"response": primary,
}
return ActionResult.isSuccess(data=data)
return ActionResult.isSuccess(data=wrap_merge_context_data(payload))
except Exception as exc:
logger.exception("mergeContext failed")
return ActionResult.isFailure(error=str(exc))

View file

@ -18,6 +18,7 @@ import re
from typing import Any, Dict, List, Optional
from modules.datamodels.datamodelChat import ActionResult
from modules.workflows.methods.methodContext.contextEnvelope import wrap_transform_context_data
logger = logging.getLogger(__name__)
@ -216,7 +217,7 @@ async def transformContext(self, parameters: Dict[str, Any]) -> ActionResult:
if cast_errors:
result["_castErrors"] = cast_errors
return ActionResult.isSuccess(data=result)
return ActionResult.isSuccess(data=wrap_transform_context_data(result))
except Exception as exc:
logger.exception("transformContext failed")
return ActionResult.isFailure(error=str(exc))

View file

@ -0,0 +1,42 @@
# Copyright (c) 2026 Patrick Motsch
"""Versioned ``ActionResult.data`` envelope for context.* actions (merge, transform)."""
from __future__ import annotations
from typing import Any, Dict
CONTEXT_MERGE_KIND = "context.mergeContext.v1"
CONTEXT_MERGE_SCHEMA_VERSION = 1
CONTEXT_TRANSFORM_KIND = "context.transformContext.v1"
CONTEXT_TRANSFORM_SCHEMA_VERSION = 1
def wrap_merge_context_data(body: Dict[str, Any]) -> Dict[str, Any]:
"""Wrap merge payload: ``schemaVersion``, ``kind``, body fields, ``_meta`` last."""
meta: Dict[str, Any] = {
"actionType": "context.mergeContext",
"mergePayloadSchemaVersion": CONTEXT_MERGE_SCHEMA_VERSION,
}
out: Dict[str, Any] = {
"schemaVersion": CONTEXT_MERGE_SCHEMA_VERSION,
"kind": CONTEXT_MERGE_KIND,
}
out.update(body)
out["_meta"] = meta
return out
def wrap_transform_context_data(fields: Dict[str, Any]) -> Dict[str, Any]:
"""Wrap transform output fields under a versioned envelope (``_meta`` overwrites same key in fields)."""
meta: Dict[str, Any] = {
"actionType": "context.transformContext",
"transformPayloadSchemaVersion": CONTEXT_TRANSFORM_SCHEMA_VERSION,
}
out: Dict[str, Any] = {
"schemaVersion": CONTEXT_TRANSFORM_SCHEMA_VERSION,
"kind": CONTEXT_TRANSFORM_KIND,
}
out.update(fields)
out["_meta"] = meta
return out

View file

@ -57,12 +57,9 @@ class MethodContext(MethodBase):
"extractContent": WorkflowActionDefinition(
actionId="context.extractContent",
description=(
"Extract document content without AI. Unified handover: (1) `documents[0]` "
"JSON `context.extractContent.handover.v1` with text in `parts` and image placeholders "
"linking to sibling blobs via `handoverMediaDocumentName`; "
"(2) each extracted image as a separate binary document (`extract_media_*`); "
"(3) `data.response` / top-level `response` after normalization — concatenated plain text "
"for prompts and file.create. Pick `response`, a specific document, or deep JSON paths."
"Extract document content without AI. Returns `data` as the configured presentation "
"envelope (`fileOrder`, `files`, …) plus `_meta`; no duplicated service payload or bundled "
"plain-text column. Persisted images appear via `embeddedImageFileId` in internal serial only."
),
dynamicMode=True,
outputType="UdmDocument",
@ -151,8 +148,8 @@ class MethodContext(MethodBase):
"mergeContext": WorkflowActionDefinition(
actionId="context.mergeContext",
description=(
"Führt eine Liste von Schrittergebnissen (z. B. ``bodyResults`` einer "
"``flow.loop``) zu einem zusammengeführten Dict zusammen."
"Führt Schritte zu einem Dict zusammen. ``data`` enthält einen versionierten Umschlag "
"(``context.mergeContext.v1``, ``merged``, ``response``, …) und ``_meta``."
),
outputType="ActionResult",
parameters={
@ -210,10 +207,9 @@ class MethodContext(MethodBase):
"transformContext": WorkflowActionDefinition(
actionId="context.transformContext",
description=(
"Transform the upstream payload via a list of {sourceField, outputField, "
"operation, type, expression} mappings. Operations: rename, cast, nest, "
"flatten, compute. compute uses {{...}} templates; nesting is implicit "
"via dotted outputField paths."
"Transform mappings on the upstream payload. ``data`` trägt "
"``schemaVersion``, ``kind: context.transformContext.v1``, die gemappten Felder "
"und optional ``_castErrors``, plus ``_meta``."
),
outputType="Transit",
parameters={

View file

@ -3,7 +3,7 @@
from typing import Any, Dict, List, Optional
import asyncio
import ast
import base64
import binascii
import io
@ -12,79 +12,33 @@ import logging
import re
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import (
enhancePlainTextWithMarkdownTables,
markdownToDocumentJson,
)
from modules.shared.i18nRegistry import normalizePrimaryLanguageTag
from modules.workflows.automation2.executors.actionNodeExecutor import _coerce_document_data_to_bytes
from modules.workflows.methods.methodAi._common import is_image_action_document_list, serialize_context
from modules.workflows.methods.methodAi._common import is_image_action_document_list
from modules.workflows.methods.methodContext.actions.extractContent import (
presentation_envelopes_to_document_json,
)
logger = logging.getLogger(__name__)
_SAFE_FILENAME = re.compile(r'[^\w\-.\(\)\s\[\]%@+]')
_HEAVY_CONTEXT_KEYS = frozenset({"imageDocumentsOnly", "documents", "inputs"})
def _collect_image_documents_only(raw: Any) -> List[Any]:
"""Resolve ``imageDocumentsOnly`` whether the context is merged, nested, or surfaced."""
if not isinstance(raw, dict):
return []
paths = (
("imageDocumentsOnly",),
("merged", "imageDocumentsOnly"),
("data", "merged", "imageDocumentsOnly"),
("data", "imageDocumentsOnly"),
)
for path in paths:
cur: Any = raw
ok = True
for p in path:
if not isinstance(cur, dict):
ok = False
break
cur = cur.get(p)
if ok and isinstance(cur, list) and cur:
return cur
return []
def _context_string_for_report(raw: Any, output_format: str) -> str:
"""Build one narrative string for ``markdownToDocumentJson`` / render.
Prefer plain ``response`` text (merge node surfaces it; nested ``merged.response``
too). Never dump ``inputs`` / binary lists into the PDF body that produced giant
JSON + base64 "hash" paragraphs after merge + ``contextBuilder``.
"""
of = (output_format or "docx").strip().lower().lstrip(".")
if of == "json":
return serialize_context(raw, prefer_handover_primary=False)
if isinstance(raw, str):
return raw.strip().lstrip("\ufeff")
if isinstance(raw, dict):
for path in (
("response",),
("merged", "response"),
("data", "response"),
("data", "merged", "response"),
):
cur: Any = raw
ok = True
for k in path:
if not isinstance(cur, dict):
ok = False
break
cur = cur.get(k)
if ok and cur is not None and str(cur).strip():
return str(cur).strip().lstrip("\ufeff")
lean = {k: v for k, v in raw.items() if k not in _HEAVY_CONTEXT_KEYS}
def _coerce_structured_context(raw: Any) -> Any:
"""Undo legacy ``str`` coercion on structured refs (loop ``bodyResults``, presentation)."""
if not isinstance(raw, str):
return raw
stripped = raw.strip()
if not stripped or stripped[0] not in ("[", "{"):
return raw
for loader in (json.loads, ast.literal_eval):
try:
return json.dumps(lean, ensure_ascii=False, indent=2, default=str)
except Exception:
return serialize_context(lean, prefer_handover_primary=False)
return serialize_context(raw, prefer_handover_primary=False)
parsed = loader(stripped)
except (json.JSONDecodeError, ValueError, SyntaxError, TypeError):
continue
if isinstance(parsed, (dict, list)):
return parsed
return raw
def _raw_context_preview_for_log(raw: Any, max_len: int = 500) -> str:
@ -121,12 +75,6 @@ def _persistDocumentsToUserFiles(
return
if not mgmt:
return
logger.info(
"file.create persist: mgmt=%s id(mgmt)=%s has_createFileData=%s",
type(mgmt).__name__,
id(mgmt),
hasattr(mgmt, "createFileData"),
)
for doc in action_documents:
try:
doc_data = doc.documentData if hasattr(doc, "documentData") else doc.get("documentData")
@ -149,15 +97,8 @@ def _persistDocumentsToUserFiles(
or doc.get("mimeType")
or "application/octet-stream"
)
logger.info(
"file.create persist: calling createFile name=%s bytes=%s",
doc_name,
len(content),
)
file_item = mgmt.createFile(doc_name, mime, content, folderId=folder_id)
logger.info("file.create persist: createFile returned id=%s", file_item.id)
ok = mgmt.createFileData(file_item.id, content)
logger.info("file.create persist: createFileData returned %s for id=%s", ok, file_item.id)
mgmt.createFileData(file_item.id, content)
meta = getattr(doc, "validationMetadata", None) or doc.get("validationMetadata") or {}
if isinstance(meta, dict):
meta["fileId"] = file_item.id
@ -165,7 +106,6 @@ def _persistDocumentsToUserFiles(
doc.validationMetadata = meta
elif isinstance(doc, dict):
doc["validationMetadata"] = meta
logger.info("file.create: persisted %s to user files (id=%s)", doc_name, file_item.id)
except Exception as e:
dname = getattr(doc, "documentName", None) or doc.get("documentName", "?")
logger.warning("file.create: failed to persist document %s: %s", dname, e)
@ -215,100 +155,7 @@ def _load_image_bytes_from_action_doc(doc: dict, services) -> Optional[bytes]:
return None
# Images larger than this threshold (decoded bytes) are resized before embedding
# to avoid multi-minute PDF rendering of high-res raster scans.
_MAX_IMAGE_EMBED_BYTES = 300_000 # 300 KB decoded ≈ ~400 KB base64
_IMAGE_MAX_DIMENSION = 1200 # longest edge in pixels after resize
def _resize_image_for_document(image_bytes: bytes) -> bytes:
"""Resize image to at most ``_IMAGE_MAX_DIMENSION`` px on the longest edge
and re-encode as JPEG. Falls back to the original bytes on any error."""
try:
from PIL import Image as PILImage
import io as _io
img = PILImage.open(_io.BytesIO(image_bytes))
# Flatten transparency / palette modes to RGB (required for JPEG)
if img.mode in ("RGBA", "LA"):
bg = PILImage.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode == "P":
img = img.convert("RGBA")
bg = PILImage.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[-1])
img = bg
elif img.mode != "RGB":
img = img.convert("RGB")
w, h = img.size
if max(w, h) > _IMAGE_MAX_DIMENSION:
# thumbnail() is optimised for downscaling: it uses an intermediate
# box-filter step before the final filter, making it 3-5× faster
# than resize() on large images. BILINEAR is fast and sufficient
# for document thumbnails.
img.thumbnail((_IMAGE_MAX_DIMENSION, _IMAGE_MAX_DIMENSION), PILImage.BILINEAR)
out = _io.BytesIO()
img.save(out, format="JPEG", quality=85, optimize=True)
return out.getvalue()
except Exception as e:
logger.warning("file.create: image resize failed (%s) — using original bytes", e)
return image_bytes
def _append_images_to_content(structured_content: dict, image_docs: list, services=None) -> dict:
"""Append images from imageDocumentsOnly as native image elements to the structured JSON.
Each image becomes an ``image`` element with ``base64Data`` in a trailing
"Bilder" section of the first document. Images larger than
``_MAX_IMAGE_EMBED_BYTES`` are automatically resized/compressed so the
synchronous PDF renderer does not block for minutes on high-res scans.
The renderers (DOCX / PDF) handle ``content.base64Data`` natively.
"""
elements = []
for doc in image_docs:
b = _load_image_bytes_from_action_doc(doc, services)
if not b:
raw = doc.get("documentData") if isinstance(doc, dict) else None
if isinstance(raw, str):
try:
b = base64.b64decode(raw)
except Exception:
pass
if not b:
continue
if len(b) > _MAX_IMAGE_EMBED_BYTES:
logger.info(
"file.create: image %s is %d bytes — resizing to max %dpx for embedding",
(doc.get("documentName") if isinstance(doc, dict) else "?") or "?",
len(b),
_IMAGE_MAX_DIMENSION,
)
b = _resize_image_for_document(b)
elements.append({
"type": "image",
"content": {
"base64Data": base64.b64encode(b).decode("ascii"),
"alt": (doc.get("documentName") if isinstance(doc, dict) else None) or "image",
},
})
if not elements:
return structured_content
docs = structured_content.get("documents")
if isinstance(docs, list) and docs:
docs[0].setdefault("sections", []).append({"heading": "Bilder", "elements": elements})
return structured_content
def _images_list_to_pdf(image_bytes_list: List[bytes]) -> bytes:
"""One PDF page per image; embedded raster data via PyMuPDF."""
import fitz
pdf = fitz.open()
@ -322,7 +169,6 @@ def _images_list_to_pdf(image_bytes_list: List[bytes]) -> bytes:
def _images_list_to_docx(image_bytes_list: List[bytes]) -> bytes:
"""Images embedded in the document package (inline shapes), not hyperlinks."""
from docx import Document
from docx.shared import Inches
@ -403,28 +249,13 @@ async def _create_merged_image_documents(
async def create(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Create a file from context (text/markdown from upstream AI node).
Uses GenerationService.renderReport to produce docx, pdf, txt, md, html, xlsx, etc.
"""
raw_context = parameters.get("context", "")
"""Create a file from ``context.extractContent`` presentation data via ``renderReport``."""
raw_context = _coerce_structured_context(parameters.get("context", ""))
if isinstance(raw_context, list) and is_image_action_document_list(raw_context):
return await _create_merged_image_documents(self, parameters, raw_context)
outputFormat = (parameters.get("outputFormat") or "docx").strip().lower().lstrip(".")
context = _context_string_for_report(raw_context, outputFormat)
if not context:
logger.warning(
"file.create: context empty after resolve — raw_context type=%s raw_summary=%r "
"serialized_len=%s (check ActionNodeExecutor \"file.create context resolution\" log for DataRef / upstream).",
type(raw_context).__name__,
_raw_context_preview_for_log(raw_context),
len(context or ""),
)
return ActionResult.isFailure(error="context is required (connect an AI node or provide text)")
title = (parameters.get("title") or "Document").strip()
templateName = parameters.get("templateName")
language = normalizePrimaryLanguageTag(
@ -438,31 +269,30 @@ async def create(self, parameters: Dict[str, Any]) -> ActionResult:
folder_id = str(raw_folder).strip()
try:
if outputFormat != "json":
context = enhancePlainTextWithMarkdownTables(context)
structured_content = markdownToDocumentJson(context, title, language)
if templateName:
structured_content.setdefault("metadata", {})["templateName"] = templateName
structured_content = presentation_envelopes_to_document_json(
raw_context,
title=title,
language=language,
services=self.services,
)
except ValueError as e:
logger.warning(
"file.create: invalid presentation context type=%s preview=%r: %s",
type(raw_context).__name__,
_raw_context_preview_for_log(raw_context),
e,
)
return ActionResult.isFailure(error=str(e))
img_docs = _collect_image_documents_only(raw_context)
if img_docs:
# Image decoding and PIL resizing are CPU-bound; run them in a
# thread pool so the event loop is not blocked while processing
# high-res raster images (e.g. 3+ MB PNGs from PDF extraction).
loop = asyncio.get_event_loop()
structured_content = await loop.run_in_executor(
None,
_append_images_to_content,
structured_content,
img_docs,
self.services,
)
if templateName:
structured_content.setdefault("metadata", {})["templateName"] = templateName
generation = getattr(self.services, "generation", None)
if not generation:
return ActionResult.isFailure(error="Generation service not available")
generation = getattr(self.services, "generation", None)
if not generation:
return ActionResult.isFailure(error="Generation service not available")
ai_service = getattr(self.services, "ai", None)
ai_service = getattr(self.services, "ai", None)
try:
rendered_docs = await generation.renderReport(
extractedContent=structured_content,
outputFormat=outputFormat,
@ -472,43 +302,50 @@ async def create(self, parameters: Dict[str, Any]) -> ActionResult:
aiService=ai_service,
parentOperationId=parameters.get("parentOperationId"),
)
if not rendered_docs:
return ActionResult.isFailure(error="Rendering produced no output")
action_documents = []
mime_map = {
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"pdf": "application/pdf",
"txt": "text/plain",
"md": "text/markdown",
"html": "text/html",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"csv": "text/csv",
"json": "application/json",
}
for rd in rendered_docs:
doc_data = rd.documentData if hasattr(rd, "documentData") else getattr(rd, "document_data", None)
doc_name = getattr(rd, "filename", None) or getattr(rd, "documentName", None) or getattr(rd, "document_name", f"output.{outputFormat}")
mime = getattr(rd, "mimeType", None) or getattr(rd, "mime_type", None) or mime_map.get(outputFormat, "application/octet-stream")
if isinstance(doc_data, bytes):
doc_data = base64.b64encode(doc_data).decode("ascii")
action_documents.append(ActionDocument(
documentName=doc_name,
documentData=doc_data,
mimeType=mime,
validationMetadata={
"actionType": "file.create",
"outputFormat": outputFormat,
"templateName": templateName,
},
))
_persistDocumentsToUserFiles(action_documents, self.services, folder_id=folder_id)
return ActionResult.isSuccess(documents=action_documents)
except Exception as e:
logger.error(f"file.create failed: {e}", exc_info=True)
logger.error("file.create failed: %s", e, exc_info=True)
return ActionResult.isFailure(error=str(e))
if not rendered_docs:
return ActionResult.isFailure(error="Rendering produced no output")
action_documents = []
mime_map = {
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"pdf": "application/pdf",
"txt": "text/plain",
"md": "text/markdown",
"html": "text/html",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"csv": "text/csv",
"json": "application/json",
}
for rd in rendered_docs:
doc_data = rd.documentData if hasattr(rd, "documentData") else getattr(rd, "document_data", None)
doc_name = (
getattr(rd, "filename", None)
or getattr(rd, "documentName", None)
or getattr(rd, "document_name", f"output.{outputFormat}")
)
mime = (
getattr(rd, "mimeType", None)
or getattr(rd, "mime_type", None)
or mime_map.get(outputFormat, "application/octet-stream")
)
if isinstance(doc_data, bytes):
doc_data = base64.b64encode(doc_data).decode("ascii")
action_documents.append(ActionDocument(
documentName=doc_name,
documentData=doc_data,
mimeType=mime,
validationMetadata={
"actionType": "file.create",
"outputFormat": outputFormat,
"templateName": templateName,
},
))
_persistDocumentsToUserFiles(action_documents, self.services, folder_id=folder_id)
return ActionResult.isSuccess(documents=action_documents)

View file

@ -35,10 +35,13 @@ class MethodFile(MethodBase):
),
"context": WorkflowActionParameter(
name="context",
type="str",
type="Any",
frontendType=FrontendType.HIDDEN,
required=False,
description="Injected from contentSource or upstream connection",
description=(
"Resolved context: presentation envelope(s) from context.extractContent "
"(dict or list, e.g. loop bodyResults), or legacy plain text string."
),
),
"outputFormat": WorkflowActionParameter(
name="outputFormat",

View file

@ -1,15 +1,26 @@
# Unit tests: unified extractContent handover (text vs image sidecars).
# Unit tests: context.extractContent serialize + presentation helpers (legacy handover dicts vs new paths).
import base64
import copy as _copy
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
from modules.workflows.methods.methodContext.actions.extractContent import (
HANDOVER_KIND,
EXTRACT_PAYLOAD_SCHEMA_VERSION,
_apply_content_filter,
_canonical_content_filter,
_joined_text_from_content_extracted_serial,
_filter_extractions_by_content_filter,
_joined_text_from_handover_payload,
_split_images_to_sidecar_documents,
_persist_extracted_image_parts,
_serialize_content_extracted_for_output,
build_presentation_for_extractions,
build_presentation_for_payload,
joined_text_from_extract_node_data,
parse_presentation_parameters,
presentation_response_text,
summarize_presentation_payload,
)
@ -30,6 +41,120 @@ def test_joined_text_orders_text_table_and_skips_container():
assert _joined_text_from_handover_payload(payload) == "A\n\nB"
def test_joined_text_from_extract_node_data_prefers_content_extracted():
data = {
"contentExtracted": [
{"id": "x", "parts": [{"typeGroup": "text", "mimeType": "text/plain", "data": "Z", "id": "p"}]}
]
}
assert joined_text_from_extract_node_data(data) == "Z"
def test_joined_text_serial_list():
items = [{"parts": [{"typeGroup": "text", "mimeType": "text/plain", "data": "a", "id": "1"}]}]
assert _joined_text_from_content_extracted_serial(items) == "a"
def test_serialize_content_extracted_drops_summary():
ce = ContentExtracted(
id="doc1",
parts=[ContentPart(id="p", label="main", typeGroup="text", mimeType="text/plain", data="hi")],
summary={"ignored": True},
)
d = _serialize_content_extracted_for_output(ce)
assert "summary" not in d
def test_persist_images_without_run_context_is_noop():
raw = b"fake-binary-image"
b64 = base64.b64encode(raw).decode("ascii")
serial = [
{
"id": "1",
"parts": [
{"typeGroup": "text", "data": "x", "mimeType": "text/plain", "id": "t1"},
{"typeGroup": "image", "mimeType": "image/png", "data": b64, "id": "img1"},
],
}
]
original = _copy.deepcopy(serial)
out, arts = _persist_extracted_image_parts(serial, name_stem="stem", run_context=None)
assert arts == []
assert out == original
def test_filter_extractions_by_content_filter_text_only():
ec = ContentExtracted(
id="id1",
parts=[
ContentPart(id="t", label="t", typeGroup="text", mimeType="text/plain", data="a"),
ContentPart(id="i", label="i", typeGroup="image", mimeType="image/png", data=""),
],
)
out = _filter_extractions_by_content_filter([ec], "textOnly")
assert len(out) == 1
assert len(out[0].parts) == 1
assert out[0].parts[0].typeGroup == "text"
def test_canonical_content_filter_is_case_insensitive():
assert _canonical_content_filter("imagesOnly") == "imagesOnly"
assert _canonical_content_filter("IMAGESONLY") == "imagesOnly"
assert _canonical_content_filter("textOnly") == "textOnly"
assert _canonical_content_filter("unknown") == "all"
def test_parse_presentation_parameters_content_filter_all_coerces_legacy_pdf_text():
"""Graphs with „Alles“ but stored pdfExtractMode ``text`` must not drop image parts in presentation."""
cfg = parse_presentation_parameters({"contentFilter": "all", "pdfExtractMode": "text"})
assert cfg["pdfExtractMode"] == "all"
def test_parse_presentation_parameters_images_only_defaults_pdf_mode():
cfg = parse_presentation_parameters({"contentFilter": "imagesOnly"})
assert cfg["pdfExtractMode"] == "images"
def test_presentation_lines_includes_redacted_image_parts_when_pdf_mode_all():
payload = {
"fileOrder": ["f1"],
"files": {
"f1": {
"sourceFileName": "x.pdf",
"parts": [
{"typeGroup": "text", "data": "body", "id": "t"},
{"typeGroup": "image", "mimeType": "image/png", "data": "YQ==", "id": "img1"},
],
},
},
}
cfg = parse_presentation_parameters({"contentFilter": "all", "outputMode": "lines", "pdfExtractMode": "all"})
pres = build_presentation_for_payload(payload, cfg)
bf = pres["files"]["f1"]
assert len(bf["data"]) == 2
assert bf["data"][0]["typeGroup"] == "text"
assert bf["data"][0]["lines"] == ["body"]
assert bf["data"][1]["typeGroup"] == "image"
assert bf["data"][1]["lines"] == []
assert bf["data"][1].get("data") == ""
assert "imageParts" not in bf
def test_build_presentation_for_extractions_matches_payload_path():
ce = ContentExtracted(
id="id",
parts=[ContentPart(id="p", label="main", typeGroup="text", mimeType="text/plain", data="a\n\nb")],
)
cfg = parse_presentation_parameters({"outputMode": "lines", "splitBy": "paragraph"})
pres = build_presentation_for_extractions([ce], ["f.txt"], cfg)
fk = pres["fileOrder"][0]
b1 = pres["files"][fk]
assert b1["outputMode"] == "lines"
assert len(b1["data"]) == 1
assert b1["data"][0]["lines"] == ["a", "b"]
assert "items" not in b1
def test_joined_text_includes_csv_table_parts():
payload = {
"fileOrder": ["f1"],
@ -44,47 +169,6 @@ def test_joined_text_includes_csv_table_parts():
assert _joined_text_from_handover_payload(payload) == "a,b\n1,2"
def test_split_images_moves_pixels_to_blob_docs():
raw = b"fake-binary-image"
b64 = base64.b64encode(raw).decode("ascii")
payload = {
"kind": HANDOVER_KIND,
"schemaVersion": 1,
"fileOrder": ["f1"],
"files": {
"f1": {
"parts": [
{"typeGroup": "text", "data": "x", "id": "t1"},
{
"typeGroup": "image",
"mimeType": "image/png",
"data": b64,
"id": "p1-img",
"metadata": {},
},
]
}
},
}
stripped, blobs = _split_images_to_sidecar_documents(payload, document_name_stem="abc")
assert len(blobs) == 1
assert blobs[0].mimeType == "image/png"
assert blobs[0].documentData == raw
assert blobs[0].documentName.endswith(".png")
assert blobs[0].documentName.startswith("extract_media_")
meta = blobs[0].validationMetadata or {}
assert meta.get("handoverRole") == "extractedMedia"
img_parts = [
p
for p in stripped["files"]["f1"]["parts"]
if isinstance(p, dict) and (p.get("typeGroup") or "") == "image"
]
assert len(img_parts) == 1
assert img_parts[0]["data"] == ""
assert img_parts[0]["handoverMediaDocumentName"] == blobs[0].documentName
assert "image" in stripped["files"]["f1"]["byTypeGroup"]
def _mixed_payload():
return {
"kind": HANDOVER_KIND,
@ -106,7 +190,7 @@ def _mixed_payload():
def test_content_filter_all_is_noop():
payload = _mixed_payload()
result = _apply_content_filter(payload, "all")
assert result is payload # same object, no copy
assert result is payload
def test_content_filter_text_only_keeps_text_table_structure():
@ -129,7 +213,6 @@ def test_content_filter_no_images_removes_only_images():
parts = result["files"]["f1"]["parts"]
type_groups = {p["typeGroup"] for p in parts}
assert "image" not in type_groups
# text, table, structure all remain
assert {"text", "table", "structure"} == type_groups
@ -137,14 +220,7 @@ def test_content_filter_text_only_joined_text_has_no_image_data():
result = _apply_content_filter(_mixed_payload(), "textOnly")
text = _joined_text_from_handover_payload(result)
assert "hello" in text
assert "abc=" not in text # base64 image data must not appear
def test_content_filter_text_only_no_sidecars():
"""textOnly: no image parts → _split produces zero sidecars."""
result = _apply_content_filter(_mixed_payload(), "textOnly")
stripped, blobs = _split_images_to_sidecar_documents(result, document_name_stem="test")
assert blobs == []
assert "abc=" not in text
def test_presentation_lines_and_response():
@ -162,9 +238,12 @@ def test_presentation_lines_and_response():
}
cfg = parse_presentation_parameters({"outputMode": "lines", "splitBy": "paragraph"})
pres = build_presentation_for_payload(payload, cfg)
assert pres["files"]["f1"]["outputMode"] == "lines"
assert [it["text"] for it in pres["files"]["f1"]["items"]] == ["a", "b"]
assert presentation_response_text(pres, payload) == "a\n\nb"
b1 = pres["files"]["f1"]
assert b1["outputMode"] == "lines"
assert isinstance(b1["data"], list)
assert len(b1["data"]) == 1
assert b1["data"][0]["lines"] == ["a", "b"]
assert presentation_response_text(pres) == "a\n\nb"
def test_presentation_pdf_mode_tables_only():
@ -182,7 +261,9 @@ def test_presentation_pdf_mode_tables_only():
}
cfg = parse_presentation_parameters({"pdfExtractMode": "tables", "outputMode": "blob"})
pres = build_presentation_for_payload(payload, cfg)
assert pres["files"]["f1"]["text"] == "h1,h2\n1,2"
bf = pres["files"]["f1"]
assert isinstance(bf["data"], str)
assert bf["data"] == "h1,h2\n1,2"
def test_presentation_csv_rows():
@ -195,7 +276,7 @@ def test_presentation_csv_rows():
},
},
}
cfg = parse_presentation_parameters({"csvHeaderRow": "true"})
cfg = parse_presentation_parameters({"outputMode": "structured", "csvHeaderRow": "true"})
pres = build_presentation_for_payload(payload, cfg)
csv = pres["files"]["f1"]["csv"]
assert csv["headers"] == ["a", "b"]
@ -222,6 +303,11 @@ def test_presentation_pages_groups_by_page_index():
(0, ["p0"]),
(1, ["p1a", "p1b"]),
]
pdata = pres["files"]["f1"]["data"]
assert pdata == [
{"pageIndex": 0, "lines": ["p0"]},
{"pageIndex": 1, "lines": ["p1a", "p1b"]},
]
def test_presentation_chunks_with_overlap_chars():
@ -235,9 +321,10 @@ def test_presentation_chunks_with_overlap_chars():
pres = build_presentation_for_payload(payload, cfg)
texts = [c["text"] for c in pres["files"]["f1"]["chunks"]]
assert texts == ["abcd", "cdef", "efgh", "ghij"]
assert pres["files"]["f1"]["data"] == texts
def test_presentation_stripped_payload_gains_presentation_key_after_split():
def test_presentation_keeps_pres_key_after_inline_image_strip_simulation():
raw = b"x"
b64 = base64.b64encode(raw).decode("ascii")
payload = {
@ -254,7 +341,339 @@ def test_presentation_stripped_payload_gains_presentation_key_after_split():
},
}
pres = build_presentation_for_payload(payload, parse_presentation_parameters({}))
stripped, _blobs = _split_images_to_sidecar_documents(payload, document_name_stem="s")
stripped["presentation"] = pres
assert "presentation" in stripped
assert stripped["presentation"]["files"]["f1"]["items"]
serial = _copy.deepcopy([{"id": "1", "parts": payload["files"]["f1"]["parts"]}])
stayed, arts = _persist_extracted_image_parts(serial, name_stem="s", run_context=None)
assert arts == []
wrapper = {**pres, "_meta": {}}
fk = pres["fileOrder"][0]
assert isinstance(wrapper["files"][fk].get("data"), list)
assert len(wrapper["files"][fk]["data"]) == 2
def test_summarize_presentation_payload_shape():
payload = {
"fileOrder": ["f1"],
"files": {"f1": {"sourceFileName": "t.txt", "parts": [{"typeGroup": "text", "data": "hello", "id": "a"}]}},
}
pres = build_presentation_for_payload(payload, parse_presentation_parameters({"outputMode": "blob"}))
s = summarize_presentation_payload(pres)
assert s["fileOrder"] == ["f1"]
assert "f1" in s["files"]
assert s["files"]["f1"]["outputMode"] == "blob"
assert s["files"]["f1"]["stringLength"] == 5
assert "hello" in (s["files"]["f1"].get("head") or "")
def test_joined_text_from_extract_node_data_uses_presentation_root():
from modules.workflows.methods.methodContext.actions.extractContent import PRESENTATION_KIND
data = {
"schemaVersion": 1,
"kind": PRESENTATION_KIND,
"outputMode": "lines",
"fileOrder": ["f1"],
"files": {"f1": {"outputMode": "lines", "sourceFileName": "x.txt", "data": ["body"]}},
"_meta": {"extractPayloadSchemaVersion": EXTRACT_PAYLOAD_SCHEMA_VERSION},
}
assert joined_text_from_extract_node_data(data) == "body"
assert data["_meta"]["extractPayloadSchemaVersion"] == EXTRACT_PAYLOAD_SCHEMA_VERSION
def test_action_result_contract_new_extract_payload_keys():
from modules.workflows.methods.methodContext.actions.extractContent import PRESENTATION_KIND
data = {
"schemaVersion": 1,
"kind": PRESENTATION_KIND,
"outputMode": "lines",
"fileOrder": ["f1"],
"files": {"f1": {"outputMode": "lines", "sourceFileName": "x.txt", "data": ["body"]}},
"_meta": {"actionType": "context.extractContent", "extractPayloadSchemaVersion": EXTRACT_PAYLOAD_SCHEMA_VERSION},
}
assert data["kind"] == PRESENTATION_KIND
assert joined_text_from_extract_node_data(data) == "body"
def test_automation_workspace_suppresses_extract_artifacts():
from modules.workflows.automation2.workflowArtifactVisibility import suppress_workflow_file_in_workspace_ui
assert suppress_workflow_file_in_workspace_ui({"fileName": "extracted_content_transient-abc_99.json"})
assert suppress_workflow_file_in_workspace_ui({"fileName": "extract_media_stem_uuid.png"})
assert not suppress_workflow_file_in_workspace_ui({"fileName": "export_2026.csv"})
assert suppress_workflow_file_in_workspace_ui({"fileName": "", "suppressInWorkflowFileLists": True})
assert suppress_workflow_file_in_workspace_ui({"fileName": "report.pdf", "tags": ["_workflowInternal"]})
assert not suppress_workflow_file_in_workspace_ui({"fileName": "report.pdf", "tags": ["invoice"]})
def test_normalize_presentation_envelopes_action_result_and_list():
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
normalize_presentation_envelopes,
)
pres = {
"kind": PRESENTATION_KIND,
"fileOrder": ["f1"],
"files": {"f1": {"outputMode": "lines", "sourceFileName": "x.txt", "data": []}},
}
wrapped = {"success": True, "data": pres}
assert len(normalize_presentation_envelopes(wrapped)) == 1
assert len(normalize_presentation_envelopes([wrapped])) == 1
def test_method_base_preserves_run_context_injection():
from modules.workflows.methods.methodFile.methodFile import MethodFile
class _Svc:
pass
action_def = MethodFile(_Svc())._actions["create"]
validated = MethodFile(_Svc())._validateParameters(
{"context": "x", "outputFormat": "pdf", "_runContext": {"mandateId": "m", "instanceId": "i"}},
action_def.parameters,
)
assert validated.get("_runContext") == {"mandateId": "m", "instanceId": "i"}
def test_presentation_envelopes_to_document_json_one_section_per_data_slot():
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
presentation_envelopes_to_document_json,
)
pres = {
"kind": PRESENTATION_KIND,
"outputMode": "lines",
"fileOrder": ["f1"],
"files": {
"f1": {
"outputMode": "lines",
"sourceFileName": "a.pdf",
"data": [
{
"typeGroup": "text",
"mimeType": "text/plain",
"data": "ignored",
"lines": ["Line A", "Line B"],
},
],
},
},
}
out = presentation_envelopes_to_document_json(
{"success": True, "data": pres},
title="T",
language="de",
)
paragraphs = [
s for s in out["documents"][0]["sections"]
if s.get("content_type") == "paragraph"
]
assert len(paragraphs) == 1
runs = paragraphs[0]["elements"][0]["content"]["inlineRuns"]
joined = "".join(r.get("value", "") for r in runs)
assert "Line A" in joined
assert "Line B" in joined
assert "\n" in joined
def test_presentation_envelopes_table_slot_becomes_table_section():
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
presentation_envelopes_to_document_json,
)
pres = {
"kind": PRESENTATION_KIND,
"outputMode": "lines",
"fileOrder": ["f1"],
"files": {
"f1": {
"outputMode": "lines",
"sourceFileName": "sheet.csv",
"data": [
{
"typeGroup": "table",
"mimeType": "text/csv",
"data": '"Name","Amount"\n"Alice","100"\n"Bob","200"',
"lines": [],
},
],
},
},
}
out = presentation_envelopes_to_document_json(
{"success": True, "data": pres},
title="T",
language="de",
)
tables = [s for s in out["documents"][0]["sections"] if s.get("content_type") == "table"]
assert len(tables) == 1
content = tables[0]["elements"][0]["content"]
assert content["headers"] == ["Name", "Amount"]
assert content["rows"] == [["Alice", "100"], ["Bob", "200"]]
def test_presentation_line_slot_preserves_table_without_lines():
from modules.workflows.methods.methodContext.actions.extractContent import (
_presentation_line_slot_from_part,
_presentation_line_slots_from_part,
parse_presentation_parameters,
)
cfg = parse_presentation_parameters({"outputMode": "lines", "splitBy": "newline"})
part = {
"typeGroup": "table",
"mimeType": "text/csv",
"data": '"A","B"\n"1","2"\n"3","4"',
"id": "t1",
}
slot = _presentation_line_slot_from_part(part, cfg)
assert slot.get("lines") == []
assert slot.get("data") == part["data"]
slots = _presentation_line_slots_from_part(part, cfg)
assert len(slots) == 3
assert slots[0]["lines"] == ['"A","B"']
assert slots[1]["lines"] == ['"1","2"']
def test_presentation_envelopes_preserves_data_slot_order_text_image_text():
import base64
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
presentation_envelopes_to_document_json,
)
class _Mgmt:
def getFileData(self, _fid: str) -> bytes:
return base64.b64decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg=="
)
class _Svc:
interfaceDbComponent = _Mgmt()
pres = {
"kind": PRESENTATION_KIND,
"outputMode": "lines",
"fileOrder": ["f1"],
"files": {
"f1": {
"outputMode": "lines",
"sourceFileName": "a.pdf",
"data": [
{"typeGroup": "text", "mimeType": "text/plain", "lines": ["Before"]},
{
"typeGroup": "image",
"mimeType": "image/png",
"embeddedImageFileId": "00000000-0000-0000-0000-000000000001",
},
{"typeGroup": "text", "mimeType": "text/plain", "lines": ["After"]},
],
},
},
}
out = presentation_envelopes_to_document_json(
{"success": True, "data": pres},
title="T",
language="de",
services=_Svc(),
)
types = [s.get("content_type") for s in out["documents"][0]["sections"]]
assert types == ["paragraph", "image", "paragraph"]
def test_presentation_envelopes_to_document_json_text_slots():
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
presentation_envelopes_to_document_json,
)
pres = {
"kind": PRESENTATION_KIND,
"outputMode": "lines",
"fileOrder": ["f1"],
"files": {
"f1": {
"outputMode": "lines",
"sourceFileName": "a.pdf",
"data": [
{
"typeGroup": "text",
"mimeType": "text/plain",
"data": "Hello",
"lines": ["Hello", "World"],
},
],
},
},
}
out = presentation_envelopes_to_document_json(
[{"success": True, "data": pres}],
title="T",
language="de",
)
paragraphs = [
s for s in out["documents"][0]["sections"]
if s.get("content_type") == "paragraph"
]
assert len(paragraphs) == 1
all_text = []
for p in paragraphs:
runs = p["elements"][0]["content"]["inlineRuns"]
all_text.append("".join(r.get("value", "") for r in runs))
assert any("Hello" in t for t in all_text)
assert any("World" in t for t in all_text)
def test_presentation_envelopes_to_document_json_image_slot():
import base64
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
presentation_envelopes_to_document_json,
)
fid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
pres = {
"kind": PRESENTATION_KIND,
"outputMode": "lines",
"fileOrder": ["f1"],
"files": {
"f1": {
"outputMode": "lines",
"sourceFileName": "a.pdf",
"data": [
{
"typeGroup": "image",
"mimeType": "image/png",
"embeddedImageFileId": fid,
"embeddedImageFileName": "clip.png",
},
],
},
},
}
class _Mgmt:
def getFileData(self, file_id):
assert file_id == fid
return b"\x89PNG\r\n"
class _Svc:
interfaceDbComponent = _Mgmt()
out = presentation_envelopes_to_document_json(
pres,
title="Img",
language="de",
services=_Svc(),
)
img_secs = [
s for s in out["documents"][0]["sections"]
if s.get("content_type") == "image"
]
assert len(img_secs) == 1
b64 = img_secs[0]["elements"][0]["content"]["base64Data"]
assert base64.b64decode(b64).startswith(b"\x89PNG")

View file

@ -45,7 +45,7 @@ async def test_mergeContext_handover_only_in_documents_yields_data_response():
}
result = await mergeContext(object(), {"dataSource": [item]})
assert result.success
assert result.data
assert result.data.get("kind") == "context.mergeContext.v1"
assert result.data.get("response") == "only-from-handover"
@ -176,3 +176,24 @@ async def test_mergeContext_accumulates_image_documents_only_across_iterations()
names = [d.get("documentName") for d in imgs]
assert "img_a.png" in names
assert "img_b.png" in names
@pytest.mark.asyncio
async def test_transform_context_envelope_has_kind_and_meta():
from modules.workflows.methods.methodContext.actions.transformContext import transformContext
svc = object()
result = await transformContext(
svc,
{
"mappings": [{"operation": "rename", "sourceField": "a", "outputField": "b"}],
"_upstreamPayload": {"a": 42},
},
)
assert result.success and result.data
assert result.data.get("kind") == "context.transformContext.v1"
assert result.data.get("schemaVersion") == 1
assert result.data.get("b") == 42
meta = result.data.get("_meta")
assert isinstance(meta, dict)
assert meta.get("actionType") == "context.transformContext"

View file

@ -18,6 +18,7 @@ def test_context_extractContent_node_exists():
def test_context_extractContent_node_shape():
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "context.extractContent")
assert node["category"] == "context"
assert node.get("injectRunContext") is True
assert node["meta"]["usesAi"] is False
assert node["_method"] == "context"
assert node["_action"] == "extractContent"
@ -43,7 +44,16 @@ def test_context_extractContent_node_shape():
]
pick_paths = [opt["path"] for opt in node["outputPorts"][0]["dataPickOptions"]]
assert ["documents", 0, "documentData", "presentation"] in pick_paths
assert ["data", "files"] in pick_paths
assert ["data", "_meta"] in pick_paths
def test_context_transformContext_has_envelope_data_pick_paths():
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "context.transformContext")
pick_paths = [opt["path"] for opt in node["outputPorts"][0]["dataPickOptions"]]
assert ["data"] in pick_paths
assert ["data", "_meta"] in pick_paths
def test_udm_port_types_registered():
@ -85,6 +95,14 @@ def test_getExecutor_dispatches_context():
assert isinstance(executor, ActionNodeExecutor)
def test_context_mergeContext_has_envelope_data_pick_paths():
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "context.mergeContext")
pick_paths = [opt["path"] for opt in node["outputPorts"][0]["dataPickOptions"]]
assert ["data"] in pick_paths
assert ["data", "_meta"] in pick_paths
assert ["merged"] in pick_paths
def test_context_mergeContext_surfaces_data_pick_paths_match_node_outputs():
"""DataPicker uses paths like ``merged``; executor must surface ``data.*`` to top level."""
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "context.mergeContext")

View file

@ -1,98 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
from modules.workflows.methods.methodAi._common import serialize_context
from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import (
enhancePlainTextWithMarkdownTables,
markdownToDocumentJson,
)
from modules.workflows.methods.methodFile.actions.create import (
_collect_image_documents_only,
_context_string_for_report,
)
def test_serialize_context_nonserializable_embeds_via_default_str():
class _Ns:
def __str__(self):
return "ns"
s = serialize_context({"x": _Ns(), "n": 1})
parsed = json.loads(s)
assert parsed["n"] == 1
assert "ns" in parsed["x"]
def test_serialize_context_strips_bom_on_plain_string():
assert serialize_context("\ufeffhello") == "hello"
def test_context_string_docx_prefers_response_over_full_dict():
body = "Datum;Mandant\n2026-01-01;acme"
ctx = {"response": "\ufeff" + body, "data": {"foo": 1}}
assert _context_string_for_report(ctx, "docx") == body
def test_context_string_json_serializes_full_structure():
ctx = {"response": "hi", "data": {"foo": 1}}
out = _context_string_for_report(ctx, "json")
assert json.loads(out)["data"]["foo"] == 1
def test_serialize_context_prefers_response_when_json_fails():
d: dict = {"response": "plain", "n": 1}
d["_loop"] = d # circular — json.dumps fails
assert serialize_context(d).strip() == "plain"
def test_serialize_context_prefer_handover_primary_skips_metadata():
blob = {"response": "LINE", "data": {"nested": {"x" * 200}}, "extra": {"y": 2}}
s = serialize_context(blob, prefer_handover_primary=True)
assert s == "LINE"
def test_context_string_plain_str_passthrough_docx():
assert _context_string_for_report(" hello ", "docx") == "hello"
def test_collect_image_documents_nested_paths():
imgs = [{"documentName": "m.png", "mimeType": "image/png"}]
assert _collect_image_documents_only({"merged": {"imageDocumentsOnly": imgs}}) == imgs
assert _collect_image_documents_only({"data": {"merged": {"imageDocumentsOnly": imgs}}}) == imgs
def test_context_string_prefers_merged_response_over_inputs_noise():
raw = {"merged": {"response": "from-merged"}, "inputs": {"0": {"documentData": "X" * 10000}}}
assert _context_string_for_report(raw, "docx") == "from-merged"
def test_context_string_fallback_json_strips_heavy_keys():
raw = {"foo": 1, "inputs": {"nasty": True}, "imageDocumentsOnly": [{"documentName": "x"}]}
out = _context_string_for_report(raw, "docx")
parsed = json.loads(out)
assert "inputs" not in parsed
assert "imageDocumentsOnly" not in parsed
assert parsed["foo"] == 1
def test_enhance_plain_csv_semicolon_to_markdown_table():
body = "Datum;Betrag\n2026-01-01;12.50\n2026-01-02;3.00"
out = enhancePlainTextWithMarkdownTables(body)
assert "| Datum |" in out
assert "| Betrag |" in out
assert "---" in out
def test_enhance_preserves_normal_paragraphs():
body = "Ein Absatz ohne Raster.\n\nZweiter Gedanke."
assert enhancePlainTextWithMarkdownTables(body) == body
def test_enhance_then_markdown_json_contains_table_section():
body = "Datum;Betrag\n2026-01-01;12\n2026-01-02;3"
enhanced = enhancePlainTextWithMarkdownTables(body)
doc = markdownToDocumentJson(enhanced, "Report", "de")
sections = doc["documents"][0]["sections"]
assert any(s.get("content_type") == "table" for s in sections)