gateway/tests/unit/workflow/test_phase4_workflow_nodes.py
2026-04-16 23:13:05 +02:00

177 lines
8 KiB
Python

# Tests for Phase 4: data.consolidate, ai.consolidate, flow.loop level/concurrency, flow.merge dynamic.
import pytest
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
class TestNodeDefinitions:
def test_data_consolidate_exists(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "data.consolidate")
assert node["meta"]["usesAi"] is False
assert node["outputPorts"][0]["schema"] == "ConsolidateResult"
modes = node["parameters"][0]["frontendOptions"]["options"]
assert "table" in modes
assert "csvJoin" in modes
def test_ai_consolidate_exists(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "ai.consolidate")
assert node["meta"]["usesAi"] is True
assert node["_method"] == "ai"
assert node["_action"] == "consolidate"
assert node["outputPorts"][0]["schema"] == "ConsolidateResult"
def test_flow_loop_has_level_and_concurrency(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.loop")
paramNames = [p["name"] for p in node["parameters"]]
assert "level" in paramNames
assert "concurrency" in paramNames
levelParam = next(p for p in node["parameters"] if p["name"] == "level")
assert "structuralNodes" in levelParam["frontendOptions"]["options"]
assert "contentBlocks" in levelParam["frontendOptions"]["options"]
concParam = next(p for p in node["parameters"] if p["name"] == "concurrency")
assert concParam["default"] == 1
def test_flow_loop_accepts_udm(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.loop")
assert "UdmDocument" in node["inputPorts"][0]["accepts"]
def test_flow_merge_has_inputCount(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.merge")
paramNames = [p["name"] for p in node["parameters"]]
assert "inputCount" in paramNames
icParam = next(p for p in node["parameters"] if p["name"] == "inputCount")
assert icParam["frontendOptions"]["min"] == 2
assert icParam["frontendOptions"]["max"] == 5
def test_data_filter_accepts_udm_types(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "data.filter")
accepts = node["inputPorts"][0]["accepts"]
assert "UdmDocument" in accepts
assert "UdmNodeList" in accepts
def test_data_filter_has_udmContentType_param(self):
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "data.filter")
paramNames = [p["name"] for p in node["parameters"]]
assert "udmContentType" in paramNames
@pytest.mark.asyncio
class TestDataConsolidateExecutor:
async def test_consolidate_table_mode(self):
from modules.workflows.automation2.executors.dataExecutor import DataExecutor
ex = DataExecutor()
node = {"type": "data.consolidate", "id": "dc1", "parameters": {"mode": "table"}}
ctx = {"nodeOutputs": {"src": {"items": [{"a": 1, "b": 2}, {"a": 3, "b": 4}], "count": 2}}, "inputSources": {"dc1": {0: ("src", 0)}}}
result = await ex.execute(node, ctx)
assert result["_success"]
assert result["mode"] == "table"
assert result["count"] == 2
assert len(result["result"]["headers"]) == 2
assert len(result["result"]["rows"]) == 2
async def test_consolidate_concat_mode(self):
from modules.workflows.automation2.executors.dataExecutor import DataExecutor
ex = DataExecutor()
node = {"type": "data.consolidate", "id": "dc1", "parameters": {"mode": "concat", "separator": "; "}}
ctx = {"nodeOutputs": {"src": {"items": ["hello", "world"], "count": 2}}, "inputSources": {"dc1": {0: ("src", 0)}}}
result = await ex.execute(node, ctx)
assert result["_success"]
assert result["result"] == "hello; world"
async def test_consolidate_merge_mode(self):
from modules.workflows.automation2.executors.dataExecutor import DataExecutor
ex = DataExecutor()
node = {"type": "data.consolidate", "id": "dc1", "parameters": {"mode": "merge"}}
ctx = {"nodeOutputs": {"src": {"items": [{"a": 1}, {"b": 2}, {"a": 99}], "count": 3}}, "inputSources": {"dc1": {0: ("src", 0)}}}
result = await ex.execute(node, ctx)
assert result["_success"]
assert result["result"]["a"] == 99
assert result["result"]["b"] == 2
class TestFlowLoopUdmLevel:
"""Unit tests for FlowExecutor._resolveUdmLevel (bypass resolveParameterReferences)."""
def test_resolveUdmLevel_structural_nodes(self):
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
ex = FlowExecutor()
udm = {
"id": "d1", "role": "document",
"children": [
{"id": "p1", "role": "page", "index": 0, "children": [{"id": "c1"}]},
{"id": "p2", "role": "page", "index": 1, "children": [{"id": "c2"}]},
]
}
result = ex._resolveUdmLevel(udm, "structuralNodes")
assert len(result) == 2
assert result[0]["id"] == "p1"
def test_resolveUdmLevel_content_blocks(self):
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
ex = FlowExecutor()
udm = {
"id": "d1", "role": "document",
"children": [
{"id": "p1", "role": "page", "children": [
{"id": "c1", "contentType": "text"},
{"id": "c2", "contentType": "image"},
]},
{"id": "p2", "role": "page", "children": [
{"id": "c3", "contentType": "table"},
]},
]
}
result = ex._resolveUdmLevel(udm, "contentBlocks")
assert len(result) == 3
def test_resolveUdmLevel_documents(self):
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
ex = FlowExecutor()
archive = {
"id": "a1", "role": "archive",
"children": [
{"id": "d1", "role": "document", "children": []},
{"id": "d2", "role": "document", "children": []},
{"id": "x1", "role": "page", "children": []},
]
}
result = ex._resolveUdmLevel(archive, "documents")
assert len(result) == 2
@pytest.mark.asyncio
async def test_loop_auto_dict_with_children(self):
from modules.workflows.automation2.executors.flowExecutor import FlowExecutor
ex = FlowExecutor()
udm = {"id": "d1", "role": "document", "children": [{"id": "p1"}, {"id": "p2"}]}
node = {"type": "flow.loop", "id": "loop1",
"parameters": {"items": "direct", "level": "auto"}}
ctx = {"nodeOutputs": {"loop1": udm, "direct": udm}, "connectionMap": {}, "inputSources": {"loop1": {0: ("direct", 0)}}}
from unittest.mock import patch
with patch("modules.workflows.automation2.graphUtils.resolveParameterReferences", return_value=udm):
result = await ex.execute(node, ctx)
assert result["count"] == 2
@pytest.mark.asyncio
class TestDataFilterUdm:
async def test_filter_by_udm_content_type(self):
from modules.workflows.automation2.executors.dataExecutor import DataExecutor
ex = DataExecutor()
udmData = {
"id": "d1", "role": "document",
"children": [
{"id": "p1", "role": "page", "children": [
{"id": "c1", "contentType": "text", "raw": "hello"},
{"id": "c2", "contentType": "image", "raw": "base64..."},
]},
]
}
node = {"type": "data.filter", "id": "f1",
"parameters": {"condition": "", "udmContentType": "image"}}
ctx = {"nodeOutputs": {"src": udmData}, "inputSources": {"f1": {0: ("src", 0)}}}
result = await ex.execute(node, ctx)
inner = result.get("data") if isinstance(result, dict) and result.get("_transit") else result
assert inner is not None