# Tests for Phase 5: Loop concurrency, StepLog batching, streaming aggregate. import pytest from modules.features.graphicalEditor.nodeDefinitions import STATIC_NODE_TYPES def test_loop_concurrency_param_default_1(): node = next(n for n in STATIC_NODE_TYPES if n["id"] == "flow.loop") concParam = next(p for p in node["parameters"] if p["name"] == "concurrency") assert concParam["default"] == 1 assert concParam["frontendOptions"]["min"] == 1 assert concParam["frontendOptions"]["max"] == 20 def test_executionEngine_has_batch_threshold(): """Verify STEPLOG_BATCH_THRESHOLD and AGGREGATE_FLUSH_THRESHOLD are defined in the loop block.""" import inspect from modules.workflows.automation2.executionEngine import executeGraph source = inspect.getsource(executeGraph) assert "STEPLOG_BATCH_THRESHOLD" in source assert "AGGREGATE_FLUSH_THRESHOLD" in source def test_executionEngine_has_loop_progress_event(): """Verify loop_progress SSE event is emitted for batch-mode loops.""" import inspect from modules.workflows.automation2.executionEngine import executeGraph source = inspect.getsource(executeGraph) assert "loop_progress" in source def test_executionEngine_has_concurrency_semaphore(): """Verify asyncio.Semaphore is used for concurrent loop execution.""" import inspect from modules.workflows.automation2.executionEngine import executeGraph source = inspect.getsource(executeGraph) assert "Semaphore" in source def test_executionEngine_aggregate_temp_chunks(): """Verify streaming aggregate flush uses _aggregateTempChunks.""" import inspect from modules.workflows.automation2.executionEngine import executeGraph source = inspect.getsource(executeGraph) assert "_aggregateTempChunks" in source