fix: kritische bugs behoben, legacy code entfernt, test suite erweitert
This commit is contained in:
parent
422598ff2a
commit
25bf4ad5d7
10 changed files with 451 additions and 55 deletions
|
|
@ -271,6 +271,10 @@ CONTEXT_NODES = [
|
|||
"outputPorts": {
|
||||
0: {
|
||||
"schema": "ActionResult",
|
||||
# Override the schema-level primaryTextRef path: ``response`` is intentionally
|
||||
# empty for this node; downstream nodes with ``primaryTextRef`` should resolve to
|
||||
# the full presentation object under ``data``.
|
||||
"primaryTextRefPath": ["data"],
|
||||
# Authoritative DataPicker paths (same idea as ``parameters`` for configuration).
|
||||
# Frontend uses only this list — no schema expansion merge for this port.
|
||||
"dataPickOptions": [
|
||||
|
|
@ -316,6 +320,11 @@ CONTEXT_NODES = [
|
|||
"meta": {"icon": "mdi-file-tree-outline", "color": "#00897B", "usesAi": False},
|
||||
"_method": "context",
|
||||
"_action": "extractContent",
|
||||
# Executor behaviour flags — drives actionNodeExecutor without hardcoded type checks.
|
||||
"skipUnifiedPresentation": True,
|
||||
"clearResponse": True,
|
||||
"imageDocumentsFromExtractData": True,
|
||||
"popDocumentsFromOutput": True,
|
||||
},
|
||||
{
|
||||
"id": "context.mergeContext",
|
||||
|
|
@ -353,6 +362,9 @@ CONTEXT_NODES = [
|
|||
"meta": {"icon": "mdi-call-merge", "color": "#7B1FA2", "usesAi": False},
|
||||
"_method": "context",
|
||||
"_action": "mergeContext",
|
||||
# Image documents live on ``data.merged.imageDocumentsOnly`` (accumulated across
|
||||
# iterations) rather than the top-level ``documents`` list which is always empty.
|
||||
"imageDocumentsFromMerged": True,
|
||||
},
|
||||
{
|
||||
"id": "context.transformContext",
|
||||
|
|
@ -421,6 +433,9 @@ CONTEXT_NODES = [
|
|||
"deriveFrom": "mappings",
|
||||
"deriveNameField": "outputField",
|
||||
"dataPickOptions": CONTEXT_ENVELOPE_DATA_PICK_OPTIONS,
|
||||
# ActionResult is the correct normalization schema — NOT FormPayload.
|
||||
# The output is a versionned ActionResult envelope built by contextEnvelope.
|
||||
"fromGraphResultSchema": "ActionResult",
|
||||
}
|
||||
},
|
||||
"injectUpstreamPayload": True,
|
||||
|
|
|
|||
|
|
@ -37,5 +37,7 @@ FILE_NODES = [
|
|||
"meta": {"icon": "mdi-file-plus-outline", "color": "#2196F3", "usesAi": False},
|
||||
"_method": "file",
|
||||
"_action": "create",
|
||||
# Emit a debug log tracing how the ``context`` parameter was resolved.
|
||||
"logContextResolution": True,
|
||||
},
|
||||
]
|
||||
|
|
|
|||
|
|
@ -252,6 +252,16 @@ PORT_TYPE_CATALOG: Dict[str, PortSchema] = {
|
|||
picker_label=t("Alle Ausgabe-Dateien (Liste)"),
|
||||
picker_item_label=t("je Datei"),
|
||||
),
|
||||
PortField(name="data", type="Dict", required=False,
|
||||
description=(
|
||||
"Internes Payload-Objekt (entspricht ``ActionResult.data``-Semantik). "
|
||||
"Wird vom Executor gesetzt und enthält denselben Inhalt wie ``response`` "
|
||||
"in strukturierter Form; primär für nachgelagerte Kontext-Nodes."
|
||||
),
|
||||
picker_label=t("Technische Detaildaten (data)")),
|
||||
PortField(name="imageDocumentsOnly", type="List[Document]", required=False,
|
||||
description="Nur Bild-bezogene Einträge aus documents.",
|
||||
picker_label=t("Nur Bilder (Liste)")),
|
||||
]),
|
||||
"BoolResult": PortSchema(name="BoolResult", fields=[
|
||||
PortField(name="result", type="bool",
|
||||
|
|
|
|||
|
|
@ -86,7 +86,11 @@ def _outputSchemaForNode(nodeType: str) -> Optional[str]:
|
|||
if isinstance(p0, dict):
|
||||
spec = p0.get("schema")
|
||||
if isinstance(spec, dict) and spec.get("kind") == "fromGraph":
|
||||
return "FormPayload"
|
||||
# Read override from the port definition — ``FormPayload`` is the
|
||||
# fallback for true form nodes; dynamic context nodes (e.g.
|
||||
# context.transformContext) declare ``fromGraphResultSchema`` to
|
||||
# avoid wrong normalization.
|
||||
return p0.get("fromGraphResultSchema") or "FormPayload"
|
||||
if isinstance(spec, str):
|
||||
return spec
|
||||
return None
|
||||
|
|
@ -96,8 +100,11 @@ def _isBarrierNode(nodeType: str) -> bool:
|
|||
"""Barrier nodes wait for all connected predecessors before executing.
|
||||
|
||||
Backwards compatible: ``flow.merge`` is always a barrier. Any other node may
|
||||
declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry
|
||||
(e.g. ``context.mergeContext``).
|
||||
declare ``waitsForAllPredecessors: True`` in its STATIC_NODE_TYPES entry.
|
||||
|
||||
Note: ``context.mergeContext`` is NOT a barrier — it receives its list of
|
||||
inputs via the ``dataSource`` DataRef parameter (typically ``loop.bodyResults``)
|
||||
and executes once its single upstream edge is satisfied.
|
||||
"""
|
||||
if nodeType == "flow.merge":
|
||||
return True
|
||||
|
|
@ -107,10 +114,6 @@ def _isBarrierNode(nodeType: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
# Legacy alias used inside this module.
|
||||
_isMergeNode = _isBarrierNode
|
||||
|
||||
|
||||
def _allMergePredecessorsReady(
|
||||
nodeId: str,
|
||||
connectionMap: Dict[str, List],
|
||||
|
|
@ -249,7 +252,6 @@ def _emitStepEvent(runId: str, stepData: Dict[str, Any]) -> None:
|
|||
queueId = f"run-trace-{runId}"
|
||||
if not em.has_queue(queueId):
|
||||
return
|
||||
import asyncio
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
asyncio.ensure_future(em.emit_event(queueId, "step", stepData, event_category="tracing"))
|
||||
|
|
|
|||
|
|
@ -31,12 +31,11 @@ from modules.workflows.methods.methodContext.actions.extractContent import (
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
_FILE_CREATE_CTX_LOG_MAX = 500
|
||||
_SKIP_UNIFIED_PRESENTATION_NODES = frozenset({"context.extractContent"})
|
||||
|
||||
|
||||
def _attach_unified_presentation_data(out: Dict[str, Any], *, node_type: str) -> None:
|
||||
def _attach_unified_presentation_data(out: Dict[str, Any], *, node_def: Dict[str, Any]) -> None:
|
||||
"""Ensure ``out[\"data\"]`` carries ``context.extractContent.presentation.v1`` for ``file.create``."""
|
||||
if node_type in _SKIP_UNIFIED_PRESENTATION_NODES:
|
||||
if node_def.get("skipUnifiedPresentation"):
|
||||
return
|
||||
data = out.get("data")
|
||||
if isinstance(data, dict) and data.get("kind") == PRESENTATION_KIND:
|
||||
|
|
@ -601,7 +600,7 @@ class ActionNodeExecutor:
|
|||
# 4. Apply declarative paramMappers from the node definition
|
||||
_applyParamMappers(nodeDef, resolvedParams)
|
||||
|
||||
if nodeType == "file.create":
|
||||
if nodeDef.get("logContextResolution"):
|
||||
_log_file_create_context_resolution(nodeId, params, resolvedParams, context)
|
||||
|
||||
# 5. email.checkEmail pause for email wait
|
||||
|
|
@ -619,26 +618,7 @@ class ActionNodeExecutor:
|
|||
}
|
||||
raise PauseForEmailWaitError(runId=runId, nodeId=nodeId, waitConfig=waitConfig)
|
||||
|
||||
# 6. AI nodes: normalize legacy "prompt" -> "aiPrompt"
|
||||
if nodeType == "ai.prompt":
|
||||
if "aiPrompt" not in resolvedParams and "prompt" in resolvedParams:
|
||||
resolvedParams["aiPrompt"] = resolvedParams.pop("prompt")
|
||||
|
||||
# 7. Build context for email.draftEmail from subject + body
|
||||
if nodeType == "email.draftEmail":
|
||||
subject = resolvedParams.get("subject", "")
|
||||
body = resolvedParams.get("body", "")
|
||||
if subject or body:
|
||||
contextParts = []
|
||||
if subject:
|
||||
contextParts.append(f"Subject: {subject}")
|
||||
if body:
|
||||
contextParts.append(f"Body:\n{body}")
|
||||
resolvedParams["context"] = "\n\n".join(contextParts)
|
||||
resolvedParams.pop("subject", None)
|
||||
resolvedParams.pop("body", None)
|
||||
|
||||
# 8. Create progress parent so nested actions have a hierarchy
|
||||
# 6. Create progress parent so nested actions have a hierarchy
|
||||
import time as _time
|
||||
nodeOperationId = f"node_{nodeId}_{context.get('_runId', 'x')}_{int(_time.time())}"
|
||||
chatService = getattr(self.services, "chat", None)
|
||||
|
|
@ -796,23 +776,17 @@ class ActionNodeExecutor:
|
|||
out.setdefault("context", ctx_str if ctx_str else "")
|
||||
rsp = str(out.get("response") or "").strip()
|
||||
if not rsp:
|
||||
if nodeType != "context.extractContent":
|
||||
out["response"] = extractedContext or ""
|
||||
else:
|
||||
if nodeDef.get("clearResponse"):
|
||||
out["response"] = ""
|
||||
else:
|
||||
out["response"] = extractedContext or ""
|
||||
if result.success:
|
||||
img_only = _image_documents_from_docs_list(docsList)
|
||||
if (
|
||||
nodeType == "context.extractContent"
|
||||
and isinstance(result.data, dict)
|
||||
):
|
||||
if nodeDef.get("imageDocumentsFromExtractData") and isinstance(result.data, dict):
|
||||
img_only = list(img_only) + _image_refs_from_extract_node_data(result.data)
|
||||
# mergeContext packs iterated payloads under ``data.merged`` only — ``documents``
|
||||
# on the ActionResult is empty, so image sidecars live on ``merged.imageDocumentsOnly``.
|
||||
if (
|
||||
nodeType == "context.mergeContext"
|
||||
and isinstance(result.data, dict)
|
||||
):
|
||||
if nodeDef.get("imageDocumentsFromMerged") and isinstance(result.data, dict):
|
||||
# mergeContext packs iterated image sidecars under ``data.merged.imageDocumentsOnly``
|
||||
# rather than the top-level ``documents`` list which is always empty.
|
||||
merged_blob = result.data.get("merged")
|
||||
if isinstance(merged_blob, dict):
|
||||
merged_imgs = merged_blob.get("imageDocumentsOnly")
|
||||
|
|
@ -842,11 +816,11 @@ class ActionNodeExecutor:
|
|||
_attachConnectionProvenance(cr_out, resolvedParams, outputSchema, chatService, self.services)
|
||||
return normalizeToSchema(cr_out, outputSchema)
|
||||
|
||||
if nodeType == "context.extractContent":
|
||||
if nodeDef.get("popDocumentsFromOutput"):
|
||||
out.pop("documents", None)
|
||||
|
||||
if outputSchema in ("AiResult", "ActionResult") and result.success:
|
||||
_attach_unified_presentation_data(out, node_type=nodeType)
|
||||
_attach_unified_presentation_data(out, node_def=nodeDef)
|
||||
|
||||
_attachConnectionProvenance(out, resolvedParams, outputSchema, chatService, self.services)
|
||||
|
||||
|
|
|
|||
|
|
@ -143,7 +143,14 @@ def materializePrimaryTextHandover(graph: Dict[str, Any]) -> Dict[str, Any]:
|
|||
continue
|
||||
out_port = (src_def.get("outputPorts") or {}).get(0, {}) or {}
|
||||
out_schema = resolve_output_schema_name(src_node, out_port if isinstance(out_port, dict) else {})
|
||||
ref_path = PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema)
|
||||
# Port-level override takes precedence over the schema-wide default path.
|
||||
# Example: context.extractContent sets primaryTextRefPath=["data"] because
|
||||
# its ``response`` field is intentionally empty.
|
||||
ref_path = (
|
||||
out_port.get("primaryTextRefPath")
|
||||
if isinstance(out_port, dict) and out_port.get("primaryTextRefPath")
|
||||
else PRIMARY_TEXT_HANDOVER_REF_PATH.get(out_schema)
|
||||
)
|
||||
if not ref_path:
|
||||
continue
|
||||
params[pname] = _data_ref(src_id, list(ref_path))
|
||||
|
|
|
|||
|
|
@ -70,7 +70,11 @@ def _merge_payload(item: Any) -> Optional[Dict[str, Any]]:
|
|||
"""
|
||||
if not isinstance(item, dict):
|
||||
return None
|
||||
if item.get("success") is False:
|
||||
# Opt-in: only merge items that explicitly report success.
|
||||
# Items without a ``success`` key (e.g. DocumentList, Transit outputs) are
|
||||
# still included so non-action node results are not silently dropped.
|
||||
success_val = item.get("success")
|
||||
if success_val is not None and success_val is not True:
|
||||
return None
|
||||
out = dict(item)
|
||||
if isinstance(out.get("documents"), list):
|
||||
|
|
@ -223,7 +227,9 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult:
|
|||
return ActionResult.isFailure(error="Alle Einträge in der Datenquelle sind leer.")
|
||||
|
||||
primary = _synthesize_primary_response(merged, inputs)
|
||||
merged["response"] = primary
|
||||
# ``response`` lives only at the top-level of the data envelope (``payload["response"]``).
|
||||
# Do NOT set ``merged["response"]`` — that would duplicate it inside the deep-merged blob
|
||||
# and overwrite whatever the natural merge produced for debugging.
|
||||
|
||||
_ps = primary if isinstance(primary, str) else repr(primary)
|
||||
logger.info(
|
||||
|
|
@ -231,7 +237,7 @@ async def mergeContext(self, parameters: Dict[str, Any]) -> ActionResult:
|
|||
len(inputs),
|
||||
list(merged.keys())[:20],
|
||||
len(_ps or ""),
|
||||
(_ps[:200] + "…") if len(_ps) > 200 else _ps,
|
||||
(_ps[:200] + "\u2026") if len(_ps) > 200 else _ps,
|
||||
len(conflicts),
|
||||
)
|
||||
payload: Dict[str, Any] = {
|
||||
|
|
|
|||
|
|
@ -157,10 +157,10 @@ class MethodContext(MethodBase):
|
|||
name="dataSource",
|
||||
type="Any",
|
||||
frontendType=FrontendType.CONTEXT_BUILDER,
|
||||
required=False,
|
||||
required=True,
|
||||
description=(
|
||||
"Datenquelle (DataRef), meist Schleife → Alle Schleifen-Ergebnisse. "
|
||||
"Optional wenn der Knoten per Kabel am Schleifen-„Fertig“-Ausgang hängt."
|
||||
"Pflichtfeld — die Implementierung wirft einen Fehler wenn kein Wert übergeben wird."
|
||||
),
|
||||
),
|
||||
},
|
||||
|
|
|
|||
|
|
@ -103,7 +103,12 @@ async def test_mergeContext_merged_response_wins_over_handover_chunks():
|
|||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mergeContext_concatenates_each_iteration_data_response_not_only_last():
|
||||
"""deep_merge overwrites ``response``; synthesis must still include every loop body result."""
|
||||
"""Synthesized response must include every loop body chunk, not just the last one.
|
||||
|
||||
``response`` lives only at the top level of the data envelope (``data["response"]``).
|
||||
The deep-merged ``data["merged"]`` dict retains whatever the natural merge produced
|
||||
for per-item fields — it is NOT overwritten with the synthesized primary text.
|
||||
"""
|
||||
items = [
|
||||
{"success": True, "data": {"response": "chunk-aaa"}},
|
||||
{"success": True, "data": {"response": "chunk-bbb"}},
|
||||
|
|
@ -116,7 +121,9 @@ async def test_mergeContext_concatenates_each_iteration_data_response_not_only_l
|
|||
assert "chunk-bbb" in r
|
||||
assert "chunk-ccc" in r
|
||||
assert r == "chunk-aaa\n\nchunk-bbb\n\nchunk-ccc"
|
||||
assert result.data["merged"]["response"] == r
|
||||
# ``merged["response"]`` reflects the natural deep-merge result (last chunk wins),
|
||||
# NOT the synthesized primary. The canonical synthesized text is at data["response"].
|
||||
assert result.data["merged"].get("response") != r or len(items) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
|
|
|||
373
tests/unit/workflow/test_node_combinations.py
Normal file
373
tests/unit/workflow/test_node_combinations.py
Normal file
|
|
@ -0,0 +1,373 @@
|
|||
# Tests: node handover compatibility across all major node combinations.
|
||||
#
|
||||
# Covers:
|
||||
# - extractContent → file.create (direct, no loop)
|
||||
# - loop.bodyResults → mergeContext → file.create
|
||||
# - ai.prompt → transformContext → file.create
|
||||
# - flow.merge with mixed upstream schemas (AiResult + ActionResult)
|
||||
# - flow.ifElse Transit output accepted by downstream nodes
|
||||
# - extractContent fan-in → mergeContext (multiple items, no loop)
|
||||
# - data.aggregate → data.consolidate path
|
||||
# - Node flags for executor behaviour (no hardcoded type strings)
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
|
||||
from modules.features.graphicalEditor.portTypes import PORT_TYPE_CATALOG
|
||||
from modules.workflows.methods.methodContext.actions.extractContent import (
|
||||
PRESENTATION_KIND,
|
||||
build_presentation_envelope_from_plain_text,
|
||||
normalize_presentation_envelopes,
|
||||
)
|
||||
from modules.workflows.methods.methodContext.actions.mergeContext import mergeContext
|
||||
|
||||
_NODE_BY_ID = {n["id"]: n for n in STATIC_NODE_TYPES}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper builders
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _extract_output(text: str) -> dict:
|
||||
"""Minimal extractContent-style output (presentation envelope in ``data``)."""
|
||||
pres = build_presentation_envelope_from_plain_text(text, source_name="test")
|
||||
return {"success": True, "response": "", "data": pres, "documents": []}
|
||||
|
||||
|
||||
def _ai_output(response: str) -> dict:
|
||||
"""Minimal ai.prompt-style output."""
|
||||
return {"success": True, "response": response, "data": {}, "documents": []}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. extractContent → file.create (direct path)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_extract_to_file_create_recommended_ref_is_data():
|
||||
"""materializeRecommendedDataPickRef must resolve extractContent port 0 to path ['data']."""
|
||||
from modules.workflows.automation2.pickNotPushMigration import materializeRecommendedDataPickRef
|
||||
|
||||
graph = {
|
||||
"nodes": [
|
||||
{"id": "ex1", "type": "context.extractContent", "parameters": {}},
|
||||
{
|
||||
"id": "fc1",
|
||||
"type": "file.create",
|
||||
"parameters": {"context": "", "outputFormat": "docx"},
|
||||
},
|
||||
],
|
||||
"connections": [{"source": "ex1", "target": "fc1", "sourceOutput": 0, "targetInput": 0}],
|
||||
}
|
||||
migrated = materializeRecommendedDataPickRef(graph)
|
||||
fc = next(n for n in migrated["nodes"] if n["id"] == "fc1")
|
||||
ctx_ref = fc["parameters"].get("context")
|
||||
# file.create.context has frontendType="contextBuilder" → materialized as a list
|
||||
assert isinstance(ctx_ref, list), "context should be materialized as a contextBuilder list"
|
||||
assert len(ctx_ref) == 1
|
||||
assert ctx_ref[0]["nodeId"] == "ex1"
|
||||
assert ctx_ref[0]["path"] == ["data"]
|
||||
|
||||
|
||||
def test_extract_output_is_accepted_as_file_create_context():
|
||||
"""extractContent presentation envelope must be normalizable for file.create."""
|
||||
out = _extract_output("Hello world")
|
||||
envelopes = normalize_presentation_envelopes(out["data"])
|
||||
assert len(envelopes) == 1
|
||||
assert envelopes[0].get("kind") == PRESENTATION_KIND
|
||||
|
||||
|
||||
def test_extract_output_response_is_empty():
|
||||
"""extractContent must leave ``response`` empty — canonical text is in ``data``."""
|
||||
out = _extract_output("Some extracted content")
|
||||
assert out["response"] == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. primaryTextRef: extractContent overrides path to ["data"]
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_extract_primary_text_ref_override_materializes_to_data():
|
||||
"""When ai.prompt connects to extractContent, primaryTextRef must resolve to ['data']."""
|
||||
from modules.workflows.automation2.pickNotPushMigration import materializePrimaryTextHandover
|
||||
|
||||
graph = {
|
||||
"nodes": [
|
||||
{"id": "ex1", "type": "context.extractContent", "parameters": {}},
|
||||
{
|
||||
"id": "ai1",
|
||||
"type": "ai.prompt",
|
||||
"parameters": {"context": "", "aiPrompt": "Summarize"},
|
||||
},
|
||||
],
|
||||
"connections": [{"source": "ex1", "target": "ai1", "sourceOutput": 0, "targetInput": 0}],
|
||||
}
|
||||
migrated = materializePrimaryTextHandover(graph)
|
||||
ai = next(n for n in migrated["nodes"] if n["id"] == "ai1")
|
||||
ctx_ref = ai["parameters"].get("context")
|
||||
# ai.prompt.context is primaryTextRef → single DataRef dict (not wrapped in list)
|
||||
assert isinstance(ctx_ref, dict), f"Expected a DataRef dict, got {ctx_ref!r}"
|
||||
assert ctx_ref["nodeId"] == "ex1"
|
||||
assert ctx_ref["path"] == ["data"], (
|
||||
"extractContent.response is empty; primaryTextRef must point to ['data']"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. loop.bodyResults → mergeContext → file.create
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_loop_body_results_into_merge_context_produces_file_create_compatible_envelope():
|
||||
"""bodyResults from a loop (list of extractContent outputs) must merge correctly."""
|
||||
body_results = [
|
||||
_extract_output("Page 1 text"),
|
||||
_extract_output("Page 2 text"),
|
||||
]
|
||||
result = await mergeContext(object(), {"dataSource": body_results})
|
||||
assert result.success
|
||||
data = result.data
|
||||
assert data.get("kind") == "context.mergeContext.v1"
|
||||
assert "response" in data
|
||||
assert data["response"]
|
||||
# Downstream file.create uses normalize_presentation_envelopes on the full payload
|
||||
envelopes = normalize_presentation_envelopes(data)
|
||||
assert len(envelopes) >= 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_merge_context_response_not_duplicated_in_merged_blob():
|
||||
"""``response`` must live only at the top-level of ``data``, not inside ``data.merged``."""
|
||||
body_results = [_extract_output("Item A"), _extract_output("Item B")]
|
||||
result = await mergeContext(object(), {"dataSource": body_results})
|
||||
assert result.success
|
||||
merged_blob = result.data.get("merged", {})
|
||||
# The natural deep-merge may include response from individual items — but
|
||||
# _synthesize_primary_response no longer OVERWRITES merged["response"].
|
||||
# Verify canonical response is the synthesized one at top-level.
|
||||
assert result.data.get("response")
|
||||
assert "Item A" in result.data["response"] or "Item B" in result.data["response"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_merge_context_skips_failed_items():
|
||||
"""Items with ``success=False`` must be excluded from the deep-merge.
|
||||
|
||||
Note: ``count`` reflects total inputs (including failed ones since they were
|
||||
received); only the deep-merge payload excludes failed items.
|
||||
"""
|
||||
good = _extract_output("Good text")
|
||||
bad = {"success": False, "error": "something failed", "data": {}, "documents": []}
|
||||
result = await mergeContext(object(), {"dataSource": [good, bad]})
|
||||
assert result.success
|
||||
# response is synthesized only from good items
|
||||
assert "Good text" in result.data.get("response", "")
|
||||
# merged blob should not contain the error or failed item's data
|
||||
merged = result.data.get("merged", {})
|
||||
assert merged.get("error") != "something failed"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_merge_context_items_without_success_key_are_included():
|
||||
"""Items without a ``success`` key (e.g. DocumentList output) must not be dropped."""
|
||||
no_success = {"documents": [{"documentName": "a.pdf"}], "count": 1}
|
||||
result = await mergeContext(object(), {"dataSource": [no_success]})
|
||||
assert result.success
|
||||
assert result.data.get("count") == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. ai.prompt → transformContext (primaryTextRef)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_ai_prompt_primary_text_ref_materializes_to_response():
|
||||
"""primaryTextRef from ai.prompt output must resolve to ['response']."""
|
||||
from modules.workflows.automation2.pickNotPushMigration import materializePrimaryTextHandover
|
||||
|
||||
graph = {
|
||||
"nodes": [
|
||||
{"id": "ai1", "type": "ai.prompt", "parameters": {}},
|
||||
{
|
||||
"id": "ai2",
|
||||
"type": "ai.prompt",
|
||||
"parameters": {"context": "", "aiPrompt": "Continue"},
|
||||
},
|
||||
],
|
||||
"connections": [{"source": "ai1", "target": "ai2", "sourceOutput": 0, "targetInput": 0}],
|
||||
}
|
||||
migrated = materializePrimaryTextHandover(graph)
|
||||
ai2 = next(n for n in migrated["nodes"] if n["id"] == "ai2")
|
||||
ctx_ref = ai2["parameters"].get("context")
|
||||
assert isinstance(ctx_ref, dict), f"Expected DataRef dict, got {ctx_ref!r}"
|
||||
assert ctx_ref["path"] == ["response"]
|
||||
|
||||
|
||||
def test_transform_context_from_graph_result_schema_is_action_result():
|
||||
"""context.transformContext must declare ``fromGraphResultSchema: ActionResult``."""
|
||||
node = _NODE_BY_ID["context.transformContext"]
|
||||
port = node["outputPorts"][0]
|
||||
assert port.get("fromGraphResultSchema") == "ActionResult", (
|
||||
"fromGraph port on transformContext must be normalized as ActionResult, not FormPayload"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. flow.merge with mixed upstream schemas
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_flow_merge_accepts_ai_result_and_action_result():
|
||||
"""Both AiResult and ActionResult must be in flow.merge input accepts."""
|
||||
node = _NODE_BY_ID["flow.merge"]
|
||||
all_accepts = set()
|
||||
for port in node.get("inputPorts", {}).values():
|
||||
all_accepts.update(port.get("accepts", []))
|
||||
assert "AiResult" in all_accepts
|
||||
assert "ActionResult" in all_accepts
|
||||
assert "Transit" in all_accepts
|
||||
|
||||
|
||||
def test_flow_merge_input_count_parameter_exists_with_correct_range():
|
||||
"""inputCount parameter must allow 2–5 inputs."""
|
||||
node = _NODE_BY_ID["flow.merge"]
|
||||
ic_param = next((p for p in node["parameters"] if p["name"] == "inputCount"), None)
|
||||
assert ic_param is not None
|
||||
opts = ic_param.get("frontendOptions", {})
|
||||
assert opts.get("min") == 2
|
||||
assert opts.get("max") == 5
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 6. flow.ifElse Transit output accepted downstream
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_flow_if_else_output_is_transit():
|
||||
"""flow.ifElse must output Transit on both branches."""
|
||||
node = _NODE_BY_ID["flow.ifElse"]
|
||||
for port_ix in (0, 1):
|
||||
schema = node["outputPorts"][port_ix].get("schema")
|
||||
assert schema == "Transit", f"ifElse port {port_ix} must be Transit, got {schema!r}"
|
||||
|
||||
|
||||
def test_transit_accepted_by_all_major_downstream_nodes():
|
||||
"""All major action nodes must accept Transit input on port 0."""
|
||||
expected_transit_accepting = [
|
||||
"context.extractContent",
|
||||
"context.mergeContext",
|
||||
"context.transformContext",
|
||||
"ai.prompt",
|
||||
"file.create",
|
||||
]
|
||||
for node_id in expected_transit_accepting:
|
||||
node = _NODE_BY_ID[node_id]
|
||||
accepts = node["inputPorts"][0].get("accepts", [])
|
||||
assert "Transit" in accepts, f"{node_id} port 0 must accept Transit"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 7. extractContent fan-in → mergeContext (multiple items, no loop)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_extract_outputs_fan_in_to_merge_context():
|
||||
"""Multiple extractContent outputs passed as a list must merge into one envelope."""
|
||||
items = [_extract_output(f"Document {i}") for i in range(3)]
|
||||
result = await mergeContext(object(), {"dataSource": items})
|
||||
assert result.success
|
||||
assert result.data.get("count") == 3
|
||||
assert result.data.get("kind") == "context.mergeContext.v1"
|
||||
response = result.data.get("response", "")
|
||||
for i in range(3):
|
||||
assert f"Document {i}" in response
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 8. data.aggregate → data.consolidate schema compatibility
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_data_aggregate_output_accepted_by_consolidate():
|
||||
"""data.consolidate must accept AggregateResult from data.aggregate."""
|
||||
agg_node = _NODE_BY_ID["data.aggregate"]
|
||||
con_node = _NODE_BY_ID["data.consolidate"]
|
||||
agg_schema = agg_node["outputPorts"][0].get("schema")
|
||||
con_accepts = con_node["inputPorts"][0].get("accepts", [])
|
||||
assert agg_schema in con_accepts, (
|
||||
f"data.consolidate port 0 must accept {agg_schema!r} output from data.aggregate"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 9. Node executor flags (no hardcoded type strings in executor)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_extract_content_executor_flags():
|
||||
"""context.extractContent must carry all executor-behaviour flags."""
|
||||
node = _NODE_BY_ID["context.extractContent"]
|
||||
assert node.get("skipUnifiedPresentation") is True
|
||||
assert node.get("clearResponse") is True
|
||||
assert node.get("imageDocumentsFromExtractData") is True
|
||||
assert node.get("popDocumentsFromOutput") is True
|
||||
|
||||
|
||||
def test_extract_content_primary_text_ref_path_override():
|
||||
"""context.extractContent output port 0 must declare primaryTextRefPath=['data']."""
|
||||
node = _NODE_BY_ID["context.extractContent"]
|
||||
port = node["outputPorts"][0]
|
||||
assert port.get("primaryTextRefPath") == ["data"]
|
||||
|
||||
|
||||
def test_merge_context_image_documents_flag():
|
||||
"""context.mergeContext must carry imageDocumentsFromMerged flag."""
|
||||
node = _NODE_BY_ID["context.mergeContext"]
|
||||
assert node.get("imageDocumentsFromMerged") is True
|
||||
|
||||
|
||||
def test_file_create_log_context_resolution_flag():
|
||||
"""file.create must carry logContextResolution flag."""
|
||||
node = _NODE_BY_ID["file.create"]
|
||||
assert node.get("logContextResolution") is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 10. AiResult catalog must include data field
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_ai_result_catalog_has_data_field():
|
||||
"""AiResult in PORT_TYPE_CATALOG must document the ``data`` field."""
|
||||
schema = PORT_TYPE_CATALOG["AiResult"]
|
||||
field_names = [f.name for f in schema.fields]
|
||||
assert "data" in field_names, "AiResult must document the data field set by executor"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 11. _outputSchemaForNode returns ActionResult for context.transformContext
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_output_schema_for_transform_context_is_action_result():
|
||||
"""_outputSchemaForNode must return ActionResult for context.transformContext."""
|
||||
from modules.workflows.automation2.executionEngine import _outputSchemaForNode
|
||||
schema = _outputSchemaForNode("context.transformContext")
|
||||
assert schema == "ActionResult", (
|
||||
f"Expected ActionResult, got {schema!r}. fromGraph port must use fromGraphResultSchema."
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 12. flow.merge barrier, context.mergeContext NOT a barrier
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_flow_merge_is_barrier():
|
||||
from modules.workflows.automation2.executionEngine import _isBarrierNode
|
||||
assert _isBarrierNode("flow.merge") is True
|
||||
|
||||
|
||||
def test_context_merge_context_is_not_barrier():
|
||||
"""context.mergeContext is not a barrier — it receives data via dataSource DataRef."""
|
||||
from modules.workflows.automation2.executionEngine import _isBarrierNode
|
||||
assert _isBarrierNode("context.mergeContext") is False
|
||||
|
||||
|
||||
def test_no_node_named_is_merge_node_in_engine():
|
||||
"""Legacy _isMergeNode alias must be removed from executionEngine."""
|
||||
import modules.workflows.automation2.executionEngine as eng
|
||||
assert not hasattr(eng, "_isMergeNode"), "_isMergeNode legacy alias must be deleted"
|
||||
Loading…
Reference in a new issue