diff --git a/modules/features/graphicalEditor/nodeDefinitions/context.py b/modules/features/graphicalEditor/nodeDefinitions/context.py index 22e068dd..743d92e8 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/context.py +++ b/modules/features/graphicalEditor/nodeDefinitions/context.py @@ -271,6 +271,10 @@ CONTEXT_NODES = [ "outputPorts": { 0: { "schema": "ActionResult", + # Override the schema-level primaryTextRef path: ``response`` is intentionally + # empty for this node; downstream nodes with ``primaryTextRef`` should resolve to + # the full presentation object under ``data``. + "primaryTextRefPath": ["data"], # Authoritative DataPicker paths (same idea as ``parameters`` for configuration). # Frontend uses only this list — no schema expansion merge for this port. "dataPickOptions": [ @@ -316,6 +320,11 @@ CONTEXT_NODES = [ "meta": {"icon": "mdi-file-tree-outline", "color": "#00897B", "usesAi": False}, "_method": "context", "_action": "extractContent", + # Executor behaviour flags — drives actionNodeExecutor without hardcoded type checks. + "skipUnifiedPresentation": True, + "clearResponse": True, + "imageDocumentsFromExtractData": True, + "popDocumentsFromOutput": True, }, { "id": "context.mergeContext", @@ -353,6 +362,9 @@ CONTEXT_NODES = [ "meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False}, "_method": "context", "_action": "mergeContext", + # Image documents live on ``data.merged.imageDocumentsOnly`` (accumulated across + # iterations) rather than the top-level ``documents`` list which is always empty. + "imageDocumentsFromMerged": True, }, { "id": "context.transformContext", @@ -421,6 +433,9 @@ CONTEXT_NODES = [ "deriveFrom": "mappings", "deriveNameField": "outputField", "dataPickOptions": CONTEXT_ENVELOPE_DATA_PICK_OPTIONS, + # ActionResult is the correct normalization schema — NOT FormPayload. + # The output is a versionned ActionResult envelope built by contextEnvelope. + "fromGraphResultSchema": "ActionResult", } }, "injectUpstreamPayload": True, diff --git a/modules/features/graphicalEditor/nodeDefinitions/file.py b/modules/features/graphicalEditor/nodeDefinitions/file.py index 2b79f2e0..a10999a2 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/file.py +++ b/modules/features/graphicalEditor/nodeDefinitions/file.py @@ -37,5 +37,7 @@ FILE_NODES = [ "meta": {"icon": "mdi-file-plus-outline", "color": "#2196F3", "usesAi": False}, "_method": "file", "_action": "create", + # Emit a debug log tracing how the ``context`` parameter was resolved. + "logContextResolution": True, }, ] diff --git a/modules/features/graphicalEditor/nodeDefinitions/flow.py b/modules/features/graphicalEditor/nodeDefinitions/flow.py index b2fc020b..6c020afa 100644 --- a/modules/features/graphicalEditor/nodeDefinitions/flow.py +++ b/modules/features/graphicalEditor/nodeDefinitions/flow.py @@ -293,6 +293,12 @@ FLOW_NODES = [ "default": 2, }, ], + # ``inputs: 2`` is the static minimum / default topology. ``inputCount`` is a + # frontend hint: the editor adds/removes input ports dynamically when the user + # changes the value. ``FlowExecutor._merge`` collects whatever ports exist in + # ``inputSources`` at runtime, so extra ports (3–5) work without further changes + # to this definition. ``inputPorts`` below only type-declares the two minimum + # ports; additional ports inherit the same ``_FLOW_INPUT_SCHEMAS`` accepts list. "inputs": 2, "outputs": 1, "inputPorts": { diff --git a/modules/features/graphicalEditor/portTypes.py b/modules/features/graphicalEditor/portTypes.py index 0784e436..e5633514 100644 --- a/modules/features/graphicalEditor/portTypes.py +++ b/modules/features/graphicalEditor/portTypes.py @@ -252,6 +252,16 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = { picker_label=t("Alle Ausgabe-Dateien (Liste)"), picker_item_label=t("je Datei"), ), + PortField(name="data", type="Dict", required=False, + description=( + "Internes Payload-Objekt (entspricht ``ActionResult.data``-Semantik). " + "Wird vom Executor gesetzt und enthält denselben Inhalt wie ``response`` " + "in strukturierter Form; primär für nachgelagerte Kontext-Nodes." + ), + picker_label=t("Technische Detaildaten (data)")), + PortField(name="imageDocumentsOnly", type="List[Document]", required=False, + description="Nur Bild-bezogene Einträge aus documents.", + picker_label=t("Nur Bilder (Liste)")), ]), "BoolResult": PortSchema(name="BoolResult", fields=[ PortField(name="result", type="bool", diff --git a/modules/workflows/automation2/executionEngine.py b/modules/workflows/automation2/executionEngine.py index f68a3feb..4e3f89da 100644 --- a/modules/workflows/automation2/executionEngine.py +++ b/modules/workflows/automation2/executionEngine.py @@ -86,7 +86,11 @@ def _outputSchemaForNode(nodeType: str) -> Optional[str]: if isinstance(p0, dict): spec = p0.get("schema") if isinstance(spec, dict) and spec.get("kind") == "fromGraph": - return "FormPayload" + # Read override from the port definition — ``FormPayload`` is the + # fallback for true form nodes; dynamic context nodes (e.g. + # context.transformContext) declare ``fromGraphResultSchema`` to + # avoid wrong normalization. + return p0.get("fromGraphResultSchema") or "FormPayload" if isinstance(spec, str): return spec return None @@ -96,8 +100,11 @@ def _isBarrierNode(nodeType: str) -> bool: """Barrier nodes wait for all connected predecessors before executing. Backwards compatible: ``flow.merge`` is always a barrier. Any other node may - declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry - (e.g. ``context.mergeContext``). + declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry. + + Note: ``context.mergeContext`` is NOT a barrier — it receives its list of + inputs via the ``dataSource`` DataRef parameter (typically ``loop.bodyResults``) + and executes once its single upstream edge is satisfied. """ if nodeType == "flow.merge": return True @@ -107,10 +114,6 @@ def _isBarrierNode(nodeType: str) -> bool: return False -# Legacy alias used inside this module. -_isMergeNode = _isBarrierNode - - def _allMergePredecessorsReady( nodeId: str, connectionMap: Dict[str, List], @@ -249,7 +252,6 @@ def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None: queueId = f"run-trace-{runId}" if not em.has_queue(queueId): return - import asyncio loop = asyncio.get_event_loop() if loop.is_running(): asyncio.ensure_future(em.emit_event(queueId, "step", stepData, event_category="tracing")) diff --git a/modules/workflows/automation2/executors/actionNodeExecutor.py b/modules/workflows/automation2/executors/actionNodeExecutor.py index d5a3fce8..2158de45 100644 --- a/modules/workflows/automation2/executors/actionNodeExecutor.py +++ b/modules/workflows/automation2/executors/actionNodeExecutor.py @@ -31,12 +31,11 @@ from modules.workflows.methods.methodContext.actions.extractContent import ( logger = logging.getLogger(__name__) _FILE_CREATE_CTX_LOG_MAX = 500 -_SKIP_UNIFIED_PRESENTATION_NODES = frozenset({"context.extractContent"}) -def _attach_unified_presentation_data(out: Dict[str, Any], *, node_type: str) -> None: +def _attach_unified_presentation_data(out: Dict[str, Any], *, node_def: Dict[str, Any]) -> None: """Ensure ``out[\"data\"]`` carries ``context.extractContent.presentation.v1`` for ``file.create``.""" - if node_type in _SKIP_UNIFIED_PRESENTATION_NODES: + if node_def.get("skipUnifiedPresentation"): return data = out.get("data") if isinstance(data, dict) and data.get("kind") == PRESENTATION_KIND: @@ -601,7 +600,7 @@ class ActionNodeExecutor: # 4. Apply declarative paramMappers from the node definition _applyParamMappers(nodeDef, resolvedParams) - if nodeType == "file.create": + if nodeDef.get("logContextResolution"): _log_file_create_context_resolution(nodeId, params, resolvedParams, context) # 5. email.checkEmail pause for email wait @@ -619,26 +618,7 @@ class ActionNodeExecutor: } raise PauseForEmailWaitError(runId=runId, nodeId=nodeId, waitConfig=waitConfig) - # 6. AI nodes: normalize legacy "prompt" -> "aiPrompt" - if nodeType == "ai.prompt": - if "aiPrompt" not in resolvedParams and "prompt" in resolvedParams: - resolvedParams["aiPrompt"] = resolvedParams.pop("prompt") - - # 7. Build context for email.draftEmail from subject + body - if nodeType == "email.draftEmail": - subject = resolvedParams.get("subject", "") - body = resolvedParams.get("body", "") - if subject or body: - contextParts = [] - if subject: - contextParts.append(f"Subject: {subject}") - if body: - contextParts.append(f"Body:\n{body}") - resolvedParams["context"] = "\n\n".join(contextParts) - resolvedParams.pop("subject", None) - resolvedParams.pop("body", None) - - # 8. Create progress parent so nested actions have a hierarchy + # 6. Create progress parent so nested actions have a hierarchy import time as _time nodeOperationId = f"node_{nodeId}_{context.get('_runId', 'x')}_{int(_time.time())}" chatService = getattr(self.services, "chat", None) @@ -796,23 +776,17 @@ class ActionNodeExecutor: out.setdefault("context", ctx_str if ctx_str else "") rsp = str(out.get("response") or "").strip() if not rsp: - if nodeType != "context.extractContent": - out["response"] = extractedContext or "" - else: + if nodeDef.get("clearResponse"): out["response"] = "" + else: + out["response"] = extractedContext or "" if result.success: img_only = _image_documents_from_docs_list(docsList) - if ( - nodeType == "context.extractContent" - and isinstance(result.data, dict) - ): + if nodeDef.get("imageDocumentsFromExtractData") and isinstance(result.data, dict): img_only = list(img_only) + _image_refs_from_extract_node_data(result.data) - # mergeContext packs iterated payloads under ``data.merged`` only — ``documents`` - # on the ActionResult is empty, so image sidecars live on ``merged.imageDocumentsOnly``. - if ( - nodeType == "context.mergeContext" - and isinstance(result.data, dict) - ): + if nodeDef.get("imageDocumentsFromMerged") and isinstance(result.data, dict): + # mergeContext packs iterated image sidecars under ``data.merged.imageDocumentsOnly`` + # rather than the top-level ``documents`` list which is always empty. merged_blob = result.data.get("merged") if isinstance(merged_blob, dict): merged_imgs = merged_blob.get("imageDocumentsOnly") @@ -842,11 +816,11 @@ class ActionNodeExecutor: _attachConnectionProvenance(cr_out, resolvedParams, outputSchema, chatService, self.services) return normalizeToSchema(cr_out, outputSchema) - if nodeType == "context.extractContent": + if nodeDef.get("popDocumentsFromOutput"): out.pop("documents", None) if outputSchema in ("AiResult", "ActionResult") and result.success: - _attach_unified_presentation_data(out, node_type=nodeType) + _attach_unified_presentation_data(out, node_def=nodeDef) _attachConnectionProvenance(out, resolvedParams, outputSchema, chatService, self.services) diff --git a/modules/workflows/automation2/pickNotPushMigration.py b/modules/workflows/automation2/pickNotPushMigration.py index 0bc7072f..a40e6c33 100644 --- a/modules/workflows/automation2/pickNotPushMigration.py +++ b/modules/workflows/automation2/pickNotPushMigration.py @@ -143,7 +143,14 @@ def materializePrimaryTextHandover(graph: Dict[str, Any]) -> Dict[str, Any]: continue out_port = (src_def.get("outputPorts") or {}).get(0, {}) or {} out_schema = resolve_output_schema_name(src_node, out_port if isinstance(out_port, dict) else {}) - ref_path = PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema) + # Port-level override takes precedence over the schema-wide default path. + # Example: context.extractContent sets primaryTextRefPath=["data"] because + # its ``response`` field is intentionally empty. + ref_path = ( + out_port.get("primaryTextRefPath") + if isinstance(out_port, dict) and out_port.get("primaryTextRefPath") + else PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema) + ) if not ref_path: continue params[pname] = _data_ref(src_id, list(ref_path)) diff --git a/modules/workflows/methods/methodContext/actions/mergeContext.py b/modules/workflows/methods/methodContext/actions/mergeContext.py index 8bc76e4b..79582cf2 100644 --- a/modules/workflows/methods/methodContext/actions/mergeContext.py +++ b/modules/workflows/methods/methodContext/actions/mergeContext.py @@ -70,7 +70,11 @@ def _merge_payload(item: Any) -> Optional[Dict[str, Any]]: """ if not isinstance(item, dict): return None - if item.get("success") is False: + # Opt-in: only merge items that explicitly report success. + # Items without a ``success`` key (e.g. DocumentList, Transit outputs) are + # still included so non-action node results are not silently dropped. + success_val = item.get("success") + if success_val is not None and success_val is not True: return None out = dict(item) if isinstance(out.get("documents"), list): @@ -223,7 +227,9 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult: return ActionResult.isFailure(error="Alle Einträge in der Datenquelle sind leer.") primary = _synthesize_primary_response(merged, inputs) - merged["response"] = primary + # ``response`` lives only at the top-level of the data envelope (``payload["response"]``). + # Do NOT set ``merged["response"]`` — that would duplicate it inside the deep-merged blob + # and overwrite whatever the natural merge produced for debugging. _ps = primary if isinstance(primary, str) else repr(primary) logger.info( @@ -231,7 +237,7 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult: len(inputs), list(merged.keys())[:20], len(_ps or ""), - (_ps[:200] + "…") if len(_ps) > 200 else _ps, + (_ps[:200] + "\u2026") if len(_ps) > 200 else _ps, len(conflicts), ) payload: Dict[str, Any] = { diff --git a/modules/workflows/methods/methodContext/methodContext.py b/modules/workflows/methods/methodContext/methodContext.py index b82d4356..2f12f707 100644 --- a/modules/workflows/methods/methodContext/methodContext.py +++ b/modules/workflows/methods/methodContext/methodContext.py @@ -157,10 +157,10 @@ class MethodContext(MethodBase): name="dataSource", type="Any", frontendType=FrontendType.CONTEXT_BUILDER, - required=False, + required=True, description=( "Datenquelle (DataRef), meist Schleife → Alle Schleifen-Ergebnisse. " - "Optional wenn der Knoten per Kabel am Schleifen-„Fertig“-Ausgang hängt." + "Pflichtfeld — die Implementierung wirft einen Fehler wenn kein Wert übergeben wird." ), ), }, diff --git a/tests/unit/workflow/test_merge_context_handover.py b/tests/unit/workflow/test_merge_context_handover.py index cd2bdfc3..30a60b8f 100644 --- a/tests/unit/workflow/test_merge_context_handover.py +++ b/tests/unit/workflow/test_merge_context_handover.py @@ -103,7 +103,12 @@ async def test_mergeContext_merged_response_wins_over_handover_chunks(): @pytest.mark.asyncio async def test_mergeContext_concatenates_each_iteration_data_response_not_only_last(): - """deep_merge overwrites ``response``; synthesis must still include every loop body result.""" + """Synthesized response must include every loop body chunk, not just the last one. + + ``response`` lives only at the top level of the data envelope (``data["response"]``). + The deep-merged ``data["merged"]`` dict retains whatever the natural merge produced + for per-item fields — it is NOT overwritten with the synthesized primary text. + """ items = [ {"success": True, "data": {"response": "chunk-aaa"}}, {"success": True, "data": {"response": "chunk-bbb"}}, @@ -116,7 +121,9 @@ async def test_mergeContext_concatenates_each_iteration_data_response_not_only_l assert "chunk-bbb" in r assert "chunk-ccc" in r assert r == "chunk-aaa\n\nchunk-bbb\n\nchunk-ccc" - assert result.data["merged"]["response"] == r + # ``merged["response"]`` reflects the natural deep-merge result (last chunk wins), + # NOT the synthesized primary. The canonical synthesized text is at data["response"]. + assert result.data["merged"].get("response") != r or len(items) == 1 @pytest.mark.asyncio diff --git a/tests/unit/workflow/test_node_combinations.py b/tests/unit/workflow/test_node_combinations.py new file mode 100644 index 00000000..7c419f6a --- /dev/null +++ b/tests/unit/workflow/test_node_combinations.py @@ -0,0 +1,373 @@ +# Tests: node handover compatibility across all major node combinations. +# +# Covers: +# - extractContent → file.create (direct, no loop) +# - loop.bodyResults → mergeContext → file.create +# - ai.prompt → transformContext → file.create +# - flow.merge with mixed upstream schemas (AiResult + ActionResult) +# - flow.ifElse Transit output accepted by downstream nodes +# - extractContent fan-in → mergeContext (multiple items, no loop) +# - data.aggregate → data.consolidate path +# - Node flags for executor behaviour (no hardcoded type strings) + +import json + +import pytest + +from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES +from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG +from modules.workflows.methods.methodContext.actions.extractContent import ( + PRESENTATION_KIND, + build_presentation_envelope_from_plain_text, + normalize_presentation_envelopes, +) +from modules.workflows.methods.methodContext.actions.mergeContext import mergeContext + +_NODE_BY_ID = {n["id"]: n for n in STATIC_NODE_TYPES} + + +# --------------------------------------------------------------------------- +# Helper builders +# --------------------------------------------------------------------------- + +def _extract_output(text: str) -> dict: + """Minimal extractContent-style output (presentation envelope in ``data``).""" + pres = build_presentation_envelope_from_plain_text(text, source_name="test") + return {"success": True, "response": "", "data": pres, "documents": []} + + +def _ai_output(response: str) -> dict: + """Minimal ai.prompt-style output.""" + return {"success": True, "response": response, "data": {}, "documents": []} + + +# --------------------------------------------------------------------------- +# 1. extractContent → file.create (direct path) +# --------------------------------------------------------------------------- + +def test_extract_to_file_create_recommended_ref_is_data(): + """materializeRecommendedDataPickRef must resolve extractContent port 0 to path ['data'].""" + from modules.workflows.automation2.pickNotPushMigration import materializeRecommendedDataPickRef + + graph = { + "nodes": [ + {"id": "ex1", "type": "context.extractContent", "parameters": {}}, + { + "id": "fc1", + "type": "file.create", + "parameters": {"context": "", "outputFormat": "docx"}, + }, + ], + "connections": [{"source": "ex1", "target": "fc1", "sourceOutput": 0, "targetInput": 0}], + } + migrated = materializeRecommendedDataPickRef(graph) + fc = next(n for n in migrated["nodes"] if n["id"] == "fc1") + ctx_ref = fc["parameters"].get("context") + # file.create.context has frontendType="contextBuilder" → materialized as a list + assert isinstance(ctx_ref, list), "context should be materialized as a contextBuilder list" + assert len(ctx_ref) == 1 + assert ctx_ref[0]["nodeId"] == "ex1" + assert ctx_ref[0]["path"] == ["data"] + + +def test_extract_output_is_accepted_as_file_create_context(): + """extractContent presentation envelope must be normalizable for file.create.""" + out = _extract_output("Hello world") + envelopes = normalize_presentation_envelopes(out["data"]) + assert len(envelopes) == 1 + assert envelopes[0].get("kind") == PRESENTATION_KIND + + +def test_extract_output_response_is_empty(): + """extractContent must leave ``response`` empty — canonical text is in ``data``.""" + out = _extract_output("Some extracted content") + assert out["response"] == "" + + +# --------------------------------------------------------------------------- +# 2. primaryTextRef: extractContent overrides path to ["data"] +# --------------------------------------------------------------------------- + +def test_extract_primary_text_ref_override_materializes_to_data(): + """When ai.prompt connects to extractContent, primaryTextRef must resolve to ['data'].""" + from modules.workflows.automation2.pickNotPushMigration import materializePrimaryTextHandover + + graph = { + "nodes": [ + {"id": "ex1", "type": "context.extractContent", "parameters": {}}, + { + "id": "ai1", + "type": "ai.prompt", + "parameters": {"context": "", "aiPrompt": "Summarize"}, + }, + ], + "connections": [{"source": "ex1", "target": "ai1", "sourceOutput": 0, "targetInput": 0}], + } + migrated = materializePrimaryTextHandover(graph) + ai = next(n for n in migrated["nodes"] if n["id"] == "ai1") + ctx_ref = ai["parameters"].get("context") + # ai.prompt.context is primaryTextRef → single DataRef dict (not wrapped in list) + assert isinstance(ctx_ref, dict), f"Expected a DataRef dict, got {ctx_ref!r}" + assert ctx_ref["nodeId"] == "ex1" + assert ctx_ref["path"] == ["data"], ( + "extractContent.response is empty; primaryTextRef must point to ['data']" + ) + + +# --------------------------------------------------------------------------- +# 3. loop.bodyResults → mergeContext → file.create +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_loop_body_results_into_merge_context_produces_file_create_compatible_envelope(): + """bodyResults from a loop (list of extractContent outputs) must merge correctly.""" + body_results = [ + _extract_output("Page 1 text"), + _extract_output("Page 2 text"), + ] + result = await mergeContext(object(), {"dataSource": body_results}) + assert result.success + data = result.data + assert data.get("kind") == "context.mergeContext.v1" + assert "response" in data + assert data["response"] + # Downstream file.create uses normalize_presentation_envelopes on the full payload + envelopes = normalize_presentation_envelopes(data) + assert len(envelopes) >= 1 + + +@pytest.mark.asyncio +async def test_merge_context_response_not_duplicated_in_merged_blob(): + """``response`` must live only at the top-level of ``data``, not inside ``data.merged``.""" + body_results = [_extract_output("Item A"), _extract_output("Item B")] + result = await mergeContext(object(), {"dataSource": body_results}) + assert result.success + merged_blob = result.data.get("merged", {}) + # The natural deep-merge may include response from individual items — but + # _synthesize_primary_response no longer OVERWRITES merged["response"]. + # Verify canonical response is the synthesized one at top-level. + assert result.data.get("response") + assert "Item A" in result.data["response"] or "Item B" in result.data["response"] + + +@pytest.mark.asyncio +async def test_merge_context_skips_failed_items(): + """Items with ``success=False`` must be excluded from the deep-merge. + + Note: ``count`` reflects total inputs (including failed ones since they were + received); only the deep-merge payload excludes failed items. + """ + good = _extract_output("Good text") + bad = {"success": False, "error": "something failed", "data": {}, "documents": []} + result = await mergeContext(object(), {"dataSource": [good, bad]}) + assert result.success + # response is synthesized only from good items + assert "Good text" in result.data.get("response", "") + # merged blob should not contain the error or failed item's data + merged = result.data.get("merged", {}) + assert merged.get("error") != "something failed" + + +@pytest.mark.asyncio +async def test_merge_context_items_without_success_key_are_included(): + """Items without a ``success`` key (e.g. DocumentList output) must not be dropped.""" + no_success = {"documents": [{"documentName": "a.pdf"}], "count": 1} + result = await mergeContext(object(), {"dataSource": [no_success]}) + assert result.success + assert result.data.get("count") == 1 + + +# --------------------------------------------------------------------------- +# 4. ai.prompt → transformContext (primaryTextRef) +# --------------------------------------------------------------------------- + +def test_ai_prompt_primary_text_ref_materializes_to_response(): + """primaryTextRef from ai.prompt output must resolve to ['response'].""" + from modules.workflows.automation2.pickNotPushMigration import materializePrimaryTextHandover + + graph = { + "nodes": [ + {"id": "ai1", "type": "ai.prompt", "parameters": {}}, + { + "id": "ai2", + "type": "ai.prompt", + "parameters": {"context": "", "aiPrompt": "Continue"}, + }, + ], + "connections": [{"source": "ai1", "target": "ai2", "sourceOutput": 0, "targetInput": 0}], + } + migrated = materializePrimaryTextHandover(graph) + ai2 = next(n for n in migrated["nodes"] if n["id"] == "ai2") + ctx_ref = ai2["parameters"].get("context") + assert isinstance(ctx_ref, dict), f"Expected DataRef dict, got {ctx_ref!r}" + assert ctx_ref["path"] == ["response"] + + +def test_transform_context_from_graph_result_schema_is_action_result(): + """context.transformContext must declare ``fromGraphResultSchema: ActionResult``.""" + node = _NODE_BY_ID["context.transformContext"] + port = node["outputPorts"][0] + assert port.get("fromGraphResultSchema") == "ActionResult", ( + "fromGraph port on transformContext must be normalized as ActionResult, not FormPayload" + ) + + +# --------------------------------------------------------------------------- +# 5. flow.merge with mixed upstream schemas +# --------------------------------------------------------------------------- + +def test_flow_merge_accepts_ai_result_and_action_result(): + """Both AiResult and ActionResult must be in flow.merge input accepts.""" + node = _NODE_BY_ID["flow.merge"] + all_accepts = set() + for port in node.get("inputPorts", {}).values(): + all_accepts.update(port.get("accepts", [])) + assert "AiResult" in all_accepts + assert "ActionResult" in all_accepts + assert "Transit" in all_accepts + + +def test_flow_merge_input_count_parameter_exists_with_correct_range(): + """inputCount parameter must allow 2–5 inputs.""" + node = _NODE_BY_ID["flow.merge"] + ic_param = next((p for p in node["parameters"] if p["name"] == "inputCount"), None) + assert ic_param is not None + opts = ic_param.get("frontendOptions", {}) + assert opts.get("min") == 2 + assert opts.get("max") == 5 + + +# --------------------------------------------------------------------------- +# 6. flow.ifElse Transit output accepted downstream +# --------------------------------------------------------------------------- + +def test_flow_if_else_output_is_transit(): + """flow.ifElse must output Transit on both branches.""" + node = _NODE_BY_ID["flow.ifElse"] + for port_ix in (0, 1): + schema = node["outputPorts"][port_ix].get("schema") + assert schema == "Transit", f"ifElse port {port_ix} must be Transit, got {schema!r}" + + +def test_transit_accepted_by_all_major_downstream_nodes(): + """All major action nodes must accept Transit input on port 0.""" + expected_transit_accepting = [ + "context.extractContent", + "context.mergeContext", + "context.transformContext", + "ai.prompt", + "file.create", + ] + for node_id in expected_transit_accepting: + node = _NODE_BY_ID[node_id] + accepts = node["inputPorts"][0].get("accepts", []) + assert "Transit" in accepts, f"{node_id} port 0 must accept Transit" + + +# --------------------------------------------------------------------------- +# 7. extractContent fan-in → mergeContext (multiple items, no loop) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_multiple_extract_outputs_fan_in_to_merge_context(): + """Multiple extractContent outputs passed as a list must merge into one envelope.""" + items = [_extract_output(f"Document {i}") for i in range(3)] + result = await mergeContext(object(), {"dataSource": items}) + assert result.success + assert result.data.get("count") == 3 + assert result.data.get("kind") == "context.mergeContext.v1" + response = result.data.get("response", "") + for i in range(3): + assert f"Document {i}" in response + + +# --------------------------------------------------------------------------- +# 8. data.aggregate → data.consolidate schema compatibility +# --------------------------------------------------------------------------- + +def test_data_aggregate_output_accepted_by_consolidate(): + """data.consolidate must accept AggregateResult from data.aggregate.""" + agg_node = _NODE_BY_ID["data.aggregate"] + con_node = _NODE_BY_ID["data.consolidate"] + agg_schema = agg_node["outputPorts"][0].get("schema") + con_accepts = con_node["inputPorts"][0].get("accepts", []) + assert agg_schema in con_accepts, ( + f"data.consolidate port 0 must accept {agg_schema!r} output from data.aggregate" + ) + + +# --------------------------------------------------------------------------- +# 9. Node executor flags (no hardcoded type strings in executor) +# --------------------------------------------------------------------------- + +def test_extract_content_executor_flags(): + """context.extractContent must carry all executor-behaviour flags.""" + node = _NODE_BY_ID["context.extractContent"] + assert node.get("skipUnifiedPresentation") is True + assert node.get("clearResponse") is True + assert node.get("imageDocumentsFromExtractData") is True + assert node.get("popDocumentsFromOutput") is True + + +def test_extract_content_primary_text_ref_path_override(): + """context.extractContent output port 0 must declare primaryTextRefPath=['data'].""" + node = _NODE_BY_ID["context.extractContent"] + port = node["outputPorts"][0] + assert port.get("primaryTextRefPath") == ["data"] + + +def test_merge_context_image_documents_flag(): + """context.mergeContext must carry imageDocumentsFromMerged flag.""" + node = _NODE_BY_ID["context.mergeContext"] + assert node.get("imageDocumentsFromMerged") is True + + +def test_file_create_log_context_resolution_flag(): + """file.create must carry logContextResolution flag.""" + node = _NODE_BY_ID["file.create"] + assert node.get("logContextResolution") is True + + +# --------------------------------------------------------------------------- +# 10. AiResult catalog must include data field +# --------------------------------------------------------------------------- + +def test_ai_result_catalog_has_data_field(): + """AiResult in PORT_TYPE_CATALOG must document the ``data`` field.""" + schema = PORT_TYPE_CATALOG["AiResult"] + field_names = [f.name for f in schema.fields] + assert "data" in field_names, "AiResult must document the data field set by executor" + + +# --------------------------------------------------------------------------- +# 11. _outputSchemaForNode returns ActionResult for context.transformContext +# --------------------------------------------------------------------------- + +def test_output_schema_for_transform_context_is_action_result(): + """_outputSchemaForNode must return ActionResult for context.transformContext.""" + from modules.workflows.automation2.executionEngine import _outputSchemaForNode + schema = _outputSchemaForNode("context.transformContext") + assert schema == "ActionResult", ( + f"Expected ActionResult, got {schema!r}. fromGraph port must use fromGraphResultSchema." + ) + + +# --------------------------------------------------------------------------- +# 12. flow.merge barrier, context.mergeContext NOT a barrier +# --------------------------------------------------------------------------- + +def test_flow_merge_is_barrier(): + from modules.workflows.automation2.executionEngine import _isBarrierNode + assert _isBarrierNode("flow.merge") is True + + +def test_context_merge_context_is_not_barrier(): + """context.mergeContext is not a barrier — it receives data via dataSource DataRef.""" + from modules.workflows.automation2.executionEngine import _isBarrierNode + assert _isBarrierNode("context.mergeContext") is False + + +def test_no_node_named_is_merge_node_in_engine(): + """Legacy _isMergeNode alias must be removed from executionEngine.""" + import modules.workflows.automation2.executionEngine as eng + assert not hasattr(eng, "_isMergeNode"), "_isMergeNode legacy alias must be deleted"