# Copyright (c) 2026 Patrick Motsch
# All rights reserved.
"""Unit tests for the pure aggregation behind ``serviceRedmineStats._aggregate``.

Each test pushes the static snapshot fixture through the bucket / KPI /
aging helpers directly, with no I/O and no service / connector / DB.
"""

from __future__ import annotations

import datetime as _dt

from modules.features.redmine.serviceRedmineStats import (
    _aggregate,
    _backlogAging,
    _bucketKey,
    _kpis,
    _relationDistribution,
    _statusByTracker,
    _throughput,
    _topAssignees,
)
from tests.fixtures.loadRedmineSnapshot import loadSnapshot


class TestKpis:
    """Counters returned by ``_kpis`` for the snapshot fixture."""

    def test_kpisCountTotalsCorrectly(self) -> None:
        """With no period given, the headline counters match the fixture."""
        schema, tickets = loadSnapshot()

        result = _kpis(tickets, schema.rootTrackerId, periodFrom=None, periodTo=None)

        assert result.total == 6
        assert result.open == 4
        assert result.closed == 2
        assert result.orphans == 2

    def test_periodFiltersClosedAndCreated(self) -> None:
        """An April 2026 window drives the per-period counters."""
        schema, tickets = loadSnapshot()
        window_start, window_end = _dt.datetime(2026, 4, 1), _dt.datetime(2026, 4, 30)

        result = _kpis(tickets, schema.rootTrackerId, window_start, window_end)

        # Tickets 3001 and 4001 were closed in April.
        assert result.closedInPeriod == 2
        # No ticket was created in April.
        assert result.createdInPeriod == 0


class TestStatusByTracker:
    """Per-tracker status rows produced by ``_statusByTracker``."""

    def test_buildsOneEntryPerTracker(self) -> None:
        """All five tracker names appear, and the Task row adds up to 2."""
        schema, tickets = loadSnapshot()

        rows = _statusByTracker(tickets, schema)

        tracker_names = {row.trackerName for row in rows}
        assert tracker_names == {"Userstory", "Feature", "Acc.Crit", "Bug", "Task"}

        task = next(row for row in rows if row.trackerName == "Task")
        assert task.total == 2
        assert sum(task.countsByStatus.values()) == 2


class TestThroughput:
    """Time bucketing via ``_throughput`` and ``_bucketKey``."""

    def test_bucketByMonthCountsClosed(self) -> None:
        """Monthly bucketing over April 2026 yields a ``2026-04`` bucket."""
        _schema, tickets = loadSnapshot()
        window_start, window_end = _dt.datetime(2026, 4, 1), _dt.datetime(2026, 4, 30)

        buckets = _throughput(tickets, window_start, window_end, "month")

        bucket_keys = [bucket.bucketKey for bucket in buckets]
        assert "2026-04" in bucket_keys

        april_bucket = next(bucket for bucket in buckets if bucket.bucketKey == "2026-04")
        assert april_bucket.closed == 2
        assert april_bucket.created == 0

    def test_bucketByWeekIsoFormat(self) -> None:
        """A weekly key for a 2026 date starts with the ``2026-W`` prefix."""
        moment = _dt.datetime(2026, 4, 15)

        week_key = _bucketKey(moment, "week")

        assert week_key.startswith("2026-W")


class TestTopAssignees:
    """Assignee ranking from ``_topAssignees``."""

    def test_excludesClosedTickets(self) -> None:
        """Every holder of an open ticket shows up in the ranking."""
        _schema, tickets = loadSnapshot()

        ranking = _topAssignees(tickets, limit=10)
        listed = {entry.name for entry in ranking}

        # Open tickets in the fixture: Anna holds 1001, Bruno holds 2001,
        # and 5001 has no assignee.
        for expected in ("Anna Beispiel", "Bruno Test", "(nicht zugewiesen)"):
            assert expected in listed


class TestRelationDistribution:
    """Relation-type counts from ``_relationDistribution``."""

    def test_dedupesByRelationId(self) -> None:
        """Both relation types are reported, each with a positive count."""
        _schema, tickets = loadSnapshot()

        distribution = _relationDistribution(tickets)
        seen_types = {entry.relationType for entry in distribution}

        assert "relates" in seen_types
        assert "blocks" in seen_types
        assert all(entry.count >= 1 for entry in distribution)


class TestBacklogAging:
    """Age buckets from ``_backlogAging``."""

    def test_oldOrphansLandInOlderBuckets(self) -> None:
        """With ``now`` pinned to 2026-05-01, ``gt180`` is not empty."""
        _schema, tickets = loadSnapshot()
        reference_time = _dt.datetime(2026, 5, 1)

        aging = _backlogAging(tickets, now=reference_time)

        oldest = next(bucket for bucket in aging if bucket.bucketKey == "gt180")
        assert oldest.count >= 1


class TestAggregateEndToEnd:
    """Full ``_aggregate`` run over the snapshot fixture."""

    def test_aggregateProducesAllSections(self) -> None:
        """The resulting DTO carries KPIs, tracker rows, throughput and aging."""
        schema, tickets = loadSnapshot()

        report = _aggregate(
            tickets,
            schema=schema,
            rootTrackerId=schema.rootTrackerId,
            dateFrom="2026-04-01",
            dateTo="2026-04-30",
            bucket="month",
            trackerIdsFilter=[],
            categoryIdsFilter=[],
            statusFilter="",
            instanceId="test-instance",
        )

        assert report.instanceId == "test-instance"
        assert report.kpis.total == 6
        assert report.kpis.orphans == 2
        assert len(report.statusByTracker) == 5
        assert any(bucket.bucketKey == "2026-04" for bucket in report.throughput)
        assert report.backlogAging[-1].bucketKey == "gt180"