# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Unit tests for the one-shot sysadmin role -> isPlatformAdmin migration. Covers acceptance criteria from ``wiki/c-work/4-done/2026-04-sysadmin-authority-split.md``: - AC#4 -> Existing sysadmin role-holders are promoted to ``isPlatformAdmin=True`` and the legacy role is removed (Role + UserMandateRole + AccessRules) when the gateway boots. - AC#10 -> The migration is idempotent and removes ALL artefacts (Role, AccessRules, UserMandateRole) of the legacy ``sysadmin`` role. Strategy: use an in-memory fake ``DatabaseConnector`` that records calls and returns deterministic recordsets for ``Role``/``UserMandateRole``/ ``UserMandate``/``UserInDB``/``AccessRule`` lookups. """ from __future__ import annotations from typing import Any, Dict, List from unittest.mock import Mock from modules.interfaces.interfaceBootstrap import _migrateAndDropSysAdminRole from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole from modules.datamodels.datamodelRbac import AccessRule, Role from modules.datamodels.datamodelUam import UserInDB _ROOT_MANDATE_ID = "root-mandate-id" _SYSADMIN_ROLE_ID = "sysadmin-role-id" _USER_MANDATE_ID = "user-mandate-id" _USER_ID = "legacy-user-id" _UMR_ROW_ID = "umr-row-id" _ACCESS_RULE_ID = "access-rule-id" def _buildFakeDb( *, sysadminRoles: List[Dict[str, Any]], umRoleRows: List[Dict[str, Any]], userMandateRows: List[Dict[str, Any]], users: List[Dict[str, Any]], accessRules: List[Dict[str, Any]], ) -> Mock: """Build a fake ``DatabaseConnector`` that maps model -> recordset.""" deletes: List[tuple] = [] modifies: List[tuple] = [] def _getRecordset(model, recordFilter=None, **_): # noqa: ANN001 recordFilter = recordFilter or {} if model is Role: label = recordFilter.get("roleLabel") mandateId = recordFilter.get("mandateId") if label == "sysadmin" and mandateId == _ROOT_MANDATE_ID: return list(sysadminRoles) return [] if model is UserMandateRole: wanted = recordFilter.get("roleId") return [r for r in umRoleRows if r.get("roleId") == wanted] if model is UserMandate: wanted = recordFilter.get("id") return [r for r in userMandateRows if r.get("id") == wanted] if model is UserInDB: wanted = recordFilter.get("id") return [r for r in users if r.get("id") == wanted] if model is AccessRule: wanted = recordFilter.get("roleId") return [r for r in accessRules if r.get("roleId") == wanted] return [] def _recordModify(model, recordId, payload): # noqa: ANN001 modifies.append((model, recordId, payload)) # Reflect the change so a subsequent migration call is idempotent. if model is UserInDB: for u in users: if u.get("id") == recordId: u.update(payload) return True def _recordDelete(model, recordId): # noqa: ANN001 deletes.append((model, recordId)) if model is UserMandateRole: umRoleRows[:] = [r for r in umRoleRows if r.get("id") != recordId] elif model is AccessRule: accessRules[:] = [r for r in accessRules if r.get("id") != recordId] elif model is Role: sysadminRoles[:] = [r for r in sysadminRoles if r.get("id") != recordId] return True db = Mock() db.getRecordset = Mock(side_effect=_getRecordset) db.recordModify = Mock(side_effect=_recordModify) db.recordDelete = Mock(side_effect=_recordDelete) db._modifies = modifies # exposed for assertions db._deletes = deletes return db def _seed(): return { "sysadminRoles": [{"id": _SYSADMIN_ROLE_ID, "roleLabel": "sysadmin", "mandateId": _ROOT_MANDATE_ID}], "umRoleRows": [{"id": _UMR_ROW_ID, "roleId": _SYSADMIN_ROLE_ID, "userMandateId": _USER_MANDATE_ID}], "userMandateRows": [{"id": _USER_MANDATE_ID, "userId": _USER_ID, "mandateId": _ROOT_MANDATE_ID}], "users": [{"id": _USER_ID, "username": "legacy", "isSysAdmin": False, "isPlatformAdmin": False}], "accessRules": [{"id": _ACCESS_RULE_ID, "roleId": _SYSADMIN_ROLE_ID}], } # --------------------------------------------------------------------------- # AC #4 — promote + drop on first run # --------------------------------------------------------------------------- def testMigrationPromotesUserAndDropsArtefacts(): """AC#4: legacy holder is promoted; Role+AccessRule+UMR are deleted.""" seed = _seed() db = _buildFakeDb(**seed) _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID) # User got isPlatformAdmin=True assert seed["users"][0]["isPlatformAdmin"] is True assert any( m[0] is UserInDB and m[2] == {"isPlatformAdmin": True} for m in db._modifies ), "Expected UserInDB.isPlatformAdmin promotion call" # All three artefact tables had their rows deleted. deletedModels = {m[0] for m in db._deletes} assert UserMandateRole in deletedModels, "UserMandateRole row not deleted" assert AccessRule in deletedModels, "AccessRule row not deleted" assert Role in deletedModels, "Sysadmin Role record not deleted" # And the seeded lists are empty after the migration. assert seed["umRoleRows"] == [] assert seed["accessRules"] == [] assert seed["sysadminRoles"] == [] # --------------------------------------------------------------------------- # AC #10 — idempotent: a second run is a no-op # --------------------------------------------------------------------------- def testMigrationIsIdempotent(): """AC#10: a second invocation finds no sysadmin role and exits silently.""" seed = _seed() db = _buildFakeDb(**seed) _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID) firstModifies = list(db._modifies) firstDeletes = list(db._deletes) _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID) # No additional writes on the second call. assert db._modifies == firstModifies, ( "Second migration call must not perform additional writes" ) assert db._deletes == firstDeletes, ( "Second migration call must not perform additional deletes" ) def testMigrationSkipsAlreadyPromotedUsers(): """If a user already has ``isPlatformAdmin=True``, no redundant write.""" seed = _seed() seed["users"][0]["isPlatformAdmin"] = True # already promoted db = _buildFakeDb(**seed) _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID) # No promotion write for an already-promoted user. promotionWrites = [ m for m in db._modifies if m[0] is UserInDB and m[2].get("isPlatformAdmin") is True ] assert promotionWrites == [], ( "Should not re-write isPlatformAdmin if user already has it" ) # But role + access-rule cleanup still happens. deletedModels = {m[0] for m in db._deletes} assert Role in deletedModels assert AccessRule in deletedModels assert UserMandateRole in deletedModels def testMigrationOnEmptyDbIsNoop(): """No legacy sysadmin role at all -> no calls, no errors.""" db = _buildFakeDb( sysadminRoles=[], umRoleRows=[], userMandateRows=[], users=[], accessRules=[], ) _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID) assert db._modifies == [] assert db._deletes == []