gateway/tests/unit/workflow/test_phase5_highvol.py
2026-04-16 23:13:05 +02:00

45 lines
1.8 KiB
Python

# Tests for Phase 5: Loop concurrency, StepLog batching, streaming aggregate.
import pytest
from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES
def test_loop_concurrency_param_default_1():
node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.loop")
concParam = next(p for p in node["parameters"] if p["name"] == "concurrency")
assert concParam["default"] == 1
assert concParam["frontendOptions"]["min"] == 1
assert concParam["frontendOptions"]["max"] == 20
def test_executionEngine_has_batch_threshold():
"""Verify STEPLOG_BATCH_THRESHOLD and AGGREGATE_FLUSH_THRESHOLD are defined in the loop block."""
import inspect
from modules.workflows.automation2.executionEngine import executeGraph
source = inspect.getsource(executeGraph)
assert "STEPLOG_BATCH_THRESHOLD" in source
assert "AGGREGATE_FLUSH_THRESHOLD" in source
def test_executionEngine_has_loop_progress_event():
"""Verify loop_progress SSE event is emitted for batch-mode loops."""
import inspect
from modules.workflows.automation2.executionEngine import executeGraph
source = inspect.getsource(executeGraph)
assert "loop_progress" in source
def test_executionEngine_has_concurrency_semaphore():
"""Verify asyncio.Semaphore is used for concurrent loop execution."""
import inspect
from modules.workflows.automation2.executionEngine import executeGraph
source = inspect.getsource(executeGraph)
assert "Semaphore" in source
def test_executionEngine_aggregate_temp_chunks():
"""Verify streaming aggregate flush uses _aggregateTempChunks."""
import inspect
from modules.workflows.automation2.executionEngine import executeGraph
source = inspect.getsource(executeGraph)
assert "_aggregateTempChunks" in source