fix: kritische bugs behoben, legacy code entfernt, test suite erweitert

This commit is contained in:
Ida 2026-05-14 17:02:55 +02:00
parent 64b58802a4
commit 716837e8fb
11 changed files with 457 additions and 55 deletions

View file

@ -271,6 +271,10 @@ CONTEXT_NODES = [
"outputPorts": {
0: {
"schema": "ActionResult",
# Override the schema-level primaryTextRef path: ``response`` is intentionally
# empty for this node; downstream nodes with ``primaryTextRef`` should resolve to
# the full presentation object under ``data``.
"primaryTextRefPath": ["data"],
# Authoritative DataPicker paths (same idea as ``parameters`` for configuration).
# Frontend uses only this list — no schema expansion merge for this port.
"dataPickOptions": [
@ -316,6 +320,11 @@ CONTEXT_NODES = [
"meta": {"icon": "mdi-file-tree-outline", "color": "#00897B", "usesAi": False},
"_method": "context",
"_action": "extractContent",
# Executor behaviour flags — drives actionNodeExecutor without hardcoded type checks.
"skipUnifiedPresentation": True,
"clearResponse": True,
"imageDocumentsFromExtractData": True,
"popDocumentsFromOutput": True,
},
{
"id": "context.mergeContext",
@ -353,6 +362,9 @@ CONTEXT_NODES = [
"meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False},
"_method": "context",
"_action": "mergeContext",
# Image documents live on ``data.merged.imageDocumentsOnly`` (accumulated across
# iterations) rather than the top-level ``documents`` list which is always empty.
"imageDocumentsFromMerged": True,
},
{
"id": "context.transformContext",
@ -421,6 +433,9 @@ CONTEXT_NODES = [
"deriveFrom": "mappings",
"deriveNameField": "outputField",
"dataPickOptions": CONTEXT_ENVELOPE_DATA_PICK_OPTIONS,
# ActionResult is the correct normalization schema — NOT FormPayload.
# The output is a versionned ActionResult envelope built by contextEnvelope.
"fromGraphResultSchema": "ActionResult",
}
},
"injectUpstreamPayload": True,

View file

@ -37,5 +37,7 @@ FILE_NODES = [
"meta": {"icon": "mdi-file-plus-outline", "color": "#2196F3", "usesAi": False},
"_method": "file",
"_action": "create",
# Emit a debug log tracing how the ``context`` parameter was resolved.
"logContextResolution": True,
},
]

View file

@ -293,6 +293,12 @@ FLOW_NODES = [
"default": 2,
},
],
# ``inputs: 2`` is the static minimum / default topology. ``inputCount`` is a
# frontend hint: the editor adds/removes input ports dynamically when the user
# changes the value. ``FlowExecutor._merge`` collects whatever ports exist in
# ``inputSources`` at runtime, so extra ports (35) work without further changes
# to this definition. ``inputPorts`` below only type-declares the two minimum
# ports; additional ports inherit the same ``_FLOW_INPUT_SCHEMAS`` accepts list.
"inputs": 2,
"outputs": 1,
"inputPorts": {

View file

@ -252,6 +252,16 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = {
picker_label=t("Alle Ausgabe-Dateien (Liste)"),
picker_item_label=t("je Datei"),
),
PortField(name="data", type="Dict", required=False,
description=(
"Internes Payload-Objekt (entspricht ``ActionResult.data``-Semantik). "
"Wird vom Executor gesetzt und enthält denselben Inhalt wie ``response`` "
"in strukturierter Form; primär für nachgelagerte Kontext-Nodes."
),
picker_label=t("Technische Detaildaten (data)")),
PortField(name="imageDocumentsOnly", type="List[Document]", required=False,
description="Nur Bild-bezogene Einträge aus documents.",
picker_label=t("Nur Bilder (Liste)")),
]),
"BoolResult": PortSchema(name="BoolResult", fields=[
PortField(name="result", type="bool",

View file

@ -86,7 +86,11 @@ def _outputSchemaForNode(nodeType: str) -> Optional[str]:
if isinstance(p0, dict):
spec = p0.get("schema")
if isinstance(spec, dict) and spec.get("kind") == "fromGraph":
return "FormPayload"
# Read override from the port definition — ``FormPayload`` is the
# fallback for true form nodes; dynamic context nodes (e.g.
# context.transformContext) declare ``fromGraphResultSchema`` to
# avoid wrong normalization.
return p0.get("fromGraphResultSchema") or "FormPayload"
if isinstance(spec, str):
return spec
return None
@ -96,8 +100,11 @@ def _isBarrierNode(nodeType: str) -> bool:
"""Barrier nodes wait for all connected predecessors before executing.
Backwards compatible: ``flow.merge`` is always a barrier. Any other node may
declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry
(e.g. ``context.mergeContext``).
declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry.
Note: ``context.mergeContext`` is NOT a barrier it receives its list of
inputs via the ``dataSource`` DataRef parameter (typically ``loop.bodyResults``)
and executes once its single upstream edge is satisfied.
"""
if nodeType == "flow.merge":
return True
@ -107,10 +114,6 @@ def _isBarrierNode(nodeType: str) -> bool:
return False
# Legacy alias used inside this module.
_isMergeNode = _isBarrierNode
def _allMergePredecessorsReady(
nodeId: str,
connectionMap: Dict[str, List],
@ -249,7 +252,6 @@ def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None:
queueId = f"run-trace-{runId}"
if not em.has_queue(queueId):
return
import asyncio
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(em.emit_event(queueId, "step", stepData, event_category="tracing"))

View file

@ -31,12 +31,11 @@ from modules.workflows.methods.methodContext.actions.extractContent import (
logger = logging.getLogger(__name__)
_FILE_CREATE_CTX_LOG_MAX = 500
_SKIP_UNIFIED_PRESENTATION_NODES = frozenset({"context.extractContent"})
def _attach_unified_presentation_data(out: Dict[str, Any], *, node_type: str) -> None:
def _attach_unified_presentation_data(out: Dict[str, Any], *, node_def: Dict[str, Any]) -> None:
"""Ensure ``out[\"data\"]`` carries ``context.extractContent.presentation.v1`` for ``file.create``."""
if node_type in _SKIP_UNIFIED_PRESENTATION_NODES:
if node_def.get("skipUnifiedPresentation"):
return
data = out.get("data")
if isinstance(data, dict) and data.get("kind") == PRESENTATION_KIND:
@ -601,7 +600,7 @@ class ActionNodeExecutor:
# 4. Apply declarative paramMappers from the node definition
_applyParamMappers(nodeDef, resolvedParams)
if nodeType == "file.create":
if nodeDef.get("logContextResolution"):
_log_file_create_context_resolution(nodeId, params, resolvedParams, context)
# 5. email.checkEmail pause for email wait
@ -619,26 +618,7 @@ class ActionNodeExecutor:
}
raise PauseForEmailWaitError(runId=runId, nodeId=nodeId, waitConfig=waitConfig)
# 6. AI nodes: normalize legacy "prompt" -> "aiPrompt"
if nodeType == "ai.prompt":
if "aiPrompt" not in resolvedParams and "prompt" in resolvedParams:
resolvedParams["aiPrompt"] = resolvedParams.pop("prompt")
# 7. Build context for email.draftEmail from subject + body
if nodeType == "email.draftEmail":
subject = resolvedParams.get("subject", "")
body = resolvedParams.get("body", "")
if subject or body:
contextParts = []
if subject:
contextParts.append(f"Subject: {subject}")
if body:
contextParts.append(f"Body:\n{body}")
resolvedParams["context"] = "\n\n".join(contextParts)
resolvedParams.pop("subject", None)
resolvedParams.pop("body", None)
# 8. Create progress parent so nested actions have a hierarchy
# 6. Create progress parent so nested actions have a hierarchy
import time as _time
nodeOperationId = f"node_{nodeId}_{context.get('_runId', 'x')}_{int(_time.time())}"
chatService = getattr(self.services, "chat", None)
@ -796,23 +776,17 @@ class ActionNodeExecutor:
out.setdefault("context", ctx_str if ctx_str else "")
rsp = str(out.get("response") or "").strip()
if not rsp:
if nodeType != "context.extractContent":
out["response"] = extractedContext or ""
else:
if nodeDef.get("clearResponse"):
out["response"] = ""
else:
out["response"] = extractedContext or ""
if result.success:
img_only = _image_documents_from_docs_list(docsList)
if (
nodeType == "context.extractContent"
and isinstance(result.data, dict)
):
if nodeDef.get("imageDocumentsFromExtractData") and isinstance(result.data, dict):
img_only = list(img_only) + _image_refs_from_extract_node_data(result.data)
# mergeContext packs iterated payloads under ``data.merged`` only — ``documents``
# on the ActionResult is empty, so image sidecars live on ``merged.imageDocumentsOnly``.
if (
nodeType == "context.mergeContext"
and isinstance(result.data, dict)
):
if nodeDef.get("imageDocumentsFromMerged") and isinstance(result.data, dict):
# mergeContext packs iterated image sidecars under ``data.merged.imageDocumentsOnly``
# rather than the top-level ``documents`` list which is always empty.
merged_blob = result.data.get("merged")
if isinstance(merged_blob, dict):
merged_imgs = merged_blob.get("imageDocumentsOnly")
@ -842,11 +816,11 @@ class ActionNodeExecutor:
_attachConnectionProvenance(cr_out, resolvedParams, outputSchema, chatService, self.services)
return normalizeToSchema(cr_out, outputSchema)
if nodeType == "context.extractContent":
if nodeDef.get("popDocumentsFromOutput"):
out.pop("documents", None)
if outputSchema in ("AiResult", "ActionResult") and result.success:
_attach_unified_presentation_data(out, node_type=nodeType)
_attach_unified_presentation_data(out, node_def=nodeDef)
_attachConnectionProvenance(out, resolvedParams, outputSchema, chatService, self.services)

View file

@ -143,7 +143,14 @@ def materializePrimaryTextHandover(graph: Dict[str, Any]) -> Dict[str, Any]:
continue
out_port = (src_def.get("outputPorts") or {}).get(0, {}) or {}
out_schema = resolve_output_schema_name(src_node, out_port if isinstance(out_port, dict) else {})
ref_path = PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema)
# Port-level override takes precedence over the schema-wide default path.
# Example: context.extractContent sets primaryTextRefPath=["data"] because
# its ``response`` field is intentionally empty.
ref_path = (
out_port.get("primaryTextRefPath")
if isinstance(out_port, dict) and out_port.get("primaryTextRefPath")
else PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema)
)
if not ref_path:
continue
params[pname] = _data_ref(src_id, list(ref_path))

View file

@ -70,7 +70,11 @@ def _merge_payload(item: Any) -> Optional[Dict[str, Any]]:
"""
if not isinstance(item, dict):
return None
if item.get("success") is False:
# Opt-in: only merge items that explicitly report success.
# Items without a ``success`` key (e.g. DocumentList, Transit outputs) are
# still included so non-action node results are not silently dropped.
success_val = item.get("success")
if success_val is not None and success_val is not True:
return None
out = dict(item)
if isinstance(out.get("documents"), list):
@ -223,7 +227,9 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult:
return ActionResult.isFailure(error="Alle Einträge in der Datenquelle sind leer.")
primary = _synthesize_primary_response(merged, inputs)
merged["response"] = primary
# ``response`` lives only at the top-level of the data envelope (``payload["response"]``).
# Do NOT set ``merged["response"]`` — that would duplicate it inside the deep-merged blob
# and overwrite whatever the natural merge produced for debugging.
_ps = primary if isinstance(primary, str) else repr(primary)
logger.info(
@ -231,7 +237,7 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult:
len(inputs),
list(merged.keys())[:20],
len(_ps or ""),
(_ps[:200] + "") if len(_ps) > 200 else _ps,
(_ps[:200] + "\u2026") if len(_ps) > 200 else _ps,
len(conflicts),
)
payload: Dict[str, Any] = {

View file

@ -157,10 +157,10 @@ class MethodContext(MethodBase):
name="dataSource",
type="Any",
frontendType=FrontendType.CONTEXT_BUILDER,
required=False,
required=True,
description=(
"Datenquelle (DataRef), meist Schleife → Alle Schleifen-Ergebnisse. "
"Optional wenn der Knoten per Kabel am Schleifen-„Fertig“-Ausgang hängt."
"Pflichtfeld — die Implementierung wirft einen Fehler wenn kein Wert übergeben wird."
),
),
},

View file

@ -103,7 +103,12 @@ async def test_mergeContext_merged_response_wins_over_handover_chunks():
@pytest.mark.asyncio
async def test_mergeContext_concatenates_each_iteration_data_response_not_only_last():
"""deep_merge overwrites ``response``; synthesis must still include every loop body result."""
"""Synthesized response must include every loop body chunk, not just the last one.
``response`` lives only at the top level of the data envelope (``data["response"]``).
The deep-merged ``data["merged"]`` dict retains whatever the natural merge produced
for per-item fields it is NOT overwritten with the synthesized primary text.
"""
items = [
{"success": True, "data": {"response": "chunk-aaa"}},
{"success": True, "data": {"response": "chunk-bbb"}},
@ -116,7 +121,9 @@ async def test_mergeContext_concatenates_each_iteration_data_response_not_only_l
assert "chunk-bbb" in r
assert "chunk-ccc" in r
assert r == "chunk-aaa\n\nchunk-bbb\n\nchunk-ccc"
assert result.data["merged"]["response"] == r
# ``merged["response"]`` reflects the natural deep-merge result (last chunk wins),
# NOT the synthesized primary. The canonical synthesized text is at data["response"].
assert result.data["merged"].get("response") != r or len(items) == 1
@pytest.mark.asyncio

View file

@ -0,0 +1,373 @@
# Tests: node handover compatibility across all major node combinations.
#
# Covers:
# - extractContent → file.create (direct, no loop)
# - loop.bodyResults → mergeContext → file.create
# - ai.prompt → transformContext → file.create
# - flow.merge with mixed upstream schemas (AiResult + ActionResult)
# - flow.ifElse Transit output accepted by downstream nodes
# - extractContent fan-in → mergeContext (multiple items, no loop)
# - data.aggregate → data.consolidate path
# - Node flags for executor behaviour (no hardcoded type strings)
import json
import pytest
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG
from modules.workflows.methods.methodContext.actions.extractContent import (
PRESENTATION_KIND,
build_presentation_envelope_from_plain_text,
normalize_presentation_envelopes,
)
from modules.workflows.methods.methodContext.actions.mergeContext import mergeContext
_NODE_BY_ID = {n["id"]: n for n in STATIC_NODE_TYPES}
# ---------------------------------------------------------------------------
# Helper builders
# ---------------------------------------------------------------------------
def _extract_output(text: str) -> dict:
"""Minimal extractContent-style output (presentation envelope in ``data``)."""
pres = build_presentation_envelope_from_plain_text(text, source_name="test")
return {"success": True, "response": "", "data": pres, "documents": []}
def _ai_output(response: str) -> dict:
"""Minimal ai.prompt-style output."""
return {"success": True, "response": response, "data": {}, "documents": []}
# ---------------------------------------------------------------------------
# 1. extractContent → file.create (direct path)
# ---------------------------------------------------------------------------
def test_extract_to_file_create_recommended_ref_is_data():
"""materializeRecommendedDataPickRef must resolve extractContent port 0 to path ['data']."""
from modules.workflows.automation2.pickNotPushMigration import materializeRecommendedDataPickRef
graph = {
"nodes": [
{"id": "ex1", "type": "context.extractContent", "parameters": {}},
{
"id": "fc1",
"type": "file.create",
"parameters": {"context": "", "outputFormat": "docx"},
},
],
"connections": [{"source": "ex1", "target": "fc1", "sourceOutput": 0, "targetInput": 0}],
}
migrated = materializeRecommendedDataPickRef(graph)
fc = next(n for n in migrated["nodes"] if n["id"] == "fc1")
ctx_ref = fc["parameters"].get("context")
# file.create.context has frontendType="contextBuilder" → materialized as a list
assert isinstance(ctx_ref, list), "context should be materialized as a contextBuilder list"
assert len(ctx_ref) == 1
assert ctx_ref[0]["nodeId"] == "ex1"
assert ctx_ref[0]["path"] == ["data"]
def test_extract_output_is_accepted_as_file_create_context():
"""extractContent presentation envelope must be normalizable for file.create."""
out = _extract_output("Hello world")
envelopes = normalize_presentation_envelopes(out["data"])
assert len(envelopes) == 1
assert envelopes[0].get("kind") == PRESENTATION_KIND
def test_extract_output_response_is_empty():
"""extractContent must leave ``response`` empty — canonical text is in ``data``."""
out = _extract_output("Some extracted content")
assert out["response"] == ""
# ---------------------------------------------------------------------------
# 2. primaryTextRef: extractContent overrides path to ["data"]
# ---------------------------------------------------------------------------
def test_extract_primary_text_ref_override_materializes_to_data():
"""When ai.prompt connects to extractContent, primaryTextRef must resolve to ['data']."""
from modules.workflows.automation2.pickNotPushMigration import materializePrimaryTextHandover
graph = {
"nodes": [
{"id": "ex1", "type": "context.extractContent", "parameters": {}},
{
"id": "ai1",
"type": "ai.prompt",
"parameters": {"context": "", "aiPrompt": "Summarize"},
},
],
"connections": [{"source": "ex1", "target": "ai1", "sourceOutput": 0, "targetInput": 0}],
}
migrated = materializePrimaryTextHandover(graph)
ai = next(n for n in migrated["nodes"] if n["id"] == "ai1")
ctx_ref = ai["parameters"].get("context")
# ai.prompt.context is primaryTextRef → single DataRef dict (not wrapped in list)
assert isinstance(ctx_ref, dict), f"Expected a DataRef dict, got {ctx_ref!r}"
assert ctx_ref["nodeId"] == "ex1"
assert ctx_ref["path"] == ["data"], (
"extractContent.response is empty; primaryTextRef must point to ['data']"
)
# ---------------------------------------------------------------------------
# 3. loop.bodyResults → mergeContext → file.create
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_loop_body_results_into_merge_context_produces_file_create_compatible_envelope():
"""bodyResults from a loop (list of extractContent outputs) must merge correctly."""
body_results = [
_extract_output("Page 1 text"),
_extract_output("Page 2 text"),
]
result = await mergeContext(object(), {"dataSource": body_results})
assert result.success
data = result.data
assert data.get("kind") == "context.mergeContext.v1"
assert "response" in data
assert data["response"]
# Downstream file.create uses normalize_presentation_envelopes on the full payload
envelopes = normalize_presentation_envelopes(data)
assert len(envelopes) >= 1
@pytest.mark.asyncio
async def test_merge_context_response_not_duplicated_in_merged_blob():
"""``response`` must live only at the top-level of ``data``, not inside ``data.merged``."""
body_results = [_extract_output("Item A"), _extract_output("Item B")]
result = await mergeContext(object(), {"dataSource": body_results})
assert result.success
merged_blob = result.data.get("merged", {})
# The natural deep-merge may include response from individual items — but
# _synthesize_primary_response no longer OVERWRITES merged["response"].
# Verify canonical response is the synthesized one at top-level.
assert result.data.get("response")
assert "Item A" in result.data["response"] or "Item B" in result.data["response"]
@pytest.mark.asyncio
async def test_merge_context_skips_failed_items():
"""Items with ``success=False`` must be excluded from the deep-merge.
Note: ``count`` reflects total inputs (including failed ones since they were
received); only the deep-merge payload excludes failed items.
"""
good = _extract_output("Good text")
bad = {"success": False, "error": "something failed", "data": {}, "documents": []}
result = await mergeContext(object(), {"dataSource": [good, bad]})
assert result.success
# response is synthesized only from good items
assert "Good text" in result.data.get("response", "")
# merged blob should not contain the error or failed item's data
merged = result.data.get("merged", {})
assert merged.get("error") != "something failed"
@pytest.mark.asyncio
async def test_merge_context_items_without_success_key_are_included():
"""Items without a ``success`` key (e.g. DocumentList output) must not be dropped."""
no_success = {"documents": [{"documentName": "a.pdf"}], "count": 1}
result = await mergeContext(object(), {"dataSource": [no_success]})
assert result.success
assert result.data.get("count") == 1
# ---------------------------------------------------------------------------
# 4. ai.prompt → transformContext (primaryTextRef)
# ---------------------------------------------------------------------------
def test_ai_prompt_primary_text_ref_materializes_to_response():
"""primaryTextRef from ai.prompt output must resolve to ['response']."""
from modules.workflows.automation2.pickNotPushMigration import materializePrimaryTextHandover
graph = {
"nodes": [
{"id": "ai1", "type": "ai.prompt", "parameters": {}},
{
"id": "ai2",
"type": "ai.prompt",
"parameters": {"context": "", "aiPrompt": "Continue"},
},
],
"connections": [{"source": "ai1", "target": "ai2", "sourceOutput": 0, "targetInput": 0}],
}
migrated = materializePrimaryTextHandover(graph)
ai2 = next(n for n in migrated["nodes"] if n["id"] == "ai2")
ctx_ref = ai2["parameters"].get("context")
assert isinstance(ctx_ref, dict), f"Expected DataRef dict, got {ctx_ref!r}"
assert ctx_ref["path"] == ["response"]
def test_transform_context_from_graph_result_schema_is_action_result():
"""context.transformContext must declare ``fromGraphResultSchema: ActionResult``."""
node = _NODE_BY_ID["context.transformContext"]
port = node["outputPorts"][0]
assert port.get("fromGraphResultSchema") == "ActionResult", (
"fromGraph port on transformContext must be normalized as ActionResult, not FormPayload"
)
# ---------------------------------------------------------------------------
# 5. flow.merge with mixed upstream schemas
# ---------------------------------------------------------------------------
def test_flow_merge_accepts_ai_result_and_action_result():
"""Both AiResult and ActionResult must be in flow.merge input accepts."""
node = _NODE_BY_ID["flow.merge"]
all_accepts = set()
for port in node.get("inputPorts", {}).values():
all_accepts.update(port.get("accepts", []))
assert "AiResult" in all_accepts
assert "ActionResult" in all_accepts
assert "Transit" in all_accepts
def test_flow_merge_input_count_parameter_exists_with_correct_range():
"""inputCount parameter must allow 25 inputs."""
node = _NODE_BY_ID["flow.merge"]
ic_param = next((p for p in node["parameters"] if p["name"] == "inputCount"), None)
assert ic_param is not None
opts = ic_param.get("frontendOptions", {})
assert opts.get("min") == 2
assert opts.get("max") == 5
# ---------------------------------------------------------------------------
# 6. flow.ifElse Transit output accepted downstream
# ---------------------------------------------------------------------------
def test_flow_if_else_output_is_transit():
"""flow.ifElse must output Transit on both branches."""
node = _NODE_BY_ID["flow.ifElse"]
for port_ix in (0, 1):
schema = node["outputPorts"][port_ix].get("schema")
assert schema == "Transit", f"ifElse port {port_ix} must be Transit, got {schema!r}"
def test_transit_accepted_by_all_major_downstream_nodes():
"""All major action nodes must accept Transit input on port 0."""
expected_transit_accepting = [
"context.extractContent",
"context.mergeContext",
"context.transformContext",
"ai.prompt",
"file.create",
]
for node_id in expected_transit_accepting:
node = _NODE_BY_ID[node_id]
accepts = node["inputPorts"][0].get("accepts", [])
assert "Transit" in accepts, f"{node_id} port 0 must accept Transit"
# ---------------------------------------------------------------------------
# 7. extractContent fan-in → mergeContext (multiple items, no loop)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_multiple_extract_outputs_fan_in_to_merge_context():
"""Multiple extractContent outputs passed as a list must merge into one envelope."""
items = [_extract_output(f"Document {i}") for i in range(3)]
result = await mergeContext(object(), {"dataSource": items})
assert result.success
assert result.data.get("count") == 3
assert result.data.get("kind") == "context.mergeContext.v1"
response = result.data.get("response", "")
for i in range(3):
assert f"Document {i}" in response
# ---------------------------------------------------------------------------
# 8. data.aggregate → data.consolidate schema compatibility
# ---------------------------------------------------------------------------
def test_data_aggregate_output_accepted_by_consolidate():
"""data.consolidate must accept AggregateResult from data.aggregate."""
agg_node = _NODE_BY_ID["data.aggregate"]
con_node = _NODE_BY_ID["data.consolidate"]
agg_schema = agg_node["outputPorts"][0].get("schema")
con_accepts = con_node["inputPorts"][0].get("accepts", [])
assert agg_schema in con_accepts, (
f"data.consolidate port 0 must accept {agg_schema!r} output from data.aggregate"
)
# ---------------------------------------------------------------------------
# 9. Node executor flags (no hardcoded type strings in executor)
# ---------------------------------------------------------------------------
def test_extract_content_executor_flags():
"""context.extractContent must carry all executor-behaviour flags."""
node = _NODE_BY_ID["context.extractContent"]
assert node.get("skipUnifiedPresentation") is True
assert node.get("clearResponse") is True
assert node.get("imageDocumentsFromExtractData") is True
assert node.get("popDocumentsFromOutput") is True
def test_extract_content_primary_text_ref_path_override():
"""context.extractContent output port 0 must declare primaryTextRefPath=['data']."""
node = _NODE_BY_ID["context.extractContent"]
port = node["outputPorts"][0]
assert port.get("primaryTextRefPath") == ["data"]
def test_merge_context_image_documents_flag():
"""context.mergeContext must carry imageDocumentsFromMerged flag."""
node = _NODE_BY_ID["context.mergeContext"]
assert node.get("imageDocumentsFromMerged") is True
def test_file_create_log_context_resolution_flag():
"""file.create must carry logContextResolution flag."""
node = _NODE_BY_ID["file.create"]
assert node.get("logContextResolution") is True
# ---------------------------------------------------------------------------
# 10. AiResult catalog must include data field
# ---------------------------------------------------------------------------
def test_ai_result_catalog_has_data_field():
"""AiResult in PORT_TYPE_CATALOG must document the ``data`` field."""
schema = PORT_TYPE_CATALOG["AiResult"]
field_names = [f.name for f in schema.fields]
assert "data" in field_names, "AiResult must document the data field set by executor"
# ---------------------------------------------------------------------------
# 11. _outputSchemaForNode returns ActionResult for context.transformContext
# ---------------------------------------------------------------------------
def test_output_schema_for_transform_context_is_action_result():
"""_outputSchemaForNode must return ActionResult for context.transformContext."""
from modules.workflows.automation2.executionEngine import _outputSchemaForNode
schema = _outputSchemaForNode("context.transformContext")
assert schema == "ActionResult", (
f"Expected ActionResult, got {schema!r}. fromGraph port must use fromGraphResultSchema."
)
# ---------------------------------------------------------------------------
# 12. flow.merge barrier, context.mergeContext NOT a barrier
# ---------------------------------------------------------------------------
def test_flow_merge_is_barrier():
from modules.workflows.automation2.executionEngine import _isBarrierNode
assert _isBarrierNode("flow.merge") is True
def test_context_merge_context_is_not_barrier():
"""context.mergeContext is not a barrier — it receives data via dataSource DataRef."""
from modules.workflows.automation2.executionEngine import _isBarrierNode
assert _isBarrierNode("context.mergeContext") is False
def test_no_node_named_is_merge_node_in_engine():
"""Legacy _isMergeNode alias must be removed from executionEngine."""
import modules.workflows.automation2.executionEngine as eng
assert not hasattr(eng, "_isMergeNode"), "_isMergeNode legacy alias must be deleted"