From 51dfb007f6d45757f5ec8406496a16badaed135e Mon Sep 17 00:00:00 2001 From: Ida Date: Sun, 3 May 2026 19:03:25 +0200 Subject: [PATCH] fix: looping node and content extraction --- .../graphicalEditor/nodeDefinitions/ai.py | 18 +- .../graphicalEditor/nodeDefinitions/file.py | 10 +- .../graphicalEditor/nodeDefinitions/flow.py | 78 +++++--- modules/features/graphicalEditor/portTypes.py | 66 +++++++ .../services/serviceAi/subAiCallLooping.py | 104 ++++++---- .../services/serviceAi/subLoopingUseCases.py | 35 +++- .../services/serviceAi/subStructureFilling.py | 183 +++++++----------- .../services/serviceChat/mainServiceChat.py | 38 +++- .../renderers/rendererText.py | 54 ++++-- modules/shared/jsonContinuation.py | 27 +-- .../workflows/automation2/executionEngine.py | 7 +- .../executors/actionNodeExecutor.py | 112 ++++++++++- .../automation2/executors/ioExecutor.py | 8 +- .../methods/methodAi/actions/generateCode.py | 23 +-- .../methodAi/actions/generateDocument.py | 24 +-- 15 files changed, 522 insertions(+), 265 deletions(-) diff --git a/modules/features/graphicalEditor/nodeDefinitions/ai.py b/modules/features/graphicalEditor/nodeDefinitions/ai.py index 4e29709e..43136394 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/ai.py +++ b/modules/features/graphicalEditor/nodeDefinitions/ai.py @@ -59,7 +59,9 @@ AI_NODES = [ ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["FormPayload", "Transit", "AiResult", "DocumentList", "ActionResult"]}}, + "inputPorts": {0: {"accepts": [ + "FormPayload", "Transit", "AiResult", "DocumentList", "ActionResult", "LoopItem", "TextResult", + ]}}, "outputPorts": {0: {"schema": "AiResult"}}, "meta": {"icon": "mdi-magnify", "color": "#9C27B0", "usesAi": True}, "_method": "ai", @@ -79,7 +81,7 @@ AI_NODES = [ ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["DocumentList", "Transit"]}}, + "inputPorts": {0: {"accepts": ["DocumentList", "Transit", "LoopItem"]}}, "outputPorts": {0: {"schema": "AiResult"}}, "meta": {"icon": "mdi-file-document-outline", "color": "#9C27B0", "usesAi": True}, "_method": "ai", @@ -98,7 +100,7 @@ AI_NODES = [ ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["DocumentList", "Transit"]}}, + "inputPorts": {0: {"accepts": ["DocumentList", "Transit", "LoopItem"]}}, "outputPorts": {0: {"schema": "AiResult"}}, "meta": {"icon": "mdi-translate", "color": "#9C27B0", "usesAi": True}, "_method": "ai", @@ -118,7 +120,7 @@ AI_NODES = [ ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["DocumentList", "Transit"]}}, + "inputPorts": {0: {"accepts": ["DocumentList", "Transit", "LoopItem"]}}, "outputPorts": {0: {"schema": "DocumentList"}}, "meta": {"icon": "mdi-file-convert", "color": "#9C27B0", "usesAi": True}, "_method": "ai", @@ -147,7 +149,9 @@ AI_NODES = [ ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["FormPayload", "Transit", "AiResult", "DocumentList", "ActionResult"]}}, + "inputPorts": {0: {"accepts": [ + "FormPayload", "Transit", "AiResult", "DocumentList", "ActionResult", "LoopItem", "TextResult", + ]}}, "outputPorts": {0: {"schema": "DocumentList"}}, "meta": {"icon": "mdi-file-plus", "color": "#9C27B0", "usesAi": True}, "_method": "ai", @@ -171,7 +175,9 @@ AI_NODES = [ ] + _AI_COMMON_PARAMS, "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["FormPayload", "Transit", "AiResult", "DocumentList", "ActionResult"]}}, + "inputPorts": {0: {"accepts": [ + "FormPayload", "Transit", "AiResult", "DocumentList", "ActionResult", "LoopItem", "TextResult", + ]}}, "outputPorts": {0: {"schema": "AiResult"}}, "meta": {"icon": "mdi-code-tags", "color": "#9C27B0", "usesAi": True}, "_method": "ai", diff --git a/modules/features/graphicalEditor/nodeDefinitions/file.py b/modules/features/graphicalEditor/nodeDefinitions/file.py index 3b5ebfd4..ffa4d722 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/file.py +++ b/modules/features/graphicalEditor/nodeDefinitions/file.py @@ -10,25 +10,17 @@ FILE_NODES = [ "label": t("Datei erstellen"), "description": t("Erstellt eine Datei aus Kontext (Text/Markdown von KI)."), "parameters": [ - {"name": "contentSources", "type": "json", "required": False, "frontendType": "json", - "description": t("Kontext-Quellen"), "default": []}, {"name": "outputFormat", "type": "str", "required": True, "frontendType": "select", "frontendOptions": {"options": ["docx", "pdf", "txt", "html", "md"]}, "description": t("Ausgabeformat"), "default": "docx"}, {"name": "title", "type": "str", "required": False, "frontendType": "text", "description": t("Dokumenttitel")}, - {"name": "templateName", "type": "str", "required": False, "frontendType": "select", - "frontendOptions": {"options": ["default", "corporate", "minimal"]}, - "description": t("Stil-Vorlage")}, - {"name": "language", "type": "str", "required": False, "frontendType": "select", - "frontendOptions": {"options": ["de", "en", "fr"]}, - "description": t("Sprache"), "default": "de"}, {"name": "context", "type": "Any", "required": False, "frontendType": "contextBuilder", "description": t("Daten aus vorherigen Schritten"), "default": ""}, ], "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["AiResult", "TextResult", "Transit", "FormPayload"]}}, + "inputPorts": {0: {"accepts": ["AiResult", "TextResult", "Transit", "FormPayload", "LoopItem", "ActionResult"]}}, "outputPorts": {0: {"schema": "DocumentList"}}, "meta": {"icon": "mdi-file-plus-outline", "color": "#2196F3", "usesAi": False}, "_method": "file", diff --git a/modules/features/graphicalEditor/nodeDefinitions/flow.py b/modules/features/graphicalEditor/nodeDefinitions/flow.py index 5ebf50bc..49a3dcaf 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/flow.py +++ b/modules/features/graphicalEditor/nodeDefinitions/flow.py @@ -3,25 +3,46 @@ from modules.shared.i18nRegistry import t +# Ports, die typische Schritt-Ausgaben durchreichen (nicht nur leerer Transit). +_FLOW_INPUT_SCHEMAS = [ + "Transit", + "FormPayload", + "AiResult", + "TextResult", + "ActionResult", + "DocumentList", + "FileList", + "EmailList", + "TaskList", + "QueryResult", + "MergeResult", + "LoopItem", + "BoolResult", + "UdmDocument", +] + FLOW_NODES = [ { "id": "flow.ifElse", "category": "flow", "label": t("Wenn / Sonst"), - "description": t("Verzweigung nach Bedingung"), + "description": t( + "Verzweigt anhand einer Bedingung auf ein vorheriges Feld oder einen Ausdruck. " + "Die Daten vom Eingangskanal werden an den gewählten Ausgang durchgereicht." + ), "parameters": [ { "name": "condition", - "type": "str", + "type": "json", "required": True, "frontendType": "condition", - "description": t("Bedingung"), + "description": t("Bedingung: Feld aus einem vorherigen Schritt und Vergleich"), }, ], "inputs": 1, "outputs": 2, "outputLabels": [t("Ja"), t("Nein")], - "inputPorts": {0: {"accepts": ["Transit"]}}, + "inputPorts": {0: {"accepts": list(_FLOW_INPUT_SCHEMAS)}}, "outputPorts": {0: {"schema": "Transit"}, 1: {"schema": "Transit"}}, "executor": "flow", "meta": {"icon": "mdi-source-branch", "color": "#FF9800", "usesAi": False}, @@ -30,26 +51,29 @@ FLOW_NODES = [ "id": "flow.switch", "category": "flow", "label": t("Switch"), - "description": t("Mehrere Zweige nach Wert"), + "description": t( + "Mehrere Zweige nach einem Wert aus einem vorherigen Schritt (Data Picker). " + "Definiere Fälle mit Vergleichsoperator; der Eingang wird an den ersten passenden Zweig durchgereicht." + ), "parameters": [ { "name": "value", - "type": "str", + "type": "Any", "required": True, - "frontendType": "text", - "description": t("Zu vergleichender Wert"), + "frontendType": "dataRef", + "description": t("Wert zum Vergleichen (Feld aus einem vorherigen Schritt)"), }, { "name": "cases", "type": "array", "required": False, "frontendType": "caseList", - "description": t("Fälle"), + "description": t("Fälle: Operator und Vergleichswert"), }, ], "inputs": 1, "outputs": 1, - "inputPorts": {0: {"accepts": ["Transit"]}}, + "inputPorts": {0: {"accepts": list(_FLOW_INPUT_SCHEMAS)}}, "outputPorts": {0: {"schema": "Transit"}}, "executor": "flow", "meta": {"icon": "mdi-swap-horizontal", "color": "#FF9800", "usesAi": False}, @@ -57,15 +81,18 @@ FLOW_NODES = [ { "id": "flow.loop", "category": "flow", - "label": t("Schleife / Für Jedes"), - "description": t("Über Array-Elemente oder UDM-Strukturebenen iterieren"), + "label": t("Schleife / Für jedes"), + "description": t( + "Iteriert über ein Array aus einem vorherigen Schritt (z. B. documente, Zeilen, Listeneinträge). " + "Optional: UDM-Ebene für strukturierte Dokumente." + ), "parameters": [ { "name": "items", - "type": "str", + "type": "Any", "required": True, - "frontendType": "text", - "description": t("Pfad zum Array"), + "frontendType": "dataRef", + "description": t("Liste oder Sammlung zum Durchlaufen (im Data Picker wählen)"), }, { "name": "level", @@ -73,7 +100,7 @@ FLOW_NODES = [ "required": False, "frontendType": "select", "frontendOptions": {"options": ["auto", "documents", "structuralNodes", "contentBlocks"]}, - "description": t("UDM-Iterationsebene"), + "description": t("Nur bei UDM-Daten: welche Strukturebene als Elemente verwendet wird"), "default": "auto", }, { @@ -82,14 +109,15 @@ FLOW_NODES = [ "required": False, "frontendType": "number", "frontendOptions": {"min": 1, "max": 20}, - "description": t("Parallele Iterationen (1 = sequentiell)"), + "description": t("Parallele Durchläufe (1 = nacheinander)"), "default": 1, }, ], "inputs": 1, "outputs": 1, "inputPorts": {0: {"accepts": [ - "Transit", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList", "ActionResult", + "Transit", "UdmDocument", "EmailList", "DocumentList", "FileList", "TaskList", + "ActionResult", "AiResult", "QueryResult", "FormPayload", ]}}, "outputPorts": {0: {"schema": "LoopItem"}}, "executor": "flow", @@ -99,7 +127,10 @@ FLOW_NODES = [ "id": "flow.merge", "category": "flow", "label": t("Zusammenführen"), - "description": t("Mehrere Zweige zusammenführen (2-5 Eingänge)"), + "description": t( + "Führt 2–5 Zweige zusammen, wenn alle verbunden sind. " + "Modus legt fest, wie die Eingabeobjekte im Ergebnis kombiniert werden." + ), "parameters": [ { "name": "mode", @@ -107,7 +138,7 @@ FLOW_NODES = [ "required": False, "frontendType": "select", "frontendOptions": {"options": ["first", "all", "append"]}, - "description": t("Zusammenführungsmodus"), + "description": t("first: erster Zweig; all: Dict-Felder zusammenführen; append: Listen anhängen"), "default": "first", }, { @@ -116,13 +147,16 @@ FLOW_NODES = [ "required": False, "frontendType": "number", "frontendOptions": {"min": 2, "max": 5}, - "description": t("Anzahl Eingänge"), + "description": t("Anzahl Eingänge dieses Nodes (2–5)"), "default": 2, }, ], "inputs": 2, "outputs": 1, - "inputPorts": {0: {"accepts": ["Transit"]}, 1: {"accepts": ["Transit"]}}, + "inputPorts": { + 0: {"accepts": list(_FLOW_INPUT_SCHEMAS)}, + 1: {"accepts": list(_FLOW_INPUT_SCHEMAS)}, + }, "outputPorts": {0: {"schema": "MergeResult"}}, "executor": "flow", "meta": {"icon": "mdi-call-merge", "color": "#FF9800", "usesAi": False}, diff --git a/modules/features/graphicalEditor/portTypes.py b/modules/features/graphicalEditor/portTypes.py index bd092745..deab83b9 100644 --- a/modules/features/graphicalEditor/portTypes.py +++ b/modules/features/graphicalEditor/portTypes.py @@ -644,6 +644,69 @@ def resolveSystemVariable(variable: str, context: Dict[str, Any]) -> Any: # Output normalizers # --------------------------------------------------------------------------- +def _file_record_to_document(f: Any) -> Optional[Dict[str, Any]]: + """Map API / task-upload file dicts onto PortSchema ``Document`` fields.""" + if f is None: + return None + if isinstance(f, str) and f.strip(): + return {"id": f.strip()} + if not isinstance(f, dict): + return None + inner = f.get("file") if isinstance(f.get("file"), dict) else None + src = inner or f + out: Dict[str, Any] = {} + fid = src.get("id") or f.get("id") + if fid is not None and str(fid).strip(): + out["id"] = str(fid).strip() + name = ( + src.get("name") + or src.get("fileName") + or f.get("fileName") + or f.get("name") + ) + if name is not None and str(name).strip(): + out["name"] = str(name).strip() + mime = src.get("mimeType") or src.get("mime") or f.get("mimeType") + if mime is not None and str(mime).strip(): + out["mimeType"] = str(mime).strip() + for k in ("sizeBytes", "downloadUrl", "filePath"): + v = src.get(k) if k in src else f.get(k) + if v is not None and v != "": + out[k] = v + return out if out else None + + +def _coerce_document_list_upload_fields(result: Dict[str, Any]) -> None: + """ + Human task ``input.upload`` completes with ``file`` / ``files`` / ``fileIds``. + DocumentList expects ``documents``. Without this, resume adds ``documents: []`` and drops the real files. + """ + docs = result.get("documents") + if isinstance(docs, list) and len(docs) > 0: + return + collected: List[Dict[str, Any]] = [] + files = result.get("files") + if isinstance(files, list): + for item in files: + d = _file_record_to_document(item) + if d: + collected.append(d) + if not collected: + single = result.get("file") + d = _file_record_to_document(single) + if d: + collected.append(d) + if not collected and isinstance(result.get("fileIds"), list): + for fid in result["fileIds"]: + if fid is not None and str(fid).strip(): + collected.append({"id": str(fid).strip()}) + if not collected: + return + result["documents"] = collected + if not result.get("count"): + result["count"] = len(collected) + + def normalizeToSchema(raw: Any, schemaName: str) -> Dict[str, Any]: """ Normalize raw executor output to match the declared port schema. @@ -660,6 +723,9 @@ def normalizeToSchema(raw: Any, schemaName: str) -> Dict[str, Any]: if not schema or schemaName == "Transit": return result + if schemaName == "DocumentList": + _coerce_document_list_upload_fields(result) + # Only default **required** fields. Optional fields stay absent so DataRefs / context # resolution never pick a synthetic `{}` or `[]` (e.g. AiResult.responseData when the # model returned plain text only). diff --git a/modules/serviceCenter/services/serviceAi/subAiCallLooping.py b/modules/serviceCenter/services/serviceAi/subAiCallLooping.py index cc3a014b..d532115d 100644 --- a/modules/serviceCenter/services/serviceAi/subAiCallLooping.py +++ b/modules/serviceCenter/services/serviceAi/subAiCallLooping.py @@ -57,8 +57,7 @@ from .subJsonResponseHandling import JsonResponseHandler from .subLoopingUseCases import LoopingUseCaseRegistry from modules.workflows.processing.shared.stateTools import checkWorkflowStopped from modules.shared.jsonContinuation import getContexts -from modules.shared.jsonUtils import buildContinuationContext, extractJsonString, tryParseJson -from modules.shared.jsonUtils import tryParseJson +from modules.shared.jsonUtils import buildContinuationContext, tryParseJson from modules.shared.jsonUtils import closeJsonStructures from modules.shared.jsonUtils import stripCodeFences, normalizeJsonText @@ -374,9 +373,8 @@ class AiCallLooper: if lastValidCompletePart: try: - extracted = extractJsonString(lastValidCompletePart) - parsed, parseErr, _ = tryParseJson(extracted) - if parseErr is None and parsed: + parsed, parseErr, _ = tryParseJson(lastValidCompletePart) + if parseErr is None: normalized = self._normalizeJsonStructure(parsed, useCase) return json.dumps(normalized, indent=2, ensure_ascii=False) except Exception: @@ -404,11 +402,10 @@ class AiCallLooper: # This ensures retry iterations use the correct base context lastRawResponse = candidateJson - # Try direct parse of candidate + # Try direct parse of candidate (same pipeline as structure filling / getContexts) try: - extracted = extractJsonString(candidateJson) - parsed, parseErr, _ = tryParseJson(extracted) - if parseErr is None and parsed: + parsed, parseErr, extracted = tryParseJson(candidateJson) + if parseErr is None: # Direct parse succeeded - FINISHED # Commit candidate to jsonBase jsonBase = candidateJson @@ -441,39 +438,50 @@ class AiCallLooper: # STEP 6: DECIDE based on jsonParsingSuccess and overlapContext if contexts.jsonParsingSuccess and contexts.overlapContext == "": - # JSON is complete (no cut point) - FINISHED - # Use completePart for final result (closed, repaired JSON) - # No more merging needed, so we don't need the cut version - jsonBase = contexts.completePart + # getContexts and downstream must agree with tryParseJson (same as structure filling). logger.info(f"Iteration {iteration}: jsonParsingSuccess=true, overlapContext='', JSON complete") - # Store and parse completePart lastValidCompletePart = contexts.completePart try: - extracted = extractJsonString(contexts.completePart) - parsed, parseErr, _ = tryParseJson(extracted) - if parseErr is None and parsed: - normalized = self._normalizeJsonStructure(parsed, useCase) - result = json.dumps(normalized, indent=2, ensure_ascii=False) - - if iterationOperationId: - self.services.chat.progressLogFinish(iterationOperationId, True) - - if not useCase.finalResultHandler: - raise ValueError( - f"Use case '{useCaseId}' is missing required 'finalResultHandler' callback." - ) - return useCase.finalResultHandler( - result, normalized, extracted, debugPrefix, self.services + parsed, parseErr, extracted = tryParseJson(contexts.completePart) + if parseErr is not None: + raise ValueError(str(parseErr)) + normalized = self._normalizeJsonStructure(parsed, useCase) + result = json.dumps(normalized, indent=2, ensure_ascii=False) + jsonBase = contexts.completePart + + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, True) + + if not useCase.finalResultHandler: + raise ValueError( + f"Use case '{useCaseId}' is missing required 'finalResultHandler' callback." ) + return useCase.finalResultHandler( + result, normalized, extracted, debugPrefix, self.services + ) except Exception as e: - logger.warning(f"Iteration {iteration}: Failed to parse completePart: {e}") - - # Fallback: return completePart as-is - if iterationOperationId: - self.services.chat.progressLogFinish(iterationOperationId, True) - return contexts.completePart + logger.warning( + f"Iteration {iteration}: completePart not serializable after getContexts success: {e}" + ) + mergeFailCount += 1 + if mergeFailCount >= MAX_MERGE_FAILS: + logger.error( + f"Iteration {iteration}: Max failures ({MAX_MERGE_FAILS}) " + "after output pipeline mismatch" + ) + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, False) + return jsonBase if jsonBase else "" + if iterationOperationId: + self.services.chat.progressLogUpdate( + iterationOperationId, + 0.7, + f"Output pipeline failed ({mergeFailCount}/{MAX_MERGE_FAILS}), retrying", + ) + self.services.chat.progressLogFinish(iterationOperationId, True) + continue elif contexts.jsonParsingSuccess and contexts.overlapContext != "": # JSON parseable but has cut point - CONTINUE to next iteration @@ -522,9 +530,8 @@ class AiCallLooper: if lastValidCompletePart: try: - extracted = extractJsonString(lastValidCompletePart) - parsed, parseErr, _ = tryParseJson(extracted) - if parseErr is None and parsed: + parsed, parseErr, _ = tryParseJson(lastValidCompletePart) + if parseErr is None: normalized = self._normalizeJsonStructure(parsed, useCase) return json.dumps(normalized, indent=2, ensure_ascii=False) except Exception: @@ -552,9 +559,24 @@ class AiCallLooper: if iteration >= maxIterations: logger.warning(f"AI call stopped after maximum iterations ({maxIterations})") - # This code path should never be reached because all registered use cases - # return early when JSON is complete. This would only execute for use cases that - # require section extraction, but no such use cases are currently registered. + # Prefer last repaired complete JSON from getContexts (raw `result` is only the last fragment). + if lastValidCompletePart and useCase and not useCase.requiresExtraction: + try: + parsed, parseErr, extracted = tryParseJson(lastValidCompletePart) + if parseErr is None: + normalized = self._normalizeJsonStructure(parsed, useCase) + out = json.dumps(normalized, indent=2, ensure_ascii=False) + if useCase.finalResultHandler: + logger.warning( + "callAiWithLooping: max iterations — returning last valid completePart for %r", + useCaseId, + ) + return useCase.finalResultHandler( + out, normalized, extracted, debugPrefix, self.services + ) + except Exception as e: + logger.debug("Max-iterations fallback on completePart failed: %s", e) + logger.error( "End of callAiWithLooping without success for use case %r (iterations=%s, lastResultLen=%s)", useCaseId, diff --git a/modules/serviceCenter/services/serviceAi/subLoopingUseCases.py b/modules/serviceCenter/services/serviceAi/subLoopingUseCases.py index a2828108..f6a7c620 100644 --- a/modules/serviceCenter/services/serviceAi/subLoopingUseCases.py +++ b/modules/serviceCenter/services/serviceAi/subLoopingUseCases.py @@ -54,6 +54,15 @@ def _handleCodeContentFinalResult(result: str, parsedJsonForUseCase: Any, extrac return final_json +def _lift_section_plain_text(d: Dict[str, Any]) -> Optional[str]: + """Models often return {\"text\": \"...\"} without an elements array; extract usable prose.""" + for key in ("text", "body", "summary", "response", "output", "answer", "message", "content"): + v = d.get(key) + if isinstance(v, str) and v.strip(): + return v.strip() + return None + + def _normalizeSectionContentJson(parsed: Any, useCaseId: str) -> Any: """Normalize JSON structure for section_content use case.""" # For section_content, expect {"elements": [...]} structure @@ -77,15 +86,29 @@ def _normalizeSectionContentJson(parsed: Any, useCaseId: str) -> Any: # Convert plain list of elements to elements structure return {"elements": parsed} elif isinstance(parsed, dict): - # If it already has "elements", return as-is if "elements" in parsed: + els = parsed.get("elements") + if isinstance(els, list) and len(els) > 0: + return parsed + lifted = _lift_section_plain_text(parsed) + if lifted: + out = dict(parsed) + out["elements"] = [{"type": "paragraph", "content": {"text": lifted}}] + logger.info( + "section_content: promoted plain-text field to elements (%d chars)", + len(lifted), + ) + return out return parsed - # If it has "type" and looks like an element, wrap in elements array - elif parsed.get("type"): + if parsed.get("type"): return {"elements": [parsed]} - # Otherwise, assume it's already in correct format - else: - return parsed + lifted = _lift_section_plain_text(parsed) + if lifted: + return { + **parsed, + "elements": [{"type": "paragraph", "content": {"text": lifted}}], + } + return parsed # For other use cases, return as-is (they have their own structures) return parsed diff --git a/modules/serviceCenter/services/serviceAi/subStructureFilling.py b/modules/serviceCenter/services/serviceAi/subStructureFilling.py index b31bc32d..33398b64 100644 --- a/modules/serviceCenter/services/serviceAi/subStructureFilling.py +++ b/modules/serviceCenter/services/serviceAi/subStructureFilling.py @@ -27,6 +27,36 @@ class _AiResponseFallback: logger = logging.getLogger(__name__) +def _elements_from_section_content_ai_json(parsed: Any) -> List[Any]: + """Normalize section_content AI JSON (incl. models that return {\"text\": ...}) into elements.""" + from modules.serviceCenter.services.serviceAi.subLoopingUseCases import _normalizeSectionContentJson + + if parsed is None: + return [] + if isinstance(parsed, dict): + has_nonempty_elements = ( + isinstance(parsed.get("elements"), list) and len(parsed["elements"]) > 0 + ) + if not has_nonempty_elements: + # Valid full-document envelope (same normalized shape the renderer uses elsewhere) + docs = parsed.get("documents") + if isinstance(docs, list) and docs and isinstance(docs[0], dict): + secs = docs[0].get("sections") + if isinstance(secs, list) and secs and isinstance(secs[0], dict): + parsed = secs[0] + elif ( + isinstance(parsed.get("sections"), list) + and parsed["sections"] + and isinstance(parsed["sections"][0], dict) + ): + parsed = parsed["sections"][0] + norm = _normalizeSectionContentJson(parsed, "section_content") + if isinstance(norm, dict): + els = norm.get("elements") + return list(els) if isinstance(els, list) else [] + return [] + + class StructureFiller: """Handles filling document structure with content.""" @@ -524,38 +554,12 @@ class StructureFiller: if generatedElements: elements.extend(generatedElements) else: - # Fallback: Try to parse JSON response directly with repair logic - try: - from modules.shared.jsonUtils import tryParseJson, repairBrokenJson - - # Use tryParseJson which handles extraction and basic parsing - fallbackElements, parseError, cleanedStr = tryParseJson(aiResponse.content) - - # If parsing failed, try repair - if parseError and isinstance(aiResponse.content, str): - logger.warning(f"Initial JSON parse failed for section {sectionId}, attempting repair: {str(parseError)}") - repairedJson = repairBrokenJson(aiResponse.content) - if repairedJson: - fallbackElements = repairedJson - parseError = None - logger.info(f"Successfully repaired JSON for section {sectionId}") - - if parseError: - raise parseError - - if isinstance(fallbackElements, list): - elements.extend(fallbackElements) - elif isinstance(fallbackElements, dict) and "elements" in fallbackElements: - elements.extend(fallbackElements["elements"]) - elif isinstance(fallbackElements, dict) and fallbackElements.get("type"): - elements.append(fallbackElements) - except (json.JSONDecodeError, ValueError) as json_error: - logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") - elements.append({ - "type": "error", - "message": f"Failed to parse JSON response: {str(json_error)}", - "sectionId": sectionId - }) + logger.error(f"No elements produced for section {sectionId} (callAiWithLooping must return parseable JSON)") + elements.append({ + "type": "error", + "message": f"No parsed content for section {sectionId}", + "sectionId": sectionId + }) return elements @@ -671,7 +675,7 @@ class StructureFiller: try: self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - operationType = OperationTypeEnum.DATA_ANALYSE + operationType = OperationTypeEnum.DATA_GENERATE options = AiCallOptions( operationType=operationType, priority=PriorityEnum.BALANCED, @@ -703,22 +707,17 @@ class StructureFiller: ) try: - from modules.shared.jsonUtils import tryParseJson, repairBrokenJson + from modules.shared.jsonUtils import tryParseJson + if isinstance(aiResponseJson, str) and ("---" in aiResponseJson or aiResponseJson.count("```json") > 1): generatedElements = self._extractAndMergeMultipleJsonBlocks(aiResponseJson, contentType, sectionId) else: - parsedResponse, parseError, cleanedStr = tryParseJson(aiResponseJson) - if parsedResponse is None: - logger.warning(f"Section {sectionId}: tryParseJson failed, attempting repair") - repairedStr = repairBrokenJson(aiResponseJson) - parsedResponse, parseError2, _ = tryParseJson(repairedStr) - - if parsedResponse and isinstance(parsedResponse, dict): - generatedElements = parsedResponse.get("elements", []) - elif parsedResponse and isinstance(parsedResponse, list): - generatedElements = parsedResponse - else: + parsedResponse, parseError, _ = tryParseJson(aiResponseJson) + if parseError is not None: + logger.error(f"Section {sectionId}: tryParseJson failed: {parseError}") generatedElements = [] + else: + generatedElements = _elements_from_section_content_ai_json(parsedResponse) except Exception as parseErr: logger.error(f"Section {sectionId}: JSON parse error: {parseErr}") generatedElements = [] @@ -930,7 +929,7 @@ class StructureFiller: self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_GENERATE if operationType == OperationTypeEnum.IMAGE_GENERATE: maxPromptLength = 4000 @@ -996,44 +995,17 @@ class StructureFiller: ) try: - # Use tryParseJson which handles extraction and basic parsing - from modules.shared.jsonUtils import tryParseJson, repairBrokenJson - - # Check if response contains multiple JSON blocks (separated by --- or multiple ```json blocks) - # This can happen when AI returns multiple complete responses + from modules.shared.jsonUtils import tryParseJson + if isinstance(aiResponseJson, str) and ("---" in aiResponseJson or aiResponseJson.count("```json") > 1): logger.info(f"Section {sectionId}: Detected multiple JSON blocks in response, attempting to merge") generatedElements = self._extractAndMergeMultipleJsonBlocks(aiResponseJson, contentType, sectionId) else: - parsedResponse, parseError, cleanedStr = tryParseJson(aiResponseJson) - - # If parsing failed, try repair - if parseError and isinstance(aiResponseJson, str): - logger.warning(f"Initial JSON parse failed for section {sectionId}, attempting repair: {str(parseError)}") - repairedJson = repairBrokenJson(aiResponseJson) - if repairedJson: - parsedResponse = repairedJson - parseError = None - logger.info(f"Successfully repaired JSON for section {sectionId}") - - if parseError: + parsedResponse, parseError, _ = tryParseJson(aiResponseJson) + if parseError is not None: raise parseError - - if isinstance(parsedResponse, list): - generatedElements = parsedResponse - elif isinstance(parsedResponse, dict): - if "elements" in parsedResponse: - generatedElements = parsedResponse["elements"] - elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: - firstSection = parsedResponse["sections"][0] - generatedElements = firstSection.get("elements", []) - elif parsedResponse.get("type"): - generatedElements = [parsedResponse] - else: - generatedElements = [] - else: - generatedElements = [] - + generatedElements = _elements_from_section_content_ai_json(parsedResponse) + aiResponse = _AiResponseFallback(aiResponseJson) except Exception as parseError: logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") @@ -1112,7 +1084,7 @@ class StructureFiller: self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_GENERATE if operationType == OperationTypeEnum.IMAGE_GENERATE: maxPromptLength = 4000 @@ -1135,6 +1107,7 @@ class StructureFiller: processingMode=ProcessingModeEnum.DETAILED ) ) + checkWorkflowStopped(self.services) aiResponse = await self.aiService.callAi(request) generatedElements = [] @@ -1179,22 +1152,16 @@ class StructureFiller: ) try: - parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) - if isinstance(parsedResponse, list): - generatedElements = parsedResponse - elif isinstance(parsedResponse, dict): - if "elements" in parsedResponse: - generatedElements = parsedResponse["elements"] - elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: - firstSection = parsedResponse["sections"][0] - generatedElements = firstSection.get("elements", []) - elif parsedResponse.get("type"): - generatedElements = [parsedResponse] - else: - generatedElements = [] - else: + from modules.shared.jsonUtils import tryParseJson + + parsedResponse, parseError, _ = tryParseJson(aiResponseJson) + if parseError is not None: + logger.error( + f"Error parsing response from _callAiWithLooping for section {sectionId}: {parseError}" + ) generatedElements = [] - + else: + generatedElements = _elements_from_section_content_ai_json(parsedResponse) aiResponse = _AiResponseFallback(aiResponseJson) except Exception as parseError: logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") @@ -1371,7 +1338,7 @@ class StructureFiller: self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_GENERATE if operationType == OperationTypeEnum.IMAGE_GENERATE: maxPromptLength = 4000 @@ -1439,22 +1406,16 @@ class StructureFiller: ) try: - parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) - if isinstance(parsedResponse, list): - generatedElements = parsedResponse - elif isinstance(parsedResponse, dict): - if "elements" in parsedResponse: - generatedElements = parsedResponse["elements"] - elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: - firstSection = parsedResponse["sections"][0] - generatedElements = firstSection.get("elements", []) - elif parsedResponse.get("type"): - generatedElements = [parsedResponse] - else: - generatedElements = [] - else: + from modules.shared.jsonUtils import tryParseJson + + parsedResponse, parseError, _ = tryParseJson(aiResponseJson) + if parseError is not None: + logger.error( + f"Error parsing response from _callAiWithLooping for section {sectionId}: {parseError}" + ) generatedElements = [] - + else: + generatedElements = _elements_from_section_content_ai_json(parsedResponse) aiResponse = _AiResponseFallback(aiResponseJson) except Exception as parseError: logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") diff --git a/modules/serviceCenter/services/serviceChat/mainServiceChat.py b/modules/serviceCenter/services/serviceChat/mainServiceChat.py index fcf9be2f..b5c7a542 100644 --- a/modules/serviceCenter/services/serviceChat/mainServiceChat.py +++ b/modules/serviceCenter/services/serviceChat/mainServiceChat.py @@ -40,6 +40,26 @@ class ChatService: """Workflow from context (stable during workflow execution).""" return self._context.workflow + def _chat_document_from_management_file(self, file_id: str) -> Optional[ChatDocument]: + """Build a ChatDocument when docItem references a management FileItem (e.g. automation uploads) without a chat message.""" + try: + fi = self.interfaceDbComponent.getFile(file_id) + except Exception as e: + logger.debug("getFile(%s) failed: %s", file_id, e) + return None + if fi is None: + return None + wf = self._workflow + wf_id = wf.id if wf else "no-workflow" + return ChatDocument( + id=file_id, + messageId=f"_filestore:{wf_id}", + fileId=fi.id, + fileName=fi.fileName or "document", + fileSize=int(fi.fileSize or 0), + mimeType=fi.mimeType or "application/octet-stream", + ) + def getChatDocumentsFromDocumentList(self, documentList) -> List[ChatDocument]: """Get ChatDocuments from a DocumentReferenceList. @@ -130,14 +150,28 @@ class ChatService: if message.documents: for doc in message.documents: - if doc.id == docId: + if doc.id == docId or getattr(doc, "fileId", None) == docId: allDocuments.append(doc) docFound = True - logger.debug(f"Matched document reference '{docRef}' to document {doc.id} (fileName: {getattr(doc, 'fileName', 'unknown')}) by documentId") + logger.debug( + f"Matched document reference '{docRef}' to document {doc.id} " + f"(fileName: {getattr(doc, 'fileName', 'unknown')}) by id/fileId" + ) break if docFound: break + if not docFound: + synth = self._chat_document_from_management_file(docId) + if synth is not None: + allDocuments.append(synth) + docFound = True + logger.info( + "Resolved document reference %r via FileItem %s (automation / transient workflow)", + docRef, + docId, + ) + # Fallback: If not found by documentId and it looks like a filename (has file extension), try filename matching # This handles cases where AI incorrectly generates docItem:filename.docx if not docFound and '.' in docId and len(parts) == 2: diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/rendererText.py b/modules/serviceCenter/services/serviceGeneration/renderers/rendererText.py index 596feeeb..67eab4e8 100644 --- a/modules/serviceCenter/services/serviceGeneration/renderers/rendererText.py +++ b/modules/serviceCenter/services/serviceGeneration/renderers/rendererText.py @@ -196,8 +196,10 @@ class RendererText(BaseRenderer): textParts.append(f"[Reference: {label}]") continue elif element_type == "extracted_text": - # Extracted text format + # Extracted text format (str or raw bytes from ContentPart) content = element.get("content", "") + if isinstance(content, (bytes, bytearray, memoryview)): + content = bytes(content).decode("utf-8", errors="replace") source = element.get("source", "") if content: source_text = f" (Source: {source})" if source else "" @@ -323,22 +325,27 @@ class RendererText(BaseRenderer): try: # Extract from nested content structure: element.content.{text, level} content = headingData.get("content", {}) - if not isinstance(content, dict): + if isinstance(content, dict) and content: + text = self._stripMarkdownForPlainText(content.get("text", "")) + level = content.get("level", 1) + else: + # AI shorthand: {"type":"heading","text":"...","level":2} + text = self._stripMarkdownForPlainText(str(headingData.get("text", "") or "")) + level = headingData.get("level", 1) + if not text: return "" - text = self._stripMarkdownForPlainText(content.get("text", "")) - level = content.get("level", 1) - - if text: - level = max(1, min(6, level)) - if level == 1: - return f"{text}\n{'=' * len(text)}" - elif level == 2: - return f"{text}\n{'-' * len(text)}" - else: - return f"{'#' * level} {text}" - - return "" - + + try: + level_i = int(level) if level is not None else 1 + except (TypeError, ValueError): + level_i = 1 + level_i = max(1, min(6, level_i)) + if level_i == 1: + return f"{text}\n{'=' * len(text)}" + if level_i == 2: + return f"{text}\n{'-' * len(text)}" + return f"{'#' * level_i} {text}" + except Exception as e: self.logger.warning(f"Error rendering heading: {str(e)}") return "" @@ -399,8 +406,19 @@ class RendererText(BaseRenderer): def _renderJsonParagraph(self, paragraphData: Dict[str, Any]) -> str: """Render a JSON paragraph to text. Strips markdown for plain text output.""" try: - # Extract from nested content structure - content = paragraphData.get("content", {}) + # Models often return {"type":"paragraph","text":"..."} without nested "content" + top = paragraphData.get("text") + raw_content = paragraphData.get("content", {}) + if isinstance(top, str) and top.strip(): + if raw_content is None or raw_content == {}: + return self._stripMarkdownForPlainText(top) + if isinstance(raw_content, dict): + if not (raw_content.get("text") or raw_content.get("inlineRuns")): + return self._stripMarkdownForPlainText(top) + + content = raw_content + if content is None: + content = {} if isinstance(content, dict): runs = self._inlineRunsFromContent(content) if runs: diff --git a/modules/shared/jsonContinuation.py b/modules/shared/jsonContinuation.py index 22180b41..9d282c62 100644 --- a/modules/shared/jsonContinuation.py +++ b/modules/shared/jsonContinuation.py @@ -2172,11 +2172,13 @@ def getContexts( >>> print(contexts.overlapContext) # "" (empty - JSON is complete) >>> print(contexts.jsonParsingSuccess) # True """ - # First, check if original JSON is already complete (parseable without modification) + # Completeness must use the same pipeline as callers (fences, balanced extract, normalization). + from modules.shared.jsonUtils import tryParseJson as _utils_try_parse_json + jsonIsComplete = False if truncatedJson and truncatedJson.strip(): - parsed, error = _tryParseJson(truncatedJson.strip()) - if error is None: + _parsed_hdr, error_hdr, _ = _utils_try_parse_json(truncatedJson) + if error_hdr is None: jsonIsComplete = True logger.debug("Original JSON is already complete (no cut point)") @@ -2193,28 +2195,27 @@ def getContexts( jsonParsingSuccess = False if completePart and completePart.strip(): - # First attempt: parse as-is - parsed, error = _tryParseJson(completePart) - + parsed, error, _ = _utils_try_parse_json(completePart) if error is None: jsonParsingSuccess = True else: - # Second attempt: repair internal errors and retry - logger.debug(f"Initial parse failed: {error}, attempting repair") + logger.debug(f"Initial parse failed: {error}, attempting internal repair") repairedCompletePart = _repairInternalJsonErrors(completePart) - - parsed, error = _tryParseJson(repairedCompletePart) - + parsed, error, _ = _utils_try_parse_json(repairedCompletePart) if error is None: - # Repair succeeded - use repaired version completePart = repairedCompletePart jsonParsingSuccess = True logger.debug("JSON repair successful") else: - # Repair also failed - keep original completePart, mark as failed logger.debug(f"JSON repair also failed: {error}") jsonParsingSuccess = False + # If completePart parses successfully, the merged/candidate JSON is structurally complete + # after repair/closing — overlap from extractContinuationContexts on the *raw* candidate + # would falsely signal truncation and trap callAiWithLooping in continuation iterations. + if jsonParsingSuccess: + overlap = "" + return JsonContinuationContexts( overlapContext=overlap, hierarchyContext=hierarchy, diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index 55a63281..e49754f8 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -393,9 +393,10 @@ async def executeGraph( ordered_ids = [n.get("id") for n in ordered if n.get("id")] logger.info("executeGraph topoSort order: %s", ordered_ids) - nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {}) + # Normalize resumed human-node output BEFORE copying into nodeOutputs — otherwise + # normalizeToSchema only updates initialNodeOutputs and loop/refs still see raw + # e.g. input.upload {files} without coerced DocumentList.documents. is_resume = startAfterNodeId is not None - if is_resume and initialNodeOutputs and startAfterNodeId: resumedNode = next((n for n in nodes if n.get("id") == startAfterNodeId), None) if resumedNode: @@ -408,6 +409,8 @@ async def executeGraph( initialNodeOutputs[startAfterNodeId] = normalizeToSchema(resumedOutput, schema) except Exception as valErr: logger.warning("executeGraph resume: schema validation failed for %s: %s", startAfterNodeId, valErr) + + nodeOutputs: Dict[str, Any] = dict(initialNodeOutputs or {}) if not runId and automation2_interface and workflowId and not is_resume: run_context = { "connectionMap": connectionMap, diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py index 6a736654..c1be349c 100644 --- a/modules/workflows/automation2/executors/actionNodeExecutor.py +++ b/modules/workflows/automation2/executors/actionNodeExecutor.py @@ -1,7 +1,8 @@ # Copyright (c) 2025 Patrick Motsch # Action node executor - maps ai.*, email.*, sharepoint.*, clickup.*, file.*, trustee.* to method actions. # -# Typed Port System: explicit DataRefs / static parameters only (no runtime wire-handover). +# Typed Port System: explicit DataRefs / static parameters; optional ``documentList`` from input port 0 +# when the param is empty (same idea as IOExecutor wire fill). # ``materializeConnectionRefs`` (see pickNotPushMigration) may still rewrite empty connectionReference at run start. import json @@ -18,6 +19,25 @@ from modules.serviceCenter.services.serviceBilling.mainServiceBilling import Bil logger = logging.getLogger(__name__) + +def _coerce_document_data_to_bytes(raw: Any) -> Optional[bytes]: + """Normalize documentData (bytes/str/buffer) for DB file persistence.""" + if raw is None: + return None + if isinstance(raw, bytes): + return raw if len(raw) > 0 else None + if isinstance(raw, bytearray): + b = bytes(raw) + return b if len(b) > 0 else None + if isinstance(raw, memoryview): + b = raw.tobytes() + return b if len(b) > 0 else None + if isinstance(raw, str): + b = raw.encode("utf-8") + return b if len(b) > 0 else None + return None + + _USER_CONNECTION_ID_RE = re.compile( r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.IGNORECASE, @@ -219,6 +239,78 @@ def _getOutputSchemaName(nodeDef: Dict) -> str: return port0.get("schema", "ActionResult") +def _extract_wired_document_list(inp: Any) -> Optional[Dict[str, Any]]: + """ + Build a DocumentList-shaped dict from upstream node output (matches IOExecutor wire behavior). + Handles DocumentList, human upload shapes (file / files / fileIds), FileList, loop file items. + During flow.loop body execution the loop node's output is + {items, count, currentItem, currentIndex}; wired document actions must use currentItem. + """ + if inp is None: + return None + from modules.features.graphicalEditor.portTypes import ( + unwrapTransit, + _coerce_document_list_upload_fields, + _file_record_to_document, + ) + + data = unwrapTransit(inp) + if isinstance(data, str): + one = _file_record_to_document(data) + return {"documents": [one], "count": 1} if one else None + if not isinstance(data, dict): + return None + d = dict(data) + _coerce_document_list_upload_fields(d) + # Per-iteration payload from executionEngine (flow.loop → downstream in loop body) + if "currentItem" in d: + ci = d.get("currentItem") + if ci is not None: + nested = _extract_wired_document_list(ci) + if nested: + return nested + docs = d.get("documents") + if isinstance(docs, list) and len(docs) > 0: + return {"documents": docs, "count": d.get("count", len(docs))} + raw_list = d.get("documentList") + if isinstance(raw_list, list) and len(raw_list) > 0 and isinstance(raw_list[0], dict): + return {"documents": raw_list, "count": len(raw_list)} + doc_id = d.get("documentId") or d.get("id") + if doc_id and str(doc_id).strip(): + one: Dict[str, Any] = {"id": str(doc_id).strip()} + fn = d.get("fileName") or d.get("name") + if fn: + one["name"] = str(fn) + mt = d.get("mimeType") + if mt: + one["mimeType"] = str(mt) + return {"documents": [one], "count": 1} + files = d.get("files") + if isinstance(files, list) and files: + collected = [] + for item in files: + conv = _file_record_to_document(item) if isinstance(item, dict) else None + if conv: + collected.append(conv) + if collected: + return {"documents": collected, "count": len(collected)} + return None + + +def _document_list_param_is_empty(val: Any) -> bool: + if val is None or val == "": + return True + if isinstance(val, list) and len(val) == 0: + return True + if isinstance(val, dict): + if val.get("documents") or val.get("references") or val.get("items"): + return False + if val.get("documentId") or val.get("id"): + return False + return True + return False + + class ActionNodeExecutor: """Execute action nodes by mapping to method actions via ActionExecutor.""" @@ -260,6 +352,17 @@ class ActionNodeExecutor: if pName and pName not in resolvedParams and "default" in pDef: resolvedParams[pName] = pDef["default"] + _param_names = {p.get("name") for p in nodeDef.get("parameters", []) if p.get("name")} + if "documentList" in _param_names and _document_list_param_is_empty(resolvedParams.get("documentList")): + _src_map = (context.get("inputSources") or {}).get(nodeId) or {} + _entry = _src_map.get(0) + if _entry: + _src_node_id, _ = _entry + _upstream = (context.get("nodeOutputs") or {}).get(_src_node_id) + _wired = _extract_wired_document_list(_upstream) + if _wired: + resolvedParams["documentList"] = _wired + # 3. Resolve connectionReference chatService = getattr(self.services, "chat", None) _resolveConnectionParam(resolvedParams, chatService, self.services) @@ -323,7 +426,8 @@ class ActionNodeExecutor: for d in (result.documents or []): dumped = d.model_dump() if hasattr(d, "model_dump") else dict(d) if isinstance(d, dict) else d rawData = getattr(d, "documentData", None) if hasattr(d, "documentData") else (dumped.get("documentData") if isinstance(dumped, dict) else None) - if isinstance(dumped, dict) and isinstance(rawData, bytes) and len(rawData) > 0: + rawBytes = _coerce_document_data_to_bytes(rawData) + if isinstance(dumped, dict) and rawBytes: try: from modules.interfaces.interfaceDbManagement import getInterface as _getMgmtInterface from modules.interfaces.interfaceDbApp import getInterface as _getAppInterface @@ -347,8 +451,8 @@ class ActionNodeExecutor: _mgmt = _getMgmtInterface(_owner, mandateId=_mandateId, featureInstanceId=_instanceId) _docName = dumped.get("documentName") or f"workflow-result-{nodeId}.bin" _mimeType = dumped.get("mimeType") or "application/octet-stream" - _fileItem = _mgmt.createFile(_docName, _mimeType, rawData) - _mgmt.createFileData(_fileItem.id, rawData) + _fileItem = _mgmt.createFile(_docName, _mimeType, rawBytes) + _mgmt.createFileData(_fileItem.id, rawBytes) dumped["fileId"] = _fileItem.id dumped["id"] = _fileItem.id dumped["fileName"] = _fileItem.fileName diff --git a/modules/workflows/automation2/executors/ioExecutor.py b/modules/workflows/automation2/executors/ioExecutor.py index 38e2570c..f6d40b05 100644 --- a/modules/workflows/automation2/executors/ioExecutor.py +++ b/modules/workflows/automation2/executors/ioExecutor.py @@ -45,10 +45,12 @@ class IOExecutor: if 0 in inputSources: srcId, _ = inputSources[0] inp = nodeOutputs.get(srcId) - from modules.workflows.automation2.executors.actionNodeExecutor import _getDocumentsFromUpstream - docs = _getDocumentsFromUpstream(inp) if isinstance(inp, dict) else [] + from modules.workflows.automation2.executors.actionNodeExecutor import _extract_wired_document_list + + wired = _extract_wired_document_list(inp) + docs = (wired or {}).get("documents") if isinstance(wired, dict) else None if docs: - resolvedParams.setdefault("documentList", docs) + resolvedParams.setdefault("documentList", wired) elif inp is not None: resolvedParams.setdefault("input", inp) diff --git a/modules/workflows/methods/methodAi/actions/generateCode.py b/modules/workflows/methods/methodAi/actions/generateCode.py index ee375d89..bc7a5a64 100644 --- a/modules/workflows/methods/methodAi/actions/generateCode.py +++ b/modules/workflows/methods/methodAi/actions/generateCode.py @@ -21,7 +21,6 @@ async def generateCode(self, parameters: Dict[str, Any]) -> ActionResult: if not prompt.strip(): return ActionResult.isFailure(error="prompt is required") - documentList = parameters.get("documentList", []) # Optional: if omitted, formats determined from prompt by AI resultType = parameters.get("resultType") @@ -34,19 +33,15 @@ async def generateCode(self, parameters: Dict[str, Any]) -> ActionResult: parentOperationId = parameters.get('parentOperationId') try: - # Convert documentList to DocumentReferenceList if needed - docRefList = None - if documentList: - from modules.datamodels.datamodelDocref import DocumentReferenceList - - if isinstance(documentList, DocumentReferenceList): - docRefList = documentList - elif isinstance(documentList, str): - docRefList = DocumentReferenceList.from_string_list([documentList]) - elif isinstance(documentList, list): - docRefList = DocumentReferenceList.from_string_list(documentList) - else: - docRefList = DocumentReferenceList(references=[]) + from modules.datamodels.datamodelDocref import coerceDocumentReferenceList + + raw_dl = parameters.get("documentList") + if raw_dl is None or raw_dl == "": + docRefList = None + else: + docRefList = coerceDocumentReferenceList(raw_dl) + if not docRefList.references: + docRefList = None # Prepare title title = "Generated Code" diff --git a/modules/workflows/methods/methodAi/actions/generateDocument.py b/modules/workflows/methods/methodAi/actions/generateDocument.py index 0edcd141..5a1ff0eb 100644 --- a/modules/workflows/methods/methodAi/actions/generateDocument.py +++ b/modules/workflows/methods/methodAi/actions/generateDocument.py @@ -21,7 +21,6 @@ async def generateDocument(self, parameters: Dict[str, Any]) -> ActionResult: if not prompt.strip(): return ActionResult.isFailure(error="prompt is required") - documentList = parameters.get("documentList", []) documentType = parameters.get("documentType") # Prefer explicit outputFormat (flow UI); resultType remains for legacy / API callers. resultType = parameters.get("outputFormat") or parameters.get("resultType") @@ -37,19 +36,16 @@ async def generateDocument(self, parameters: Dict[str, Any]) -> ActionResult: parentOperationId = parameters.get('parentOperationId') try: - # Convert documentList to DocumentReferenceList if needed - docRefList = None - if documentList: - from modules.datamodels.datamodelDocref import DocumentReferenceList - - if isinstance(documentList, DocumentReferenceList): - docRefList = documentList - elif isinstance(documentList, str): - docRefList = DocumentReferenceList.from_string_list([documentList]) - elif isinstance(documentList, list): - docRefList = DocumentReferenceList.from_string_list(documentList) - else: - docRefList = DocumentReferenceList(references=[]) + # Convert documentList to DocumentReferenceList (handles dict {"documents": [...]}, list of ids, str, etc.) + from modules.datamodels.datamodelDocref import coerceDocumentReferenceList + + raw_dl = parameters.get("documentList") + if raw_dl is None or raw_dl == "": + docRefList = None + else: + docRefList = coerceDocumentReferenceList(raw_dl) + if not docRefList.references: + docRefList = None title_raw = parameters.get("title") title = (title_raw.strip() if isinstance(title_raw, str) else "") or None