Compare commits

...

2 commits

18 changed files with 390 additions and 223 deletions

View file

@ -25,9 +25,11 @@ AI_NODES = [
"frontendOptions": {"options": ["txt", "json", "md", "csv", "xml", "html", "pdf", "docx", "xlsx", "pptx", "png", "jpg"]},
"description": t("Ausgabeformat"), "default": "txt"},
{"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden",
"description": t("Dokumente aus vorherigen Schritten"), "default": ""},
"description": t("Dokumente aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "documentListWire"}},
{"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder",
"description": t("Daten aus vorherigen Schritten"), "default": ""},
"description": t("Daten aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "primaryTextRef"}},
{"name": "documentTheme", "type": "str", "required": False, "frontendType": "select",
"frontendOptions": {"options": ["general", "finance", "legal", "technical", "hr"]},
"description": t("Dokument-Thema (Style-Hinweis fuer den Renderer)"), "default": "general"},
@ -53,9 +55,11 @@ AI_NODES = [
{"name": "prompt", "type": "str", "required": True, "frontendType": "textarea",
"description": t("Recherche-Anfrage")},
{"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder",
"description": t("Daten aus vorherigen Schritten"), "default": ""},
"description": t("Daten aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "primaryTextRef"}},
{"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden",
"description": t("Dokumente aus vorherigen Schritten"), "default": ""},
"description": t("Dokumente aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "documentListWire"}},
] + _AI_COMMON_PARAMS,
"inputs": 1,
"outputs": 1,
@ -74,7 +78,8 @@ AI_NODES = [
"description": t("Dokumentinhalt zusammenfassen"),
"parameters": [
{"name": "documentList", "type": "DocumentList", "required": True, "frontendType": "dataRef",
"description": t("Dokumente aus vorherigen Schritten")},
"description": t("Dokumente aus vorherigen Schritten"),
"graphInherit": {"port": 0, "kind": "documentListWire"}},
{"name": "summaryLength", "type": "str", "required": False, "frontendType": "select",
"frontendOptions": {"options": ["brief", "medium", "detailed"]},
"description": t("Kurz, mittel oder ausführlich"), "default": "medium"},
@ -94,7 +99,8 @@ AI_NODES = [
"description": t("Dokument in Zielsprache übersetzen"),
"parameters": [
{"name": "documentList", "type": "DocumentList", "required": True, "frontendType": "dataRef",
"description": t("Dokumente aus vorherigen Schritten")},
"description": t("Dokumente aus vorherigen Schritten"),
"graphInherit": {"port": 0, "kind": "documentListWire"}},
{"name": "targetLanguage", "type": "str", "required": True, "frontendType": "text",
"description": t("Zielsprache (z.B. de, en, French)")},
] + _AI_COMMON_PARAMS,
@ -113,7 +119,8 @@ AI_NODES = [
"description": t("Dokument in anderes Format konvertieren"),
"parameters": [
{"name": "documentList", "type": "DocumentList", "required": True, "frontendType": "dataRef",
"description": t("Dokumente aus vorherigen Schritten")},
"description": t("Dokumente aus vorherigen Schritten"),
"graphInherit": {"port": 0, "kind": "documentListWire"}},
{"name": "targetFormat", "type": "str", "required": True, "frontendType": "select",
"frontendOptions": {"options": ["docx", "pdf", "xlsx", "csv", "txt", "html", "json", "md"]},
"description": t("Zielformat")},
@ -143,9 +150,11 @@ AI_NODES = [
"frontendOptions": {"options": ["letter", "memo", "proposal", "contract", "report", "email"]},
"description": t("Dokumentart (Inhaltshinweis fuer die KI)"), "default": "proposal"},
{"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder",
"description": t("Daten aus vorherigen Schritten"), "default": ""},
"description": t("Daten aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "primaryTextRef"}},
{"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden",
"description": t("Dokumente aus vorherigen Schritten"), "default": ""},
"description": t("Dokumente aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "documentListWire"}},
] + _AI_COMMON_PARAMS,
"inputs": 1,
"outputs": 1,
@ -169,9 +178,11 @@ AI_NODES = [
"frontendOptions": {"options": ["py", "js", "ts", "html", "java", "cpp", "txt", "json", "csv", "xml"]},
"description": t("Datei-Endung der erzeugten Code-Datei"), "default": "py"},
{"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder",
"description": t("Daten aus vorherigen Schritten"), "default": ""},
"description": t("Daten aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "primaryTextRef"}},
{"name": "documentList", "type": "DocumentList", "required": False, "frontendType": "hidden",
"description": t("Dokumente aus vorherigen Schritten"), "default": ""},
"description": t("Dokumente aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "documentListWire"}},
] + _AI_COMMON_PARAMS,
"inputs": 1,
"outputs": 1,

View file

@ -11,7 +11,8 @@ CONTEXT_NODES = [
"description": t("Dokumentstruktur extrahieren ohne KI (Seiten, Abschnitte, Bilder, Tabellen)"),
"parameters": [
{"name": "documentList", "type": "str", "required": True, "frontendType": "hidden",
"description": t("Dokumentenliste (via Wire oder DataRef)"), "default": ""},
"description": t("Dokumentenliste (via Wire oder DataRef)"), "default": "",
"graphInherit": {"port": 0, "kind": "documentListWire"}},
{"name": "extractionOptions", "type": "object", "required": False, "frontendType": "json",
"description": t(
"Extraktions-Optionen (JSON), z.B. {\"includeImages\": true, \"includeTables\": true, "

View file

@ -63,11 +63,13 @@ EMAIL_NODES = [
"frontendOptions": {"authority": "msft"},
"description": t("E-Mail-Konto")},
{"name": "context", "type": "Any", "required": False, "frontendType": "templateTextarea",
"description": t("Daten aus vorherigen Schritten (oder direkte Beschreibung)"), "default": ""},
"description": t("Daten aus vorherigen Schritten (oder direkte Beschreibung)"), "default": "",
"graphInherit": {"port": 0, "kind": "primaryTextRef"}},
{"name": "to", "type": "str", "required": False, "frontendType": "text",
"description": t("Empfänger (komma-separiert, optional für Entwurf)"), "default": ""},
{"name": "documentList", "type": "str", "required": False, "frontendType": "hidden",
"description": t("Anhang-Dokumente (via Wire oder DataRef)"), "default": ""},
"description": t("Anhang-Dokumente (via Wire oder DataRef)"), "default": "",
"graphInherit": {"port": 0, "kind": "documentListWire"}},
{"name": "emailContent", "type": "str", "required": False, "frontendType": "hidden",
"description": t("Direkt vorbereiteter Inhalt {subject, body, to} (via Wire — überspringt KI)"),
"default": ""},

View file

@ -16,15 +16,12 @@ FILE_NODES = [
{"name": "title", "type": "str", "required": False, "frontendType": "text",
"description": t("Dokumenttitel")},
{"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder",
"description": t("Daten aus vorherigen Schritten"), "default": ""},
"description": t("Daten aus vorherigen Schritten"), "default": "",
"graphInherit": {"port": 0, "kind": "primaryTextRef"}},
],
"inputs": 1,
"outputs": 1,
<<<<<<< HEAD
"inputPorts": {0: {"accepts": ["AiResult", "TextResult", "Transit", "FormPayload", "LoopItem", "ActionResult"]}},
=======
"inputPorts": {0: {"accepts": ["AiResult", "TextResult", "Transit", "FormPayload"]}},
>>>>>>> 875f8252 (ValueOn Lead to Offer durchgespielt, bugfixes in Dateigenerierung und ai nodes)
"outputPorts": {0: {"schema": "DocumentList"}},
"meta": {"icon": "mdi-file-plus-outline", "color": "#2196F3", "usesAi": False},
"_method": "file",

View file

@ -77,7 +77,8 @@ TRUSTEE_NODES = [
# is List[ActionDocument] (see datamodelChat.ActionResult). The
# DataPicker uses this string to filter compatible upstream paths.
{"name": "documentList", "type": "List[ActionDocument]", "required": True, "frontendType": "dataRef",
"description": t("Dokumente aus vorherigen Schritten")},
"description": t("Dokumente aus vorherigen Schritten"),
"graphInherit": {"port": 0, "kind": "documentListWire"}},
dict(_TRUSTEE_INSTANCE_PARAM),
],
"inputs": 1,
@ -95,7 +96,8 @@ TRUSTEE_NODES = [
"description": t("Trustee-Positionen in Buchhaltungssystem übertragen."),
"parameters": [
{"name": "documentList", "type": "List[ActionDocument]", "required": True, "frontendType": "dataRef",
"description": t("Dokumente aus vorherigen Schritten")},
"description": t("Dokumente aus vorherigen Schritten"),
"graphInherit": {"port": 0, "kind": "documentListWire"}},
dict(_TRUSTEE_INSTANCE_PARAM),
],
"inputs": 1,

View file

@ -612,6 +612,21 @@ SYSTEM_VARIABLES: Dict[str, Dict[str, str]] = {
}
# ---------------------------------------------------------------------------
# Graph inheritance (executeGraph materialization + ActionNodeExecutor wiring)
# ---------------------------------------------------------------------------
#
# When a parameter declares ``graphInherit.kind == "primaryTextRef"``, executeGraph
# inserts an explicit DataRef before run (see pickNotPushMigration.materializePrimaryTextHandover).
# Schema names are catalog output port types (e.g. AiResult).
PRIMARY_TEXT_HANDOVER_REF_PATH: Dict[str, List[Any]] = {
"AiResult": ["response"],
"TextResult": ["text"],
"ConsolidateResult": ["result"],
}
def resolveSystemVariable(variable: str, context: Dict[str, Any]) -> Any:
"""Resolve a system variable name to its runtime value."""
from datetime import datetime, timezone
@ -723,12 +738,9 @@ def normalizeToSchema(raw: Any, schemaName: str) -> Dict[str, Any]:
if not schema or schemaName == "Transit":
return result
<<<<<<< HEAD
if schemaName == "DocumentList":
_coerce_document_list_upload_fields(result)
=======
>>>>>>> 875f8252 (ValueOn Lead to Offer durchgespielt, bugfixes in Dateigenerierung und ai nodes)
# Only default **required** fields. Optional fields stay absent so DataRefs / context
# resolution never pick a synthetic `{}` or `[]` (e.g. AiResult.responseData when the
# model returned plain text only).

View file

@ -559,7 +559,6 @@ class AiCallLooper:
if iteration >= maxIterations:
logger.warning(f"AI call stopped after maximum iterations ({maxIterations})")
<<<<<<< HEAD
# Prefer last repaired complete JSON from getContexts (raw `result` is only the last fragment).
if lastValidCompletePart and useCase and not useCase.requiresExtraction:
try:
@ -578,11 +577,6 @@ class AiCallLooper:
except Exception as e:
logger.debug("Max-iterations fallback on completePart failed: %s", e)
=======
# This code path should never be reached because all registered use cases
# return early when JSON is complete. This would only execute for use cases that
# require section extraction, but no such use cases are currently registered.
>>>>>>> 875f8252 (ValueOn Lead to Offer durchgespielt, bugfixes in Dateigenerierung und ai nodes)
logger.error(
"End of callAiWithLooping without success for use case %r (iterations=%s, lastResultLen=%s)",
useCaseId,

View file

@ -39,12 +39,6 @@ class RendererCsv(BaseRenderer):
"""
return ["table", "code_block"]
<<<<<<< HEAD
=======
<<<<<<< HEAD
async def render(self, extractedContent: Dict[str, Any], title: str, userPrompt: str = None, aiService=None, *, style: Dict[str, Any] = None) -> List[RenderedDocument]:
=======
>>>>>>> 875f8252 (ValueOn Lead to Offer durchgespielt, bugfixes in Dateigenerierung und ai nodes)
async def render(
self,
extractedContent: Dict[str, Any],
@ -54,10 +48,6 @@ class RendererCsv(BaseRenderer):
*,
style: Dict[str, Any] = None,
) -> List[RenderedDocument]:
<<<<<<< HEAD
=======
>>>>>>> 0659d0d2 (ValueOn Lead to Offer durchgespielt, bugfixes in Dateigenerierung und ai nodes)
>>>>>>> 875f8252 (ValueOn Lead to Offer durchgespielt, bugfixes in Dateigenerierung und ai nodes)
"""Render extracted JSON content to CSV format. Produces one CSV file per table section."""
_ = style
try:

View file

@ -6,7 +6,7 @@ Markdown renderer for report generation.
from .documentRendererBaseTemplate import BaseRenderer
from modules.datamodels.datamodelDocument import RenderedDocument
from typing import Dict, Any, List, Optional
from typing import Any, Dict, List, Optional
class RendererMarkdown(BaseRenderer):
"""Renders content to Markdown format with format-specific extraction."""
@ -252,6 +252,41 @@ class RendererMarkdown(BaseRenderer):
self.logger.warning(f"Error rendering table: {str(e)}")
return ""
def _renderInlineRunsMarkdown(self, runs: Any) -> str:
"""Turn Phase-5 inlineRuns (from markdownToDocumentJson) into markdown text."""
if not runs:
return ""
if not isinstance(runs, list):
return str(runs)
parts: List[str] = []
for run in runs:
if not isinstance(run, dict):
parts.append(str(run))
continue
run_type = run.get("type", "text")
value = str(run.get("value", ""))
if run_type == "text":
parts.append(value)
elif run_type == "bold":
parts.append(f"**{value}**")
elif run_type == "italic":
parts.append(f"*{value}*")
elif run_type == "code":
if not value:
parts.append("``")
elif "`" not in value:
parts.append(f"`{value}`")
else:
parts.append(f"``{value}``")
elif run_type == "link":
href = str(run.get("href", ""))
parts.append(f"[{value}]({href})")
elif run_type == "image":
parts.append(f"![{value}](image)")
else:
parts.append(value)
return "".join(parts)
def _renderJsonBulletList(self, listData: Dict[str, Any]) -> str:
"""Render a JSON bullet list to markdown."""
try:
@ -268,6 +303,8 @@ class RendererMarkdown(BaseRenderer):
for item in items:
if isinstance(item, str):
markdownParts.append(f"- {item}")
elif isinstance(item, list):
markdownParts.append(f"- {self._renderInlineRunsMarkdown(item)}")
elif isinstance(item, dict) and "text" in item:
markdownParts.append(f"- {item['text']}")
@ -302,7 +339,17 @@ class RendererMarkdown(BaseRenderer):
try:
# Extract from nested content structure
content = paragraphData.get("content", {})
top = paragraphData.get("text")
if isinstance(top, str) and top.strip():
if not isinstance(content, dict) or (
not content.get("text") and not content.get("inlineRuns")
):
return top
if isinstance(content, dict):
runs = self._inlineRunsFromContent(content)
if runs:
return self._renderInlineRunsMarkdown(runs)
text = content.get("text", "")
elif isinstance(content, str):
text = content

View file

@ -360,7 +360,10 @@ async def executeGraph(
)
from modules.workflows.processing.shared.methodDiscovery import discoverMethods
discoverMethods(services)
from modules.workflows.automation2.pickNotPushMigration import materializeConnectionRefs
from modules.workflows.automation2.pickNotPushMigration import (
materializeConnectionRefs,
materializePrimaryTextHandover,
)
from modules.workflows.automation2.featureInstanceRefMigration import (
materializeFeatureInstanceRefs,
)
@ -372,6 +375,7 @@ async def executeGraph(
# subsequent connection-ref pass and validation see the canonical shape.
graph = materializeFeatureInstanceRefs(graph)
graph = materializeConnectionRefs(graph)
graph = materializePrimaryTextHandover(graph)
nodeTypeIds = _getNodeTypeIds(services)
logger.debug("executeGraph nodeTypeIds (%d): %s", len(nodeTypeIds), sorted(nodeTypeIds))
errors = validateGraph(graph, nodeTypeIds)

View file

@ -1,10 +1,14 @@
# Copyright (c) 2025 Patrick Motsch
# Action node executor - maps ai.*, email.*, sharepoint.*, clickup.*, file.*, trustee.* to method actions.
# Action node executor maps ai.*, email.*, sharepoint.*, clickup.*, file.*, trustee.* to method actions.
#
# Typed Port System: explicit DataRefs / static parameters; optional ``documentList`` from input port 0
# when the param is empty (same idea as IOExecutor wire fill).
# ``materializeConnectionRefs`` (see pickNotPushMigration) may still rewrite empty connectionReference at run start.
# Typed port system: parameters resolve via DataRefs / static values. Declarative port inheritance
# uses ``graphInherit`` on parameter definitions in node JSON (see STATIC_NODE_TYPES): e.g.
# ``primaryTextRef`` is materialized to explicit refs in pickNotPushMigration.materializePrimaryTextHandover;
# ``documentListWire`` is applied at runtime in this executor via graphUtils.extract_wired_document_list.
import base64
import binascii
import json
import logging
import re
@ -20,8 +24,23 @@ from modules.serviceCenter.services.serviceBilling.mainServiceBilling import Bil
logger = logging.getLogger(__name__)
def _looks_like_ascii_base64_payload(s: str) -> bool:
"""Heuristic: ActionDocument binary payloads use standard ASCII base64; markdown/text uses other chars (#, *, -, …)."""
t = "".join(s.split())
if len(t) < 8:
return False
if not t.isascii():
return False
return bool(re.fullmatch(r"[A-Za-z0-9+/]+=*", t)) and len(t) % 4 == 0
def _coerce_document_data_to_bytes(raw: Any) -> Optional[bytes]:
"""Normalize documentData (bytes/str/buffer) for DB file persistence."""
"""Normalize documentData for DB file persistence.
ActionDocument conventions (see methodFile.create): binary bodies are carried as ASCII
base64 strings; plain markdown/text stays as Unicode. Do not UTF-8-encode a base64
literal that persists the ASCII of the encoding (file looks like base64 gibberish).
"""
if raw is None:
return None
if isinstance(raw, bytes):
@ -33,7 +52,20 @@ def _coerce_document_data_to_bytes(raw: Any) -> Optional[bytes]:
b = raw.tobytes()
return b if len(b) > 0 else None
if isinstance(raw, str):
b = raw.encode("utf-8")
stripped = raw.strip()
if not stripped:
return None
if _looks_like_ascii_base64_payload(stripped):
try:
decoded = base64.b64decode(stripped, validate=True)
except (TypeError, binascii.Error, ValueError):
try:
decoded = base64.b64decode(stripped)
except (binascii.Error, ValueError):
decoded = b""
if decoded:
return decoded
b = stripped.encode("utf-8")
return b if len(b) > 0 else None
return None
@ -239,78 +271,6 @@ def _getOutputSchemaName(nodeDef: Dict) -> str:
return port0.get("schema", "ActionResult")
def _extract_wired_document_list(inp: Any) -> Optional[Dict[str, Any]]:
    """
    Build a DocumentList-shaped dict from upstream node output (matches IOExecutor wire behavior).

    Handles DocumentList, human upload shapes (file / files / fileIds), FileList, loop file items.
    During flow.loop body execution the loop node's output is
    {items, count, currentItem, currentIndex}; wired document actions must use currentItem.

    Returns ``{"documents": [...], "count": n}`` or ``None`` when nothing usable is wired.
    """
    if inp is None:
        return None
    # Local import: portTypes helpers unwrap Transit envelopes and normalize upload shapes.
    from modules.features.graphicalEditor.portTypes import (
        unwrapTransit,
        _coerce_document_list_upload_fields,
        _file_record_to_document,
    )
    data = unwrapTransit(inp)
    if isinstance(data, str):
        # A bare string is treated as a single file record reference.
        one = _file_record_to_document(data)
        return {"documents": [one], "count": 1} if one else None
    if not isinstance(data, dict):
        return None
    # Shallow copy so the coercion below cannot mutate the wire payload.
    d = dict(data)
    _coerce_document_list_upload_fields(d)
    # Per-iteration payload from executionEngine (flow.loop → downstream in loop body)
    if "currentItem" in d:
        ci = d.get("currentItem")
        if ci is not None:
            nested = _extract_wired_document_list(ci)
            if nested:
                return nested
    docs = d.get("documents")
    if isinstance(docs, list) and len(docs) > 0:
        return {"documents": docs, "count": d.get("count", len(docs))}
    # Legacy shape: plain list of document dicts under "documentList".
    raw_list = d.get("documentList")
    if isinstance(raw_list, list) and len(raw_list) > 0 and isinstance(raw_list[0], dict):
        return {"documents": raw_list, "count": len(raw_list)}
    # Single-document shape: wrap id (+ optional name / mimeType) as a one-entry list.
    doc_id = d.get("documentId") or d.get("id")
    if doc_id and str(doc_id).strip():
        one: Dict[str, Any] = {"id": str(doc_id).strip()}
        fn = d.get("fileName") or d.get("name")
        if fn:
            one["name"] = str(fn)
        mt = d.get("mimeType")
        if mt:
            one["mimeType"] = str(mt)
        return {"documents": [one], "count": 1}
    # Human upload shape: list of raw file records under "files".
    files = d.get("files")
    if isinstance(files, list) and files:
        collected = []
        for item in files:
            conv = _file_record_to_document(item) if isinstance(item, dict) else None
            if conv:
                collected.append(conv)
        if collected:
            return {"documents": collected, "count": len(collected)}
    return None
def _document_list_param_is_empty(val: Any) -> bool:
    # True when a documentList-style parameter is still unset, so the
    # wire / DataRef fill in the executor may populate it.
    if val is None or val == "":
        return True
    if isinstance(val, list) and len(val) == 0:
        return True
    if isinstance(val, dict):
        # Any of these keys being truthy means a document selection already exists.
        if val.get("documents") or val.get("references") or val.get("items"):
            return False
        if val.get("documentId") or val.get("id"):
            return False
        return True
    # Non-empty lists and any other scalar count as "set".
    return False
class ActionNodeExecutor:
"""Execute action nodes by mapping to method actions via ActionExecutor."""
@ -323,7 +283,11 @@ class ActionNodeExecutor:
context: Dict[str, Any],
) -> Any:
from modules.features.graphicalEditor.nodeRegistry import getNodeTypeToMethodAction
from modules.workflows.automation2.graphUtils import resolveParameterReferences
from modules.workflows.automation2.graphUtils import (
document_list_param_is_empty,
extract_wired_document_list,
resolveParameterReferences,
)
from modules.workflows.processing.core.actionExecutor import ActionExecutor
nodeType = node.get("type", "")
@ -352,16 +316,23 @@ class ActionNodeExecutor:
if pName and pName not in resolvedParams and "default" in pDef:
resolvedParams[pName] = pDef["default"]
_param_names = {p.get("name") for p in nodeDef.get("parameters", []) if p.get("name")}
if "documentList" in _param_names and _document_list_param_is_empty(resolvedParams.get("documentList")):
for pDef in nodeDef.get("parameters") or []:
gi = pDef.get("graphInherit") or {}
if gi.get("kind") != "documentListWire":
continue
pname = pDef.get("name")
if not pname or not document_list_param_is_empty(resolvedParams.get(pname)):
continue
port_ix = int(gi.get("port", 0))
_src_map = (context.get("inputSources") or {}).get(nodeId) or {}
_entry = _src_map.get(0)
if _entry:
_entry = _src_map.get(port_ix)
if not _entry:
continue
_src_node_id, _ = _entry
_upstream = (context.get("nodeOutputs") or {}).get(_src_node_id)
_wired = _extract_wired_document_list(_upstream)
_wired = extract_wired_document_list(_upstream)
if _wired:
resolvedParams["documentList"] = _wired
resolvedParams[pname] = _wired
# 3. Resolve connectionReference
chatService = getattr(self.services, "chat", None)
@ -425,6 +396,16 @@ class ActionNodeExecutor:
docsList = []
for d in (result.documents or []):
dumped = d.model_dump() if hasattr(d, "model_dump") else dict(d) if isinstance(d, dict) else d
if isinstance(dumped, dict):
_meta = dumped.get("validationMetadata") if isinstance(dumped.get("validationMetadata"), dict) else {}
_existing = dumped.get("fileId") or _meta.get("fileId")
# e.g. file.create already persisted inside the action — avoid a second FileItem with wrong bytes
if _existing and str(_existing).strip():
dumped["documentData"] = None
dumped.setdefault("_hasBinaryData", True)
docsList.append(dumped)
continue
rawData = getattr(d, "documentData", None) if hasattr(d, "documentData") else (dumped.get("documentData") if isinstance(dumped, dict) else None)
rawBytes = _coerce_document_data_to_bytes(rawData)
if isinstance(dumped, dict) and rawBytes:
@ -463,8 +444,12 @@ class ActionNodeExecutor:
dumped["_hasBinaryData"] = True
docsList.append(dumped)
# Clean DocumentList shape for document nodes (match file.create: documents + count, no AiResult fields)
if outputSchema == "DocumentList" and nodeType in ("ai.generateDocument", "ai.convertDocument"):
# Clean DocumentList shape for document nodes (documents + count, no ActionResult/AiResult noise)
if outputSchema == "DocumentList" and nodeType in (
"ai.generateDocument",
"ai.convertDocument",
"file.create",
):
if not result.success:
return _normalizeError(
RuntimeError(str(result.error or "document action failed")),
@ -488,6 +473,13 @@ class ActionNodeExecutor:
extractedContext = ""
elif raw:
extractedContext = str(raw).strip()
else:
# ai.process (and similar): text handover in ActionResult.data — no persisted document row
rd = getattr(result, "data", None)
if isinstance(rd, dict):
handover = rd.get("response")
if handover is not None:
extractedContext = str(handover).strip()
promptText = str(resolvedParams.get("aiPrompt") or resolvedParams.get("prompt") or "").strip()

View file

@ -37,7 +37,7 @@ class IOExecutor:
nodeOutputs = context.get("nodeOutputs", {})
params = dict(node.get("parameters") or {})
from modules.workflows.automation2.graphUtils import resolveParameterReferences
from modules.workflows.automation2.graphUtils import extract_wired_document_list, resolveParameterReferences
resolvedParams = resolveParameterReferences(params, nodeOutputs)
logger.info("IOExecutor node %s resolvedParams keys=%s", nodeId, list(resolvedParams.keys()))
@ -45,9 +45,7 @@ class IOExecutor:
if 0 in inputSources:
srcId, _ = inputSources[0]
inp = nodeOutputs.get(srcId)
from modules.workflows.automation2.executors.actionNodeExecutor import _extract_wired_document_list
wired = _extract_wired_document_list(inp)
wired = extract_wired_document_list(inp)
docs = (wired or {}).get("documents") if isinstance(wired, dict) else None
if docs:
resolvedParams.setdefault("documentList", wired)

View file

@ -7,50 +7,6 @@ from typing import Dict, List, Any, Tuple, Set, Optional
logger = logging.getLogger(__name__)
def _ai_result_text_from_documents(d: Dict[str, Any]) -> Optional[str]:
"""Extract plain-text body from AiResult-style ``documents[0].documentData``."""
docs = d.get("documents")
if not isinstance(docs, list) or not docs:
return None
d0 = docs[0]
raw: Any = None
if isinstance(d0, dict):
raw = d0.get("documentData")
elif d0 is not None:
raw = getattr(d0, "documentData", None)
if raw is None:
return None
if isinstance(raw, bytes):
try:
t = raw.decode("utf-8").strip()
return t or None
except (UnicodeDecodeError, ValueError):
return None
if isinstance(raw, str):
s = raw.strip()
return s or None
return None
def _ref_coalesce_empty_ai_result_text(data: Any, path: List[Any], resolved: Any) -> Any:
    """Coalesce an empty AiResult text ref onto the documents payload.

    When a ref targets the AiResult text fields (``response``, ``responseData``,
    or bare ``context``) but resolves to ``None``/empty string, fall back to the
    text carried in ``documents``. Needed when optional ``responseData`` is absent
    (no synthetic ``{}``), ``response`` is still empty while ``documents`` hold the
    model output, or legacy graphs bind responseData only.
    """
    if resolved in (None, ""):
        if isinstance(data, dict) and path:
            head = path[0]
            is_text_head = head in ("response", "responseData", "context")
            # Bare "context" only — nested context paths are left untouched.
            if is_text_head and not (head == "context" and len(path) != 1):
                fallback = _ai_result_text_from_documents(data)
                if fallback is not None:
                    return fallback
    return resolved
def parseGraph(graph: Dict[str, Any]) -> Tuple[List[Dict], List[Dict], Set[str]]:
"""
Parse graph into nodes, connections, and node IDs.
@ -408,7 +364,6 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
# Form nodes store fields under {"payload": {fieldName: …}}.
# DataPicker emits bare field paths like ["url"]; try under payload.
resolved = _get_by_path(data["payload"], plist)
resolved = _ref_coalesce_empty_ai_result_text(data, plist, resolved)
return resolveParameterReferences(resolved, nodeOutputs)
return value
if value.get("type") == "value":
@ -462,3 +417,73 @@ def resolveParameterReferences(value: Any, nodeOutputs: Dict[str, Any]) -> Any:
return "\n\n".join(p for p in parts if p)
return [resolveParameterReferences(v, nodeOutputs) for v in value]
return value
def document_list_param_is_empty(val: Any) -> bool:
    """True when a documentList-style parameter has not been set (wire + DataRef may fill)."""
    if val is None or val == "":
        return True
    if isinstance(val, list):
        return len(val) == 0
    if isinstance(val, dict):
        has_selection = (
            val.get("documents")
            or val.get("references")
            or val.get("items")
            or val.get("documentId")
            or val.get("id")
        )
        return not has_selection
    return False
def extract_wired_document_list(inp: Any) -> Optional[Dict[str, Any]]:
    """
    Coerce an upstream node output (port wire) into a DocumentList-shaped dict.

    Applied when a parameter declares ``graphInherit.kind == "documentListWire"``.
    Returns ``{"documents": [...], "count": n}`` or ``None`` when nothing usable is wired.
    """
    if inp is None:
        return None
    from modules.features.graphicalEditor.portTypes import (
        unwrapTransit,
        _coerce_document_list_upload_fields,
        _file_record_to_document,
    )
    unwrapped = unwrapTransit(inp)
    # A bare string is treated as a single file record reference.
    if isinstance(unwrapped, str):
        single = _file_record_to_document(unwrapped)
        if single:
            return {"documents": [single], "count": 1}
        return None
    if not isinstance(unwrapped, dict):
        return None
    payload = dict(unwrapped)
    _coerce_document_list_upload_fields(payload)
    # flow.loop body: the loop node emits {items, count, currentItem, currentIndex};
    # wired document actions must consume the per-iteration currentItem.
    if "currentItem" in payload:
        current = payload.get("currentItem")
        if current is not None:
            nested = extract_wired_document_list(current)
            if nested:
                return nested
    doc_entries = payload.get("documents")
    if isinstance(doc_entries, list) and doc_entries:
        return {"documents": doc_entries, "count": payload.get("count", len(doc_entries))}
    legacy_list = payload.get("documentList")
    if isinstance(legacy_list, list) and legacy_list and isinstance(legacy_list[0], dict):
        return {"documents": legacy_list, "count": len(legacy_list)}
    raw_id = payload.get("documentId") or payload.get("id")
    if raw_id and str(raw_id).strip():
        single: Dict[str, Any] = {"id": str(raw_id).strip()}
        file_name = payload.get("fileName") or payload.get("name")
        if file_name:
            single["name"] = str(file_name)
        mime = payload.get("mimeType")
        if mime:
            single["mimeType"] = str(mime)
        return {"documents": [single], "count": 1}
    file_records = payload.get("files")
    if isinstance(file_records, list) and file_records:
        converted = []
        for record in file_records:
            doc = _file_record_to_document(record) if isinstance(record, dict) else None
            if doc:
                converted.append(doc)
        if converted:
            return {"documents": converted, "count": len(converted)}
    return None

View file

@ -1,9 +1,12 @@
# Copyright (c) 2025 Patrick Motsch
"""
Graph helpers for Pick-not-Push: materialize connectionReference as explicit DataRefs.
Graph helpers for Pick-not-Push: materialize typed DataRefs before executeGraph runs.
Runtime: executeGraph deep-copies the version graph and applies materialize_connection_refs
so downstream nodes resolve connection UUIDs from upstream output.connection.id.
- ``materializeConnectionRefs``: empty ``connectionReference`` from upstream connection provenance.
- ``materializePrimaryTextHandover``: parameters whose static definition includes
``graphInherit.kind == "primaryTextRef"`` (canonical paths: ``PRIMARY_TEXT_HANDOVER_REF_PATH``).
Runtime: executeGraph deep-copies the version graph and applies these passes in order.
"""
from __future__ import annotations
@ -12,7 +15,10 @@ import logging
from typing import Any, Dict, List
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.portTypes import resolve_output_schema_name
from modules.features.graphicalEditor.portTypes import (
PRIMARY_TEXT_HANDOVER_REF_PATH,
resolve_output_schema_name,
)
from modules.workflows.automation2.graphUtils import buildConnectionMap, getInputSources
logger = logging.getLogger(__name__)
@ -81,3 +87,70 @@ def materializeConnectionRefs(graph: Dict[str, Any]) -> Dict[str, Any]:
logger.debug("materializeConnectionRefs: %s.connectionReference -> ref %s.connection.id", nid, src_id)
return g
def _slot_empty_for_primary_text_inherit(val: Any) -> bool:
return val is None or val == "" or val == []
def materializePrimaryTextHandover(graph: Dict[str, Any]) -> Dict[str, Any]:
    """
    Materialize explicit ``DataRef`` values for parameters whose static node
    definition declares ``graphInherit.kind == "primaryTextRef"``.

    For each such parameter that is currently empty, the upstream producer on
    the declared input port (``graphInherit.port``, default 0) is resolved and
    the parameter is pointed at the producer's canonical text field, looked up
    in ``PRIMARY_TEXT_HANDOVER_REF_PATH`` by the producer's output schema name.

    The input graph is never mutated; a deep copy is returned.
    """
    result = copy.deepcopy(graph)
    all_nodes: List[Dict[str, Any]] = result.get("nodes") or []
    if not all_nodes:
        return result
    conn_map = buildConnectionMap(result.get("connections") or [])
    by_id = {n["id"]: n for n in all_nodes if n.get("id")}
    for current in all_nodes:
        node_id = current.get("id")
        node_type = current.get("type")
        if not (node_id and node_type):
            continue
        definition = _NODE_DEF_BY_ID.get(node_type)
        if not definition:
            continue
        # Guarantee a mutable parameters dict on the node before writing refs.
        if not isinstance(current.get("parameters"), dict):
            current["parameters"] = {}
        param_values = current["parameters"]
        for spec in definition.get("parameters") or []:
            inherit = spec.get("graphInherit")
            if not (isinstance(inherit, dict) and inherit.get("kind") == "primaryTextRef"):
                continue
            param_name = spec.get("name")
            if not param_name:
                continue
            port = int(inherit.get("port", 0))
            # Only fill slots the user left empty — never override explicit values.
            if not _slot_empty_for_primary_text_inherit(param_values.get(param_name)):
                continue
            sources = getInputSources(node_id, conn_map)
            if port not in sources:
                continue
            producer_id = sources[port][0]
            producer = by_id.get(producer_id) or {}
            producer_def = _NODE_DEF_BY_ID.get(producer.get("type") or "")
            if not producer_def:
                continue
            raw_port = (producer_def.get("outputPorts") or {}).get(0, {}) or {}
            schema = resolve_output_schema_name(
                producer, raw_port if isinstance(raw_port, dict) else {}
            )
            path = PRIMARY_TEXT_HANDOVER_REF_PATH.get(schema)
            if not path:
                continue
            param_values[param_name] = _data_ref(producer_id, list(path))
            logger.debug(
                "materializePrimaryTextHandover: %s.%s -> ref %s path=%s",
                node_id,
                param_name,
                producer_id,
                path,
            )
    return result

View file

@ -385,34 +385,33 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult:
))
final_documents = action_documents
handover_data = None
else:
# Text response - create document from content
# If no extension provided, use "txt" (required for filename)
extension = output_extension.lstrip('.') if output_extension else "txt"
meaningful_name = self._generateMeaningfulFileName(
base_name="ai",
extension=extension,
action_name="result"
)
validationMetadata = {
"actionType": "ai.process",
"resultType": normalized_result_type if normalized_result_type else None,
"outputFormat": output_format if output_format else None,
"hasDocuments": False,
"contentType": "text"
# Text-only response: keep handover in ActionResult.data (no ActionDocument).
# Avoids automation2 persisting a synthetic file per run; use ai.generateDocument for files.
body = aiResponse.content
if body is None:
body = ""
elif not isinstance(body, str):
body = str(body)
final_documents = []
handover_data = {
"response": body,
"resultType": normalized_result_type,
"outputFormat": output_format,
"contentType": "text",
}
action_document = ActionDocument(
documentName=meaningful_name,
documentData=aiResponse.content,
mimeType=output_mime_type,
validationMetadata=validationMetadata
)
final_documents = [action_document]
md = getattr(aiResponse, "metadata", None)
if md is not None:
extra = getattr(md, "additionalData", None)
if isinstance(extra, dict):
for k, v in extra.items():
handover_data.setdefault(k, v)
# Complete progress tracking
self.services.chat.progressLogFinish(operationId, True)
return ActionResult.isSuccess(documents=final_documents)
return ActionResult.isSuccess(documents=final_documents, data=handover_data)
except (SubscriptionInactiveException, BillingContextError):
try:

View file

@ -1,10 +1,12 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import base64
import logging
from typing import Dict, Any
import base64
import binascii
import logging
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.serviceCenter.services.serviceGeneration.subDocumentUtility import markdownToDocumentJson
from modules.shared.i18nRegistry import normalizePrimaryLanguageTag
@ -47,7 +49,10 @@ def _persistDocumentsToUserFiles(
if not doc_data:
continue
if isinstance(doc_data, str):
content = base64.b64decode(doc_data)
try:
content = base64.b64decode(doc_data, validate=True)
except (TypeError, ValueError, binascii.Error):
content = doc_data.encode("utf-8")
else:
content = doc_data
doc_name = (

View file

@ -251,6 +251,7 @@ class ActionExecutor:
return ActionResult(
success=result.success,
documents=result.documents, # Return original ActionDocument objects
data=result.data,
resultLabel=action.execResultLabel, # Always use action's execResultLabel
error=result.error or ""
)
@ -265,18 +266,21 @@ class ActionExecutor:
)
def _extractResultText(self, result: ActionResult) -> str:
"""Extract result text from ActionResult documents"""
if not result.success or not result.documents:
"""Extract result text from ActionResult documents or structured data (e.g. ai.process handover)."""
if not result.success:
return ""
# Extract text directly from ActionDocument objects
if result.documents:
resultParts = []
for doc in result.documents:
if hasattr(doc, 'documentData') and doc.documentData:
if hasattr(doc, "documentData") and doc.documentData:
resultParts.append(str(doc.documentData))
# Join all document results with separators
return "\n\n---\n\n".join(resultParts) if resultParts else ""
data = getattr(result, "data", None)
if isinstance(data, dict):
handover = data.get("response")
if handover is not None:
return str(handover)
return ""
async def _createActionCompletionMessage(self, action: ActionItem, result: ActionResult, workflow: ChatWorkflow,
taskStep: TaskStep, taskIndex: int, actionIndex: int):

View file

@ -162,6 +162,17 @@ class MessageCreator:
messageText += f"{userFriendlyText}\n\n"
messageText += f"{errorDetails}\n\n"
# Text handover without attachment (e.g. ai.process): show content in the message body
if (
result.success
and not createdDocuments
and getattr(result, "data", None)
and isinstance(result.data, dict)
):
handover_txt = result.data.get("response")
if handover_txt is not None and str(handover_txt).strip():
messageText += "\n\n" + str(handover_txt).strip()
# Build concise summary to persist for history context
doc_count = len(createdDocuments) if createdDocuments else 0
trimmed_msg = (messageText or "").strip().replace("\n", " ")