# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Unit tests for the one-shot sysadmin role -> isPlatformAdmin migration.

Covers acceptance criteria from
``wiki/c-work/4-done/2026-04-sysadmin-authority-split.md``:

- AC#4 -> Existing sysadmin role-holders are promoted to ``isPlatformAdmin=True``
  and the legacy role is removed (Role + UserMandateRole + AccessRules)
  when the gateway boots.
- AC#10 -> The migration is idempotent and removes ALL artefacts (Role,
  AccessRules, UserMandateRole) of the legacy ``sysadmin`` role.

Strategy: use an in-memory fake ``DatabaseConnector`` that records calls
and returns deterministic recordsets for ``Role``/``UserMandateRole``/
``UserMandate``/``UserInDB``/``AccessRule`` lookups.
"""

from __future__ import annotations

from typing import Any, Dict, List
from unittest.mock import Mock

from modules.interfaces.interfaceBootstrap import _migrateAndDropSysAdminRole
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
from modules.datamodels.datamodelRbac import AccessRule, Role
from modules.datamodels.datamodelUam import UserInDB


# Fixed identifiers shared by the seeded fixture rows and the assertions in
# this module. The values are arbitrary strings; only their equality matters.
_ROOT_MANDATE_ID = "root-mandate-id"  # mandate id handed to the migration
_SYSADMIN_ROLE_ID = "sysadmin-role-id"  # id of the seeded legacy sysadmin Role
_USER_MANDATE_ID = "user-mandate-id"  # id of the seeded UserMandate row
_USER_ID = "legacy-user-id"  # id of the seeded legacy user
_UMR_ROW_ID = "umr-row-id"  # id of the seeded UserMandateRole row
_ACCESS_RULE_ID = "access-rule-id"  # id of the seeded AccessRule row


def _buildFakeDb(
    *,
    sysadminRoles: List[Dict[str, Any]],
    umRoleRows: List[Dict[str, Any]],
    userMandateRows: List[Dict[str, Any]],
    users: List[Dict[str, Any]],
    accessRules: List[Dict[str, Any]],
) -> Mock:
    """Build a fake ``DatabaseConnector`` that maps model -> recordset.

    The lists passed in act as the backing tables. The fake's write methods
    mutate them in place, so a caller can inspect the same list objects after
    running the code under test.

    Args:
        sysadminRoles: Rows returned for ``Role`` lookups that filter on
            ``roleLabel == "sysadmin"`` and ``mandateId == _ROOT_MANDATE_ID``.
        umRoleRows: ``UserMandateRole`` rows, filtered by ``roleId``.
        userMandateRows: ``UserMandate`` rows, filtered by ``id``.
        users: ``UserInDB`` rows, filtered by ``id``.
        accessRules: ``AccessRule`` rows, filtered by ``roleId``.

    Returns:
        A ``Mock`` whose ``getRecordset``, ``recordModify`` and
        ``recordDelete`` are backed by the closures below. Every write is
        additionally logged in ``db._modifies`` as
        ``(model, recordId, payload)`` and in ``db._deletes`` as
        ``(model, recordId)``.
    """
    deletes: List[tuple] = []
    modifies: List[tuple] = []

    def _getRecordset(model, recordFilter=None, **_):  # noqa: ANN001
        recordFilter = recordFilter or {}
        if model is Role:
            label = recordFilter.get("roleLabel")
            mandateId = recordFilter.get("mandateId")
            if label == "sysadmin" and mandateId == _ROOT_MANDATE_ID:
                # Hand out a copy so callers can iterate while deleting.
                return list(sysadminRoles)
            return []
        if model is UserMandateRole:
            wanted = recordFilter.get("roleId")
            return [r for r in umRoleRows if r.get("roleId") == wanted]
        if model is UserMandate:
            wanted = recordFilter.get("id")
            return [r for r in userMandateRows if r.get("id") == wanted]
        if model is UserInDB:
            wanted = recordFilter.get("id")
            return [r for r in users if r.get("id") == wanted]
        if model is AccessRule:
            wanted = recordFilter.get("roleId")
            return [r for r in accessRules if r.get("roleId") == wanted]
        # Any other model has no backing table in this fake.
        return []

    def _recordModify(model, recordId, payload):  # noqa: ANN001
        modifies.append((model, recordId, payload))
        # Reflect the change so a subsequent migration call is idempotent.
        if model is UserInDB:
            for u in users:
                if u.get("id") == recordId:
                    u.update(payload)
        return True

    def _recordDelete(model, recordId):  # noqa: ANN001
        deletes.append((model, recordId))
        # Slice assignment keeps the caller's list object, so the seed lists
        # handed to this factory observe the deletion.
        if model is UserMandateRole:
            umRoleRows[:] = [r for r in umRoleRows if r.get("id") != recordId]
        elif model is AccessRule:
            accessRules[:] = [r for r in accessRules if r.get("id") != recordId]
        elif model is Role:
            sysadminRoles[:] = [r for r in sysadminRoles if r.get("id") != recordId]
        return True

    db = Mock()
    db.getRecordset = Mock(side_effect=_getRecordset)
    db.recordModify = Mock(side_effect=_recordModify)
    db.recordDelete = Mock(side_effect=_recordDelete)
    db._modifies = modifies  # exposed for assertions
    db._deletes = deletes
    return db


def _seed():
    """Return fresh keyword arguments for ``_buildFakeDb``.

    The data describes one legacy ``sysadmin`` role on the root mandate, one
    user holding it through a ``UserMandate``/``UserMandateRole`` pair, and one
    access rule attached to the role. The user starts with
    ``isPlatformAdmin=False``. Every call builds new lists and dicts, so tests
    can mutate the result freely.
    """
    return {
        "sysadminRoles": [
            {
                "id": _SYSADMIN_ROLE_ID,
                "roleLabel": "sysadmin",
                "mandateId": _ROOT_MANDATE_ID,
            }
        ],
        "umRoleRows": [
            {
                "id": _UMR_ROW_ID,
                "roleId": _SYSADMIN_ROLE_ID,
                "userMandateId": _USER_MANDATE_ID,
            }
        ],
        "userMandateRows": [
            {
                "id": _USER_MANDATE_ID,
                "userId": _USER_ID,
                "mandateId": _ROOT_MANDATE_ID,
            }
        ],
        "users": [
            {
                "id": _USER_ID,
                "username": "legacy",
                "isSysAdmin": False,
                "isPlatformAdmin": False,
            }
        ],
        "accessRules": [{"id": _ACCESS_RULE_ID, "roleId": _SYSADMIN_ROLE_ID}],
    }


# ---------------------------------------------------------------------------
# AC #4 — promote + drop on first run
# ---------------------------------------------------------------------------


def testMigrationPromotesUserAndDropsArtefacts():
    """AC#4: legacy holder is promoted; Role+AccessRule+UMR are deleted."""
    seed = _seed()
    db = _buildFakeDb(**seed)

    _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)

    # User got isPlatformAdmin=True, both in the backing row and in the
    # recorded write log.
    assert seed["users"][0]["isPlatformAdmin"] is True
    assert any(
        m[0] is UserInDB and m[2] == {"isPlatformAdmin": True}
        for m in db._modifies
    ), "Expected UserInDB.isPlatformAdmin promotion call"

    # All three artefact tables had their rows deleted.
    deletedModels = {m[0] for m in db._deletes}
    assert UserMandateRole in deletedModels, "UserMandateRole row not deleted"
    assert AccessRule in deletedModels, "AccessRule row not deleted"
    assert Role in deletedModels, "Sysadmin Role record not deleted"

    # And the seeded lists are empty after the migration.
    assert seed["umRoleRows"] == []
    assert seed["accessRules"] == []
    assert seed["sysadminRoles"] == []


# ---------------------------------------------------------------------------
# AC #10 — idempotent: a second run is a no-op
# ---------------------------------------------------------------------------


def testMigrationIsIdempotent():
    """AC#10: a second invocation finds no sysadmin role and exits silently."""
    seed = _seed()
    db = _buildFakeDb(**seed)

    _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)
    # Snapshot the write logs; the fake keeps appending to the same lists.
    firstModifies = list(db._modifies)
    firstDeletes = list(db._deletes)

    # Guard against a vacuous pass: if the first run did nothing, comparing
    # two empty logs below would prove nothing about idempotency.
    assert firstDeletes, "First migration call must delete the legacy artefacts"

    _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)

    # No additional writes on the second call.
    assert db._modifies == firstModifies, (
        "Second migration call must not perform additional writes"
    )
    assert db._deletes == firstDeletes, (
        "Second migration call must not perform additional deletes"
    )


def testMigrationSkipsAlreadyPromotedUsers():
    """If a user already has ``isPlatformAdmin=True``, no redundant write."""
    seed = _seed()
    seed["users"][0]["isPlatformAdmin"] = True  # already promoted
    db = _buildFakeDb(**seed)

    _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)

    # No promotion write for an already-promoted user.
    promotionWrites = [
        m for m in db._modifies
        if m[0] is UserInDB and m[2].get("isPlatformAdmin") is True
    ]
    assert promotionWrites == [], (
        "Should not re-write isPlatformAdmin if user already has it"
    )

    # But role + access-rule cleanup still happens.
    deletedModels = {m[0] for m in db._deletes}
    assert Role in deletedModels
    assert AccessRule in deletedModels
    assert UserMandateRole in deletedModels


def testMigrationOnEmptyDbIsNoop():
    """No legacy sysadmin role at all -> no calls, no errors."""
    db = _buildFakeDb(
        sysadminRoles=[],
        umRoleRows=[],
        userMandateRows=[],
        users=[],
        accessRules=[],
    )

    _migrateAndDropSysAdminRole(db, _ROOT_MANDATE_ID)

    # Nothing to migrate, so the fake must not have recorded any write.
    assert db._modifies == []
    assert db._deletes == []