gateway/tests/unit/rbac/test_sysadmin_migration.py

209 lines
7.5 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Unit tests for the one-shot sysadmin role -> isPlatformAdmin migration.
Covers acceptance criteria from
``wiki/c-work/4-done/2026-04-sysadmin-authority-split.md``:
- AC#4 -> Existing sysadmin role-holders are promoted to ``isPlatformAdmin=True``
and the legacy role is removed (Role + UserMandateRole + AccessRules)
when the gateway boots.
- AC#10 -> The migration is idempotent and removes ALL artefacts (Role,
AccessRules, UserMandateRole) of the legacy ``sysadmin`` role.
Strategy: use an in-memory fake ``DatabaseConnector`` that records calls
and returns deterministic recordsets for ``Role``/``UserMandateRole``/
``UserMandate``/``UserInDB``/``AccessRule`` lookups.
"""
from __future__ import annotations
from typing import Any, Dict, List
from unittest.mock import Mock
from modules.interfaces.interfaceBootstrap import _migrateAndDropSysAdminRole
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
from modules.datamodels.datamodelRbac import AccessRule, Role
from modules.datamodels.datamodelUam import UserInDB
_ROOT_MANDATE_ID = "root-mandate-id"
_SYSADMIN_ROLE_ID = "sysadmin-role-id"
_USER_MANDATE_ID = "user-mandate-id"
_USER_ID = "legacy-user-id"
_UMR_ROW_ID = "umr-row-id"
_ACCESS_RULE_ID = "access-rule-id"
def _buildFakeDb(
*,
sysadminRoles: List[Dict[str, Any]],
umRoleRows: List[Dict[str, Any]],
userMandateRows: List[Dict[str, Any]],
users: List[Dict[str, Any]],
accessRules: List[Dict[str, Any]],
) -> Mock:
"""Build a fake ``DatabaseConnector`` that maps model -> recordset."""
deletes: List[tuple] = []
modifies: List[tuple] = []
def _getRecordset(model, recordFilter=None, **_): # noqa: ANN001
recordFilter = recordFilter or {}
if model is Role:
label = recordFilter.get("roleLabel")
mandateId = recordFilter.get("mandateId")
if label == "sysadmin" and mandateId == _ROOT_MANDATE_ID:
return list(sysadminRoles)
return []
if model is UserMandateRole:
wanted = recordFilter.get("roleId")
return [r for r in umRoleRows if r.get("roleId") == wanted]
if model is UserMandate:
wanted = recordFilter.get("id")
return [r for r in userMandateRows if r.get("id") == wanted]
if model is UserInDB:
wanted = recordFilter.get("id")
return [r for r in users if r.get("id") == wanted]
if model is AccessRule:
wanted = recordFilter.get("roleId")
return [r for r in accessRules if r.get("roleId") == wanted]
return []
def _recordModify(model, recordId, payload): # noqa: ANN001
modifies.append((model, recordId, payload))
# Reflect the change so a subsequent migration call is idempotent.
if model is UserInDB:
for u in users:
if u.get("id") == recordId:
u.update(payload)
return True
def _recordDelete(model, recordId): # noqa: ANN001
deletes.append((model, recordId))
if model is UserMandateRole:
umRoleRows[:] = [r for r in umRoleRows if r.get("id") != recordId]
elif model is AccessRule:
accessRules[:] = [r for r in accessRules if r.get("id") != recordId]
elif model is Role:
sysadminRoles[:] = [r for r in sysadminRoles if r.get("id") != recordId]
return True
db = Mock()
db.getRecordset = Mock(side_effect=_getRecordset)
db.recordModify = Mock(side_effect=_recordModify)
db.recordDelete = Mock(side_effect=_recordDelete)
db._modifies = modifies # exposed for assertions
db._deletes = deletes
return db
def _seed():
return {
"sysadminRoles": [{"id": _SYSADMIN_ROLE_ID, "roleLabel": "sysadmin",
"mandateId": _ROOT_MANDATE_ID}],
"umRoleRows": [{"id": _UMR_ROW_ID, "roleId": _SYSADMIN_ROLE_ID,
"userMandateId": _USER_MANDATE_ID}],
"userMandateRows": [{"id": _USER_MANDATE_ID, "userId": _USER_ID,
"mandateId": _ROOT_MANDATE_ID}],
"users": [{"id": _USER_ID, "username": "legacy",
"isSysAdmin": False, "isPlatformAdmin": False}],
"accessRules": [{"id": _ACCESS_RULE_ID, "roleId": _SYSADMIN_ROLE_ID}],
}
# ---------------------------------------------------------------------------
# AC #4 — promote + drop on first run
# ---------------------------------------------------------------------------
def testMigrationPromotesUserAndDropsArtefacts():
"""AC#4: legacy holder is promoted; Role+AccessRule+UMR are deleted."""
seed = _seed()
db = _buildFakeDb(**seed)
_migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)
# User got isPlatformAdmin=True
assert seed["users"][0]["isPlatformAdmin"] is True
assert any(
m[0] is UserInDB and m[2] == {"isPlatformAdmin": True}
for m in db._modifies
), "Expected UserInDB.isPlatformAdmin promotion call"
# All three artefact tables had their rows deleted.
deletedModels = {m[0] for m in db._deletes}
assert UserMandateRole in deletedModels, "UserMandateRole row not deleted"
assert AccessRule in deletedModels, "AccessRule row not deleted"
assert Role in deletedModels, "Sysadmin Role record not deleted"
# And the seeded lists are empty after the migration.
assert seed["umRoleRows"] == []
assert seed["accessRules"] == []
assert seed["sysadminRoles"] == []
# ---------------------------------------------------------------------------
# AC #10 — idempotent: a second run is a no-op
# ---------------------------------------------------------------------------
def testMigrationIsIdempotent():
"""AC#10: a second invocation finds no sysadmin role and exits silently."""
seed = _seed()
db = _buildFakeDb(**seed)
_migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)
firstModifies = list(db._modifies)
firstDeletes = list(db._deletes)
_migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)
# No additional writes on the second call.
assert db._modifies == firstModifies, (
"Second migration call must not perform additional writes"
)
assert db._deletes == firstDeletes, (
"Second migration call must not perform additional deletes"
)
def testMigrationSkipsAlreadyPromotedUsers():
"""If a user already has ``isPlatformAdmin=True``, no redundant write."""
seed = _seed()
seed["users"][0]["isPlatformAdmin"] = True # already promoted
db = _buildFakeDb(**seed)
_migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)
# No promotion write for an already-promoted user.
promotionWrites = [
m for m in db._modifies
if m[0] is UserInDB and m[2].get("isPlatformAdmin") is True
]
assert promotionWrites == [], (
"Should not re-write isPlatformAdmin if user already has it"
)
# But role + access-rule cleanup still happens.
deletedModels = {m[0] for m in db._deletes}
assert Role in deletedModels
assert AccessRule in deletedModels
assert UserMandateRole in deletedModels
def testMigrationOnEmptyDbIsNoop():
"""No legacy sysadmin role at all -> no calls, no errors."""
db = _buildFakeDb(
sysadminRoles=[],
umRoleRows=[],
userMandateRows=[],
users=[],
accessRules=[],
)
_migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)
assert db._modifies == []
assert db._deletes == []