From eeb9a4a1613ca775d22928da25e4dbb011302dd5 Mon Sep 17 00:00:00 2001 From: Ida Date: Wed, 6 May 2026 08:11:48 +0200 Subject: [PATCH] AI node had the full data.response, but markdownToDocumentJson stores paragraph text in inlineRuns while RendererMarkdown only read content.text, so body text was dropped, Markdown renderer now flattens inlineRuns into real Markdown so workflow-generated .md files include the upstream text, node specific shortcuts replaced --- .../graphicalEditor/nodeDefinitions/ai.py | 33 ++-- .../nodeDefinitions/context.py | 3 +- .../graphicalEditor/nodeDefinitions/email.py | 6 +- .../graphicalEditor/nodeDefinitions/file.py | 3 +- .../nodeDefinitions/trustee.py | 6 +- modules/features/graphicalEditor/portTypes.py | 15 ++ .../renderers/rendererMarkdown.py | 51 +++++- .../workflows/automation2/executionEngine.py | 6 +- .../executors/actionNodeExecutor.py | 172 +++++++++--------- .../automation2/executors/ioExecutor.py | 6 +- modules/workflows/automation2/graphUtils.py | 115 +++++++----- .../automation2/pickNotPushMigration.py | 81 ++++++++- .../methods/methodAi/actions/process.py | 43 +++-- .../methods/methodFile/actions/create.py | 11 +- .../processing/core/actionExecutor.py | 26 +-- .../processing/core/messageCreator.py | 11 ++ 16 files changed, 389 insertions(+), 199 deletions(-) diff --git a/modules/features/graphicalEditor/nodeDefinitions/ai.py b/modules/features/graphicalEditor/nodeDefinitions/ai.py index 43136394..d1df7b1d 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/ai.py +++ b/modules/features/graphicalEditor/nodeDefinitions/ai.py @@ -25,9 +25,11 @@ AI_NODES = [ "frontendOptions": {"options": ["txt", "json", "md", "csv", "xml", "html", "pdf", "docx", "xlsx", "pptx", "png", "jpg"]}, "description": t("Ausgabeformat"), "default": "txt"}, {"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden", - "description": t("Dokumente aus vorherigen Schritten"), "default": ""}, + "description": t("Dokumente aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "documentListWire"}}, {"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder", - "description": t("Daten aus vorherigen Schritten"), "default": ""}, + "description": t("Daten aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "primaryTextRef"}}, {"name": "documentTheme", "type": "str", "required": False, "frontendType": "select", "frontendOptions": {"options": ["general", "finance", "legal", "technical", "hr"]}, "description": t("Dokument-Thema (Style-Hinweis fuer den Renderer)"), "default": "general"}, @@ -53,9 +55,11 @@ AI_NODES = [ {"name": "prompt", "type": "str", "required": True, "frontendType": "textarea", "description": t("Recherche-Anfrage")}, {"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder", - "description": t("Daten aus vorherigen Schritten"), "default": ""}, + "description": t("Daten aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "primaryTextRef"}}, {"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden", - "description": t("Dokumente aus vorherigen Schritten"), "default": ""}, + "description": t("Dokumente aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "documentListWire"}}, ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, @@ -74,7 +78,8 @@ AI_NODES = [ "description": t("Dokumentinhalt zusammenfassen"), "parameters": [ {"name": "documentList", "type": "DocumentList", "required": True, "frontendType": "dataRef", - "description": t("Dokumente aus vorherigen Schritten")}, + "description": t("Dokumente aus vorherigen Schritten"), + "graphInherit": {"port": 0, "kind": "documentListWire"}}, {"name": "summaryLength", "type": "str", "required": False, "frontendType": "select", "frontendOptions": {"options": ["brief", "medium", "detailed"]}, "description": t("Kurz, mittel oder ausführlich"), "default": "medium"}, @@ -94,7 +99,8 @@ AI_NODES = [ "description": t("Dokument in Zielsprache übersetzen"), "parameters": [ {"name": "documentList", "type": "DocumentList", "required": True, "frontendType": "dataRef", - "description": t("Dokumente aus vorherigen Schritten")}, + "description": t("Dokumente aus vorherigen Schritten"), + "graphInherit": {"port": 0, "kind": "documentListWire"}}, {"name": "targetLanguage", "type": "str", "required": True, "frontendType": "text", "description": t("Zielsprache (z.B. de, en, French)")}, ] + _AI_COMMON_PARAMS, @@ -113,7 +119,8 @@ AI_NODES = [ "description": t("Dokument in anderes Format konvertieren"), "parameters": [ {"name": "documentList", "type": "DocumentList", "required": True, "frontendType": "dataRef", - "description": t("Dokumente aus vorherigen Schritten")}, + "description": t("Dokumente aus vorherigen Schritten"), + "graphInherit": {"port": 0, "kind": "documentListWire"}}, {"name": "targetFormat", "type": "str", "required": True, "frontendType": "select", "frontendOptions": {"options": ["docx", "pdf", "xlsx", "csv", "txt", "html", "json", "md"]}, "description": t("Zielformat")}, @@ -143,9 +150,11 @@ AI_NODES = [ "frontendOptions": {"options": ["letter", "memo", "proposal", "contract", "report", "email"]}, "description": t("Dokumentart (Inhaltshinweis fuer die KI)"), "default": "proposal"}, {"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder", - "description": t("Daten aus vorherigen Schritten"), "default": ""}, + "description": t("Daten aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "primaryTextRef"}}, {"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden", - "description": t("Dokumente aus vorherigen Schritten"), "default": ""}, + "description": t("Dokumente aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "documentListWire"}}, ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, @@ -169,9 +178,11 @@ AI_NODES = [ "frontendOptions": {"options": ["py", "js", "ts", "html", "java", "cpp", "txt", "json", "csv", "xml"]}, "description": t("Datei-Endung der erzeugten Code-Datei"), "default": "py"}, {"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder", - "description": t("Daten aus vorherigen Schritten"), "default": ""}, + "description": t("Daten aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "primaryTextRef"}}, {"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden", - "description": t("Dokumente aus vorherigen Schritten"), "default": ""}, + "description": t("Dokumente aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "documentListWire"}}, ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, diff --git a/modules/features/graphicalEditor/nodeDefinitions/context.py b/modules/features/graphicalEditor/nodeDefinitions/context.py index f6757cc8..97b089d4 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/context.py +++ b/modules/features/graphicalEditor/nodeDefinitions/context.py @@ -11,7 +11,8 @@ CONTEXT_NODES = [ "description": t("Dokumentstruktur extrahieren ohne KI (Seiten, Abschnitte, Bilder, Tabellen)"), "parameters": [ {"name": "documentList", "type": "str", "required": True, "frontendType": "hidden", - "description": t("Dokumentenliste (via Wire oder DataRef)"), "default": ""}, + "description": t("Dokumentenliste (via Wire oder DataRef)"), "default": "", + "graphInherit": {"port": 0, "kind": "documentListWire"}}, {"name": "extractionOptions", "type": "object", "required": False, "frontendType": "json", "description": t( "Extraktions-Optionen (JSON), z.B. {\"includeImages\": true, \"includeTables\": true, " diff --git a/modules/features/graphicalEditor/nodeDefinitions/email.py b/modules/features/graphicalEditor/nodeDefinitions/email.py index 8f316605..d6c5f5b0 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/email.py +++ b/modules/features/graphicalEditor/nodeDefinitions/email.py @@ -63,11 +63,13 @@ EMAIL_NODES = [ "frontendOptions": {"authority": "msft"}, "description": t("E-Mail-Konto")}, {"name": "context", "type": "Any", "required": False, "frontendType": "templateTextarea", - "description": t("Daten aus vorherigen Schritten (oder direkte Beschreibung)"), "default": ""}, + "description": t("Daten aus vorherigen Schritten (oder direkte Beschreibung)"), "default": "", + "graphInherit": {"port": 0, "kind": "primaryTextRef"}}, {"name": "to", "type": "str", "required": False, "frontendType": "text", "description": t("Empfänger (komma-separiert, optional für Entwurf)"), "default": ""}, {"name": "documentList", "type": "str", "required": False, "frontendType": "hidden", - "description": t("Anhang-Dokumente (via Wire oder DataRef)"), "default": ""}, + "description": t("Anhang-Dokumente (via Wire oder DataRef)"), "default": "", + "graphInherit": {"port": 0, "kind": "documentListWire"}}, {"name": "emailContent", "type": "str", "required": False, "frontendType": "hidden", "description": t("Direkt vorbereiteter Inhalt {subject, body, to} (via Wire — überspringt KI)"), "default": ""}, diff --git a/modules/features/graphicalEditor/nodeDefinitions/file.py b/modules/features/graphicalEditor/nodeDefinitions/file.py index ffa4d722..9cc8d5f4 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/file.py +++ b/modules/features/graphicalEditor/nodeDefinitions/file.py @@ -16,7 +16,8 @@ FILE_NODES = [ {"name": "title", "type": "str", "required": False, "frontendType": "text", "description": t("Dokumenttitel")}, {"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder", - "description": t("Daten aus vorherigen Schritten"), "default": ""}, + "description": t("Daten aus vorherigen Schritten"), "default": "", + "graphInherit": {"port": 0, "kind": "primaryTextRef"}}, ], "inputs": 1, "outputs": 1, diff --git a/modules/features/graphicalEditor/nodeDefinitions/trustee.py b/modules/features/graphicalEditor/nodeDefinitions/trustee.py index 3adc9d3f..18f3e3a0 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/trustee.py +++ b/modules/features/graphicalEditor/nodeDefinitions/trustee.py @@ -77,7 +77,8 @@ TRUSTEE_NODES = [ # is List[ActionDocument] (see datamodelChat.ActionResult). The # DataPicker uses this string to filter compatible upstream paths. {"name": "documentList", "type": "List[ActionDocument]", "required": True, "frontendType": "dataRef", - "description": t("Dokumente aus vorherigen Schritten")}, + "description": t("Dokumente aus vorherigen Schritten"), + "graphInherit": {"port": 0, "kind": "documentListWire"}}, dict(_TRUSTEE_INSTANCE_PARAM), ], "inputs": 1, @@ -95,7 +96,8 @@ TRUSTEE_NODES = [ "description": t("Trustee-Positionen in Buchhaltungssystem übertragen."), "parameters": [ {"name": "documentList", "type": "List[ActionDocument]", "required": True, "frontendType": "dataRef", - "description": t("Dokumente aus vorherigen Schritten")}, + "description": t("Dokumente aus vorherigen Schritten"), + "graphInherit": {"port": 0, "kind": "documentListWire"}}, dict(_TRUSTEE_INSTANCE_PARAM), ], "inputs": 1, diff --git a/modules/features/graphicalEditor/portTypes.py b/modules/features/graphicalEditor/portTypes.py index deab83b9..b2603d7d 100644 --- a/modules/features/graphicalEditor/portTypes.py +++ b/modules/features/graphicalEditor/portTypes.py @@ -612,6 +612,21 @@ SYSTEM_VARIABLES: Dict[str, Dict[str, str]] = { } +# --------------------------------------------------------------------------- +# Graph inheritance (executeGraph materialization + ActionNodeExecutor wiring) +# --------------------------------------------------------------------------- +# +# When a parameter declares ``graphInherit.kind == "primaryTextRef"``, executeGraph +# inserts an explicit DataRef before run (see pickNotPushMigration.materializePrimaryTextHandover). +# Schema names are catalog output port types (e.g. AiResult). + +PRIMARY_TEXT_HANDOVER_REF_PATH: Dict[str, List[Any]] = { + "AiResult": ["response"], + "TextResult": ["text"], + "ConsolidateResult": ["result"], +} + + def resolveSystemVariable(variable: str, context: Dict[str, Any]) -> Any: """Resolve a system variable name to its runtime value.""" from datetime import datetime, timezone diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/rendererMarkdown.py b/modules/serviceCenter/services/serviceGeneration/renderers/rendererMarkdown.py index 1113f1a2..266a6abf 100644 --- a/modules/serviceCenter/services/serviceGeneration/renderers/rendererMarkdown.py +++ b/modules/serviceCenter/services/serviceGeneration/renderers/rendererMarkdown.py @@ -6,7 +6,7 @@ Markdown renderer for report generation. from .documentRendererBaseTemplate import BaseRenderer from modules.datamodels.datamodelDocument import RenderedDocument -from typing import Dict, Any, List, Optional +from typing import Any, Dict, List, Optional class RendererMarkdown(BaseRenderer): """Renders content to Markdown format with format-specific extraction.""" @@ -252,6 +252,41 @@ class RendererMarkdown(BaseRenderer): self.logger.warning(f"Error rendering table: {str(e)}") return "" + def _renderInlineRunsMarkdown(self, runs: Any) -> str: + """Turn Phase-5 inlineRuns (from markdownToDocumentJson) into markdown text.""" + if not runs: + return "" + if not isinstance(runs, list): + return str(runs) + parts: List[str] = [] + for run in runs: + if not isinstance(run, dict): + parts.append(str(run)) + continue + run_type = run.get("type", "text") + value = str(run.get("value", "")) + if run_type == "text": + parts.append(value) + elif run_type == "bold": + parts.append(f"**{value}**") + elif run_type == "italic": + parts.append(f"*{value}*") + elif run_type == "code": + if not value: + parts.append("``") + elif "`" not in value: + parts.append(f"`{value}`") + else: + parts.append(f"``{value}``") + elif run_type == "link": + href = str(run.get("href", "")) + parts.append(f"[{value}]({href})") + elif run_type == "image": + parts.append(f"![{value}](image)") + else: + parts.append(value) + return "".join(parts) + def _renderJsonBulletList(self, listData: Dict[str, Any]) -> str: """Render a JSON bullet list to markdown.""" try: @@ -268,6 +303,8 @@ class RendererMarkdown(BaseRenderer): for item in items: if isinstance(item, str): markdownParts.append(f"- {item}") + elif isinstance(item, list): + markdownParts.append(f"- {self._renderInlineRunsMarkdown(item)}") elif isinstance(item, dict) and "text" in item: markdownParts.append(f"- {item['text']}") @@ -302,14 +339,24 @@ class RendererMarkdown(BaseRenderer): try: # Extract from nested content structure content = paragraphData.get("content", {}) + top = paragraphData.get("text") + if isinstance(top, str) and top.strip(): + if not isinstance(content, dict) or ( + not content.get("text") and not content.get("inlineRuns") + ): + return top + if isinstance(content, dict): + runs = self._inlineRunsFromContent(content) + if runs: + return self._renderInlineRunsMarkdown(runs) text = content.get("text", "") elif isinstance(content, str): text = content else: text = "" return text if text else "" - + except Exception as e: self.logger.warning(f"Error rendering paragraph: {str(e)}") return "" diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index e49754f8..3c056df6 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -360,7 +360,10 @@ async def executeGraph( ) from modules.workflows.processing.shared.methodDiscovery import discoverMethods discoverMethods(services) - from modules.workflows.automation2.pickNotPushMigration import materializeConnectionRefs + from modules.workflows.automation2.pickNotPushMigration import ( + materializeConnectionRefs, + materializePrimaryTextHandover, + ) from modules.workflows.automation2.featureInstanceRefMigration import ( materializeFeatureInstanceRefs, ) @@ -372,6 +375,7 @@ async def executeGraph( # subsequent connection-ref pass and validation see the canonical shape. graph = materializeFeatureInstanceRefs(graph) graph = materializeConnectionRefs(graph) + graph = materializePrimaryTextHandover(graph) nodeTypeIds = _getNodeTypeIds(services) logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds)) errors = validateGraph(graph, nodeTypeIds) diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py index 2806bd4c..d11c31f3 100644 --- a/modules/workflows/automation2/executors/actionNodeExecutor.py +++ b/modules/workflows/automation2/executors/actionNodeExecutor.py @@ -1,10 +1,14 @@ # Copyright (c) 2025 Patrick Motsch -# Action node executor - maps ai.*, email.*, sharepoint.*, clickup.*, file.*, trustee.* to method actions. +# Action node executor — maps ai.*, email.*, sharepoint.*, clickup.*, file.*, trustee.* to method actions. # -# Typed Port System: explicit DataRefs / static parameters; optional ``documentList`` from input port 0 -# when the param is empty (same idea as IOExecutor wire fill). -# ``materializeConnectionRefs`` (see pickNotPushMigration) may still rewrite empty connectionReference at run start. +# Typed port system: parameters resolve via DataRefs / static values. Declarative port inheritance +# uses ``graphInherit`` on parameter definitions in node JSON (see STATIC_NODE_TYPES): e.g. +# ``primaryTextRef`` is materialized to explicit refs in pickNotPushMigration.materializePrimaryTextHandover; +# ``documentListWire`` is applied at runtime in this executor via graphUtils.extract_wired_document_list. + +import base64 +import binascii import json import logging import re @@ -20,8 +24,23 @@ from modules.serviceCenter.services.serviceBilling.mainServiceBilling import Bil logger = logging.getLogger(__name__) +def _looks_like_ascii_base64_payload(s: str) -> bool: + """Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars (#, *, -, …).""" + t = "".join(s.split()) + if len(t) < 8: + return False + if not t.isascii(): + return False + return bool(re.fullmatch(r"[A-Za-z0-9+/]+=*", t)) and len(t) % 4 == 0 + + def _coerce_document_data_to_bytes(raw: Any) -> Optional[bytes]: - """Normalize documentData (bytes/str/buffer) for DB file persistence.""" + """Normalize documentData for DB file persistence. + + ActionDocument conventions (see methodFile.create): binary bodies are carried as ASCII + base64 strings; plain markdown/text stays as Unicode. Do not UTF-8-encode a base64 + literal — that persists the ASCII of the encoding (file looks like base64 gibberish). + """ if raw is None: return None if isinstance(raw, bytes): @@ -33,7 +52,20 @@ def _coerce_document_data_to_bytes(raw: Any) -> Optional[bytes]: b = raw.tobytes() return b if len(b) > 0 else None if isinstance(raw, str): - b = raw.encode("utf-8") + stripped = raw.strip() + if not stripped: + return None + if _looks_like_ascii_base64_payload(stripped): + try: + decoded = base64.b64decode(stripped, validate=True) + except (TypeError, binascii.Error, ValueError): + try: + decoded = base64.b64decode(stripped) + except (binascii.Error, ValueError): + decoded = b"" + if decoded: + return decoded + b = stripped.encode("utf-8") return b if len(b) > 0 else None return None @@ -239,78 +271,6 @@ def _getOutputSchemaName(nodeDef: Dict) -> str: return port0.get("schema", "ActionResult") -def _extract_wired_document_list(inp: Any) -> Optional[Dict[str, Any]]: - """ - Build a DocumentList-shaped dict from upstream node output (matches IOExecutor wire behavior). - Handles DocumentList, human upload shapes (file / files / fileIds), FileList, loop file items. - During flow.loop body execution the loop node's output is - {items, count, currentItem, currentIndex}; wired document actions must use currentItem. - """ - if inp is None: - return None - from modules.features.graphicalEditor.portTypes import ( - unwrapTransit, - _coerce_document_list_upload_fields, - _file_record_to_document, - ) - - data = unwrapTransit(inp) - if isinstance(data, str): - one = _file_record_to_document(data) - return {"documents": [one], "count": 1} if one else None - if not isinstance(data, dict): - return None - d = dict(data) - _coerce_document_list_upload_fields(d) - # Per-iteration payload from executionEngine (flow.loop → downstream in loop body) - if "currentItem" in d: - ci = d.get("currentItem") - if ci is not None: - nested = _extract_wired_document_list(ci) - if nested: - return nested - docs = d.get("documents") - if isinstance(docs, list) and len(docs) > 0: - return {"documents": docs, "count": d.get("count", len(docs))} - raw_list = d.get("documentList") - if isinstance(raw_list, list) and len(raw_list) > 0 and isinstance(raw_list[0], dict): - return {"documents": raw_list, "count": len(raw_list)} - doc_id = d.get("documentId") or d.get("id") - if doc_id and str(doc_id).strip(): - one: Dict[str, Any] = {"id": str(doc_id).strip()} - fn = d.get("fileName") or d.get("name") - if fn: - one["name"] = str(fn) - mt = d.get("mimeType") - if mt: - one["mimeType"] = str(mt) - return {"documents": [one], "count": 1} - files = d.get("files") - if isinstance(files, list) and files: - collected = [] - for item in files: - conv = _file_record_to_document(item) if isinstance(item, dict) else None - if conv: - collected.append(conv) - if collected: - return {"documents": collected, "count": len(collected)} - return None - - -def _document_list_param_is_empty(val: Any) -> bool: - if val is None or val == "": - return True - if isinstance(val, list) and len(val) == 0: - return True - if isinstance(val, dict): - if val.get("documents") or val.get("references") or val.get("items"): - return False - if val.get("documentId") or val.get("id"): - return False - return True - return False - - class ActionNodeExecutor: """Execute action nodes by mapping to method actions via ActionExecutor.""" @@ -323,7 +283,11 @@ class ActionNodeExecutor: context: Dict[str, Any], ) -> Any: from modules.features.graphicalEditor.nodeRegistry import getNodeTypeToMethodAction - from modules.workflows.automation2.graphUtils import resolveParameterReferences + from modules.workflows.automation2.graphUtils import ( + document_list_param_is_empty, + extract_wired_document_list, + resolveParameterReferences, + ) from modules.workflows.processing.core.actionExecutor import ActionExecutor nodeType = node.get("type", "") @@ -352,16 +316,23 @@ class ActionNodeExecutor: if pName and pName not in resolvedParams and "default" in pDef: resolvedParams[pName] = pDef["default"] - _param_names = {p.get("name") for p in nodeDef.get("parameters", []) if p.get("name")} - if "documentList" in _param_names and _document_list_param_is_empty(resolvedParams.get("documentList")): + for pDef in nodeDef.get("parameters") or []: + gi = pDef.get("graphInherit") or {} + if gi.get("kind") != "documentListWire": + continue + pname = pDef.get("name") + if not pname or not document_list_param_is_empty(resolvedParams.get(pname)): + continue + port_ix = int(gi.get("port", 0)) _src_map = (context.get("inputSources") or {}).get(nodeId) or {} - _entry = _src_map.get(0) - if _entry: - _src_node_id, _ = _entry - _upstream = (context.get("nodeOutputs") or {}).get(_src_node_id) - _wired = _extract_wired_document_list(_upstream) - if _wired: - resolvedParams["documentList"] = _wired + _entry = _src_map.get(port_ix) + if not _entry: + continue + _src_node_id, _ = _entry + _upstream = (context.get("nodeOutputs") or {}).get(_src_node_id) + _wired = extract_wired_document_list(_upstream) + if _wired: + resolvedParams[pname] = _wired # 3. Resolve connectionReference chatService = getattr(self.services, "chat", None) @@ -425,6 +396,16 @@ class ActionNodeExecutor: docsList = [] for d in (result.documents or []): dumped = d.model_dump() if hasattr(d, "model_dump") else dict(d) if isinstance(d, dict) else d + if isinstance(dumped, dict): + _meta = dumped.get("validationMetadata") if isinstance(dumped.get("validationMetadata"), dict) else {} + _existing = dumped.get("fileId") or _meta.get("fileId") + # e.g. file.create already persisted inside the action — avoid a second FileItem with wrong bytes + if _existing and str(_existing).strip(): + dumped["documentData"] = None + dumped.setdefault("_hasBinaryData", True) + docsList.append(dumped) + continue + rawData = getattr(d, "documentData", None) if hasattr(d, "documentData") else (dumped.get("documentData") if isinstance(dumped, dict) else None) rawBytes = _coerce_document_data_to_bytes(rawData) if isinstance(dumped, dict) and rawBytes: @@ -463,8 +444,12 @@ class ActionNodeExecutor: dumped["_hasBinaryData"] = True docsList.append(dumped) - # Clean DocumentList shape for document nodes (match file.create: documents + count, no AiResult fields) - if outputSchema == "DocumentList" and nodeType in ("ai.generateDocument", "ai.convertDocument"): + # Clean DocumentList shape for document nodes (documents + count, no ActionResult/AiResult noise) + if outputSchema == "DocumentList" and nodeType in ( + "ai.generateDocument", + "ai.convertDocument", + "file.create", + ): if not result.success: return _normalizeError( RuntimeError(str(result.error or "document action failed")), @@ -488,6 +473,13 @@ class ActionNodeExecutor: extractedContext = "" elif raw: extractedContext = str(raw).strip() + else: + # ai.process (and similar): text handover in ActionResult.data — no persisted document row + rd = getattr(result, "data", None) + if isinstance(rd, dict): + handover = rd.get("response") + if handover is not None: + extractedContext = str(handover).strip() promptText = str(resolvedParams.get("aiPrompt") or resolvedParams.get("prompt") or "").strip() diff --git a/modules/workflows/automation2/executors/ioExecutor.py b/modules/workflows/automation2/executors/ioExecutor.py index f6d40b05..14bc8f91 100644 --- a/modules/workflows/automation2/executors/ioExecutor.py +++ b/modules/workflows/automation2/executors/ioExecutor.py @@ -37,7 +37,7 @@ class IOExecutor: nodeOutputs = context.get("nodeOutputs", {}) params = dict(node.get("parameters") or {}) - from modules.workflows.automation2.graphUtils import resolveParameterReferences + from modules.workflows.automation2.graphUtils import extract_wired_document_list, resolveParameterReferences resolvedParams = resolveParameterReferences(params, nodeOutputs) logger.info("IOExecutor node %s resolvedParams keys=%s", nodeId, list(resolvedParams.keys())) @@ -45,9 +45,7 @@ class IOExecutor: if 0 in inputSources: srcId, _ = inputSources[0] inp = nodeOutputs.get(srcId) - from modules.workflows.automation2.executors.actionNodeExecutor import _extract_wired_document_list - - wired = _extract_wired_document_list(inp) + wired = extract_wired_document_list(inp) docs = (wired or {}).get("documents") if isinstance(wired, dict) else None if docs: resolvedParams.setdefault("documentList", wired) diff --git a/modules/workflows/automation2/graphUtils.py b/modules/workflows/automation2/graphUtils.py index 7ea3b4e8..fb59cec8 100644 --- a/modules/workflows/automation2/graphUtils.py +++ b/modules/workflows/automation2/graphUtils.py @@ -7,50 +7,6 @@ from typing import Dict, List, Any, Tuple, Set, Optional logger = logging.getLogger(__name__) -def _ai_result_text_from_documents(d: Dict[str, Any]) -> Optional[str]: - """Extract plain-text body from AiResult-style ``documents[0].documentData``.""" - docs = d.get("documents") - if not isinstance(docs, list) or not docs: - return None - d0 = docs[0] - raw: Any = None - if isinstance(d0, dict): - raw = d0.get("documentData") - elif d0 is not None: - raw = getattr(d0, "documentData", None) - if raw is None: - return None - if isinstance(raw, bytes): - try: - t = raw.decode("utf-8").strip() - return t or None - except (UnicodeDecodeError, ValueError): - return None - if isinstance(raw, str): - s = raw.strip() - return s or None - return None - - -def _ref_coalesce_empty_ai_result_text(data: Any, path: List[Any], resolved: Any) -> Any: - """If a ref targets AiResult text fields but resolves empty/missing, fall back to documents. - - Needed when: optional ``responseData`` is absent (no synthetic ``{}``), ``response`` is - still empty but ``documents`` hold the model output, or legacy graphs bind responseData only. - """ - if resolved not in (None, ""): - return resolved - if not isinstance(data, dict) or not path: - return resolved - head = path[0] - if head not in ("response", "responseData", "context"): - return resolved - if head == "context" and len(path) != 1: - return resolved - fb = _ai_result_text_from_documents(data) - return fb if fb is not None else resolved - - def parseGraph(graph: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Set[str]]: """ Parse graph into nodes, connections, and node IDs. @@ -408,7 +364,6 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any: # Form nodes store fields under {"payload": {fieldName: …}}. # DataPicker emits bare field paths like ["url"]; try under payload. resolved = _get_by_path(data["payload"], plist) - resolved = _ref_coalesce_empty_ai_result_text(data, plist, resolved) return resolveParameterReferences(resolved, nodeOutputs) return value if value.get("type") == "value": @@ -462,3 +417,73 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any: return "\n\n".join(p for p in parts if p) return [resolveParameterReferences(v, nodeOutputs) for v in value] return value + + +def document_list_param_is_empty(val: Any) -> bool: + """True when a documentList-style parameter has not been set (wire + DataRef may fill).""" + if val is None or val == "": + return True + if isinstance(val, list) and len(val) == 0: + return True + if isinstance(val, dict): + if val.get("documents") or val.get("references") or val.get("items"): + return False + if val.get("documentId") or val.get("id"): + return False + return True + return False + + +def extract_wired_document_list(inp: Any) -> Optional[Dict[str, Any]]: + """ + Build a DocumentList-shaped dict from an upstream node output (port wire). + Used when a parameter declares ``graphInherit.kind == "documentListWire"``. + """ + if inp is None: + return None + from modules.features.graphicalEditor.portTypes import ( + unwrapTransit, + _coerce_document_list_upload_fields, + _file_record_to_document, + ) + + data = unwrapTransit(inp) + if isinstance(data, str): + one = _file_record_to_document(data) + return {"documents": [one], "count": 1} if one else None + if not isinstance(data, dict): + return None + d = dict(data) + _coerce_document_list_upload_fields(d) + if "currentItem" in d: + ci = d.get("currentItem") + if ci is not None: + nested = extract_wired_document_list(ci) + if nested: + return nested + docs = d.get("documents") + if isinstance(docs, list) and len(docs) > 0: + return {"documents": docs, "count": d.get("count", len(docs))} + raw_list = d.get("documentList") + if isinstance(raw_list, list) and len(raw_list) > 0 and isinstance(raw_list[0], dict): + return {"documents": raw_list, "count": len(raw_list)} + doc_id = d.get("documentId") or d.get("id") + if doc_id and str(doc_id).strip(): + one: Dict[str, Any] = {"id": str(doc_id).strip()} + fn = d.get("fileName") or d.get("name") + if fn: + one["name"] = str(fn) + mt = d.get("mimeType") + if mt: + one["mimeType"] = str(mt) + return {"documents": [one], "count": 1} + files = d.get("files") + if isinstance(files, list) and files: + collected = [] + for item in files: + conv = _file_record_to_document(item) if isinstance(item, dict) else None + if conv: + collected.append(conv) + if collected: + return {"documents": collected, "count": len(collected)} + return None diff --git a/modules/workflows/automation2/pickNotPushMigration.py b/modules/workflows/automation2/pickNotPushMigration.py index fe347761..b6da00a2 100644 --- a/modules/workflows/automation2/pickNotPushMigration.py +++ b/modules/workflows/automation2/pickNotPushMigration.py @@ -1,9 +1,12 @@ # Copyright (c) 2025 Patrick Motsch """ -Graph helpers for Pick-not-Push: materialize connectionReference as explicit DataRefs. +Graph helpers for Pick-not-Push: materialize typed DataRefs before executeGraph runs. -Runtime: executeGraph deep-copies the version graph and applies materialize_connection_refs -so downstream nodes resolve connection UUIDs from upstream output.connection.id. +- ``materializeConnectionRefs``: empty ``connectionReference`` from upstream connection provenance. +- ``materializePrimaryTextHandover``: parameters whose static definition includes + ``graphInherit.kind == "primaryTextRef"`` (canonical paths: ``PRIMARY_TEXT_HANDOVER_REF_PATH``). + +Runtime: executeGraph deep-copies the version graph and applies these passes in order. """ from __future__ import annotations @@ -12,7 +15,10 @@ import logging from typing import Any, Dict, List from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES -from modules.features.graphicalEditor.portTypes import resolve_output_schema_name +from modules.features.graphicalEditor.portTypes import ( + PRIMARY_TEXT_HANDOVER_REF_PATH, + resolve_output_schema_name, +) from modules.workflows.automation2.graphUtils import buildConnectionMap, getInputSources logger = logging.getLogger(__name__) @@ -81,3 +87,70 @@ def materializeConnectionRefs(graph: Dict[str, Any]) -> Dict[str, Any]: logger.debug("materializeConnectionRefs: %s.connectionReference -> ref %s.connection.id", nid, src_id) return g + + +def _slot_empty_for_primary_text_inherit(val: Any) -> bool: + return val is None or val == "" or val == [] + + +def materializePrimaryTextHandover(graph: Dict[str, Any]) -> Dict[str, Any]: + """ + For parameters declaring ``graphInherit.kind == "primaryTextRef"`` (optional ``port``, default 0) with an + empty value, set an explicit ``DataRef`` to the canonical text field of the producer on + that port (see ``PRIMARY_TEXT_HANDOVER_REF_PATH`` keyed by upstream output schema name). + """ + g = copy.deepcopy(graph) + nodes: List[Dict[str, Any]] = g.get("nodes") or [] + connections = g.get("connections") or [] + if not nodes: + return g + + conn_map = buildConnectionMap(connections) + node_by_id = {n["id"]: n for n in nodes if n.get("id")} + + for node in nodes: + nid = node.get("id") + ntype = node.get("type") + if not nid or not ntype: + continue + node_def = _NODE_DEF_BY_ID.get(ntype) + if not node_def: + continue + params = node.get("parameters") + if not isinstance(params, dict): + node["parameters"] = {} + params = node["parameters"] + + for pdef in node_def.get("parameters") or []: + gi = pdef.get("graphInherit") + if not isinstance(gi, dict) or gi.get("kind") != "primaryTextRef": + continue + pname = pdef.get("name") + if not pname: + continue + port_ix = int(gi.get("port", 0)) + if not _slot_empty_for_primary_text_inherit(params.get(pname)): + continue + input_sources = getInputSources(nid, conn_map) + if port_ix not in input_sources: + continue + src_id, _ = input_sources[port_ix] + src_node = node_by_id.get(src_id) or {} + src_def = _NODE_DEF_BY_ID.get(src_node.get("type") or "") + if not src_def: + continue + out_port = (src_def.get("outputPorts") or {}).get(0, {}) or {} + out_schema = resolve_output_schema_name(src_node, out_port if isinstance(out_port, dict) else {}) + ref_path = PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema) + if not ref_path: + continue + params[pname] = _data_ref(src_id, list(ref_path)) + logger.debug( + "materializePrimaryTextHandover: %s.%s -> ref %s path=%s", + nid, + pname, + src_id, + ref_path, + ) + + return g diff --git a/modules/workflows/methods/methodAi/actions/process.py b/modules/workflows/methods/methodAi/actions/process.py index fee57c2e..063118c9 100644 --- a/modules/workflows/methods/methodAi/actions/process.py +++ b/modules/workflows/methods/methodAi/actions/process.py @@ -385,34 +385,33 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult: )) final_documents = action_documents + handover_data = None else: - # Text response - create document from content - # If no extension provided, use "txt" (required for filename) - extension = output_extension.lstrip('.') if output_extension else "txt" - meaningful_name = self._generateMeaningfulFileName( - base_name="ai", - extension=extension, - action_name="result" - ) - validationMetadata = { - "actionType": "ai.process", - "resultType": normalized_result_type if normalized_result_type else None, - "outputFormat": output_format if output_format else None, - "hasDocuments": False, - "contentType": "text" + # Text-only response: keep handover in ActionResult.data (no ActionDocument). + # Avoids automation2 persisting a synthetic file per run; use ai.generateDocument for files. + body = aiResponse.content + if body is None: + body = "" + elif not isinstance(body, str): + body = str(body) + final_documents = [] + handover_data = { + "response": body, + "resultType": normalized_result_type, + "outputFormat": output_format, + "contentType": "text", } - action_document = ActionDocument( - documentName=meaningful_name, - documentData=aiResponse.content, - mimeType=output_mime_type, - validationMetadata=validationMetadata - ) - final_documents = [action_document] + md = getattr(aiResponse, "metadata", None) + if md is not None: + extra = getattr(md, "additionalData", None) + if isinstance(extra, dict): + for k, v in extra.items(): + handover_data.setdefault(k, v) # Complete progress tracking self.services.chat.progressLogFinish(operationId, True) - return ActionResult.isSuccess(documents=final_documents) + return ActionResult.isSuccess(documents=final_documents, data=handover_data) except (SubscriptionInactiveException, BillingContextError): try: diff --git a/modules/workflows/methods/methodFile/actions/create.py b/modules/workflows/methods/methodFile/actions/create.py index 2fef9e9e..285b970d 100644 --- a/modules/workflows/methods/methodFile/actions/create.py +++ b/modules/workflows/methods/methodFile/actions/create.py @@ -1,10 +1,12 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. -import base64 -import logging from typing import Dict, Any +import base64 +import binascii +import logging + from modules.datamodels.datamodelChat import ActionResult, ActionDocument from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import markdownToDocumentJson from modules.shared.i18nRegistry import normalizePrimaryLanguageTag @@ -47,7 +49,10 @@ def _persistDocumentsToUserFiles( if not doc_data: continue if isinstance(doc_data, str): - content = base64.b64decode(doc_data) + try: + content = base64.b64decode(doc_data, validate=True) + except (TypeError, ValueError, binascii.Error): + content = doc_data.encode("utf-8") else: content = doc_data doc_name = ( diff --git a/modules/workflows/processing/core/actionExecutor.py b/modules/workflows/processing/core/actionExecutor.py index 2cb216f9..3d4ed7fc 100644 --- a/modules/workflows/processing/core/actionExecutor.py +++ b/modules/workflows/processing/core/actionExecutor.py @@ -251,6 +251,7 @@ class ActionExecutor: return ActionResult( success=result.success, documents=result.documents, # Return original ActionDocument objects + data=result.data, resultLabel=action.execResultLabel, # Always use action's execResultLabel error=result.error or "" ) @@ -265,18 +266,21 @@ class ActionExecutor: ) def _extractResultText(self, result: ActionResult) -> str: - """Extract result text from ActionResult documents""" - if not result.success or not result.documents: + """Extract result text from ActionResult documents or structured data (e.g. ai.process handover).""" + if not result.success: return "" - - # Extract text directly from ActionDocument objects - resultParts = [] - for doc in result.documents: - if hasattr(doc, 'documentData') and doc.documentData: - resultParts.append(str(doc.documentData)) - - # Join all document results with separators - return "\n\n---\n\n".join(resultParts) if resultParts else "" + if result.documents: + resultParts = [] + for doc in result.documents: + if hasattr(doc, "documentData") and doc.documentData: + resultParts.append(str(doc.documentData)) + return "\n\n---\n\n".join(resultParts) if resultParts else "" + data = getattr(result, "data", None) + if isinstance(data, dict): + handover = data.get("response") + if handover is not None: + return str(handover) + return "" async def _createActionCompletionMessage(self, action: ActionItem, result: ActionResult, workflow: ChatWorkflow, taskStep: TaskStep, taskIndex: int, actionIndex: int): diff --git a/modules/workflows/processing/core/messageCreator.py b/modules/workflows/processing/core/messageCreator.py index 48df832d..e0c49a52 100644 --- a/modules/workflows/processing/core/messageCreator.py +++ b/modules/workflows/processing/core/messageCreator.py @@ -161,6 +161,17 @@ class MessageCreator: messageText = f"**Action {currentAction} ({action.execMethod}.{action.execAction})**\n\n" messageText += f"❌ {userFriendlyText}\n\n" messageText += f"{errorDetails}\n\n" + + # Text handover without attachment (e.g. ai.process): show content in the message body + if ( + result.success + and not createdDocuments + and getattr(result, "data", None) + and isinstance(result.data, dict) + ): + handover_txt = result.data.get("response") + if handover_txt is not None and str(handover_txt).strip(): + messageText += "\n\n" + str(handover_txt).strip() # Build concise summary to persist for history context doc_count = len(createdDocuments) if createdDocuments else 0