# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Integration tests for the SysAdmin / PlatformAdmin authority split.

Covers acceptance criteria from
``wiki/c-work/4-done/2026-04-sysadmin-authority-split.md``:

- AC#1 -> User with isSysAdmin only is rejected by ``requirePlatformAdmin``
- AC#2 -> User with isPlatformAdmin only is rejected by ``requireSysAdmin``
- AC#3 -> User with isPlatformAdmin is accepted by ``requirePlatformAdmin``
- AC#5 -> Live-flag check: revoking ``isPlatformAdmin`` immediately blocks
          the next request (no token cache).
- AC#6 -> Live-flag check: revoking ``isSysAdmin`` immediately blocks the
          next infrastructure request.
- AC#8 -> Self-protection: a user can never change their own admin flags
          via ``update_user`` business logic.

Strategy: build a tiny FastAPI app that exposes one route per dependency and
override ``getCurrentUser`` per request. This isolates the gating logic from
database/JWT plumbing and runs without external services.
"""

from __future__ import annotations

import pytest
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient

from modules.auth.authentication import (
    getCurrentUser,
    requirePlatformAdmin,
    requireSysAdmin,
)
from modules.datamodels.datamodelUam import User

# Paths of the two guarded probe routes registered by ``appWithDeps``.
_PLATFORM_ROUTE = "/admin/mandates"
_INFRA_ROUTE = "/admin/logs"


def _makeUser(
    *,
    userId: str = "test-user",
    isSysAdmin: bool = False,
    isPlatformAdmin: bool = False,
) -> User:
    """Create an enabled in-memory ``User`` with the requested admin flags.

    Username, e-mail and full name are all derived from ``userId``; both
    authority flags default to ``False``.
    """
    identity = {
        "id": userId,
        "username": f"user-{userId}",
        "email": f"{userId}@example.com",
        "fullName": f"Test {userId}",
    }
    return User(
        **identity,
        enabled=True,
        language="de",
        isSysAdmin=isSysAdmin,
        isPlatformAdmin=isPlatformAdmin,
    )


@pytest.fixture
def appWithDeps() -> tuple[FastAPI, dict]:
    """Provide a FastAPI app with one probe route per authority guard.

    Returns the app together with a mutable dict. ``getCurrentUser`` is
    overridden to hand back whatever is stored under the dict's ``"user"``
    key at request time, so a test can switch the acting user between
    requests simply by reassigning that entry.
    """
    holder: dict = {"user": _makeUser()}
    probeApp = FastAPI()

    def _currentUserFromHolder() -> User:
        # Looked up on every call so later reassignments take effect.
        return holder["user"]

    probeApp.dependency_overrides[getCurrentUser] = _currentUserFromHolder

    @probeApp.get(_PLATFORM_ROUTE)
    def _adminMandates(_: User = Depends(requirePlatformAdmin)) -> dict:
        return {"ok": True, "guard": "platform"}

    @probeApp.get(_INFRA_ROUTE)
    def _adminLogs(_: User = Depends(requireSysAdmin)) -> dict:
        return {"ok": True, "guard": "sysadmin"}

    return probeApp, holder


# ---------------------------------------------------------------------------
# AC #1, #2, #3 — basic authority gating
# ---------------------------------------------------------------------------


def testSysAdminCannotAccessPlatformRoute(appWithDeps):
    """AC#1: a sysadmin-only user is refused by ``requirePlatformAdmin``."""
    probeApp, holder = appWithDeps
    holder["user"] = _makeUser(isSysAdmin=True, isPlatformAdmin=False)

    with TestClient(probeApp) as http:
        reply = http.get(_PLATFORM_ROUTE)
        assert reply.status_code == 403
        detail = reply.json()["detail"]
        assert "platform admin" in detail.lower()


def testPlatformAdminCannotAccessInfraRoute(appWithDeps):
    """AC#2: a platform-admin-only user is refused by ``requireSysAdmin``."""
    probeApp, holder = appWithDeps
    holder["user"] = _makeUser(isSysAdmin=False, isPlatformAdmin=True)

    with TestClient(probeApp) as http:
        reply = http.get(_INFRA_ROUTE)
        assert reply.status_code == 403
        detail = reply.json()["detail"]
        assert "sysadmin" in detail.lower()


def testPlatformAdminCanAccessPlatformRoute(appWithDeps):
    """AC#3: a platform admin is let through ``requirePlatformAdmin``."""
    probeApp, holder = appWithDeps
    holder["user"] = _makeUser(isPlatformAdmin=True)

    with TestClient(probeApp) as http:
        reply = http.get(_PLATFORM_ROUTE)
        assert reply.status_code == 200
        assert reply.json() == {"ok": True, "guard": "platform"}


def testSysAdminCanAccessInfraRoute(appWithDeps):
    """Mirror of AC#3 for infrastructure: a sysadmin passes ``requireSysAdmin``."""
    probeApp, holder = appWithDeps
    holder["user"] = _makeUser(isSysAdmin=True)

    with TestClient(probeApp) as http:
        reply = http.get(_INFRA_ROUTE)
        assert reply.status_code == 200
        assert reply.json() == {"ok": True, "guard": "sysadmin"}


def testNoFlagsIsForbiddenForBothGuards(appWithDeps):
    """A user carrying neither flag gets 403 from both guarded routes."""
    probeApp, holder = appWithDeps
    holder["user"] = _makeUser()

    with TestClient(probeApp) as http:
        # Issue both requests first, then check the outcomes.
        platformReply = http.get(_PLATFORM_ROUTE)
        infraReply = http.get(_INFRA_ROUTE)
        assert platformReply.status_code == 403
        assert infraReply.status_code == 403


# ---------------------------------------------------------------------------
# AC #5, #6 — live flag check (no client-side cache, next request re-evaluates)
# ---------------------------------------------------------------------------


def testRevokingPlatformAdminBlocksNextRequest(appWithDeps):
    """AC#5: once isPlatformAdmin is dropped, the following request is 403."""
    probeApp, holder = appWithDeps
    holder["user"] = _makeUser(isPlatformAdmin=True)

    with TestClient(probeApp) as http:
        beforeRevoke = http.get(_PLATFORM_ROUTE)
        assert beforeRevoke.status_code == 200

        # Simulates an admin clearing the flag (e.g. via /api/users/{id}).
        holder["user"] = _makeUser(isPlatformAdmin=False)

        afterRevoke = http.get(_PLATFORM_ROUTE)
        assert afterRevoke.status_code == 403


def testRevokingSysAdminBlocksNextRequest(appWithDeps):
    """AC#6: once isSysAdmin is dropped, the following request is 403."""
    probeApp, holder = appWithDeps
    holder["user"] = _makeUser(isSysAdmin=True)

    with TestClient(probeApp) as http:
        beforeRevoke = http.get(_INFRA_ROUTE)
        assert beforeRevoke.status_code == 200

        holder["user"] = _makeUser(isSysAdmin=False)

        afterRevoke = http.get(_INFRA_ROUTE)
        assert afterRevoke.status_code == 403


# ---------------------------------------------------------------------------
# AC #8 — self-protection on update_user
# ---------------------------------------------------------------------------


def testSelfProtectionOnUpdateUserDisallowsAdminFlagChange():
    """AC#8: a platform admin editing their own record cannot touch admin flags.

    Re-states the gating expression described for
    ``routeDataUsers.update_user``:

        callerIsPlatformAdmin = context.isPlatformAdmin
        allowAdminFlagChange = callerIsPlatformAdmin and not isSelfUpdate

    Whenever ``isSelfUpdate`` is True the result has to be ``False``, no
    matter how much authority the caller holds.

    NOTE(review): this checks a local copy of the expression, not the route
    implementation itself.
    """
    callerId = "user-1"

    def _allowAdminFlagChange(callerIsPlatformAdmin: bool, isSelfUpdate: bool) -> bool:
        return callerIsPlatformAdmin and not isSelfUpdate

    # (caller is platform admin, id of the user being edited, expected result)
    scenarios = (
        # Self-update by a platform admin: still NOT allowed to flip own flags.
        (True, callerId, False),
        # Foreign-update by a platform admin: allowed.
        (True, "user-2", True),
        # Foreign-update by a non-platform admin: rejected.
        (False, "user-2", False),
    )
    for callerIsPlatformAdmin, targetId, expected in scenarios:
        outcome = _allowAdminFlagChange(
            callerIsPlatformAdmin,
            isSelfUpdate=(callerId == targetId),
        )
        assert outcome is expected


def testInterfaceUpdateUserProtectsAdminFlagsWhenForbidden():
    """``AppObjects.updateUser`` must not escalate flags when forbidden.

    With ``allowAdminFlagChange=False`` the values of ``isSysAdmin`` and
    ``isPlatformAdmin`` that reach ``db.recordModify`` have to stay at the
    stored ``False`` even though the submitted payload sets both to ``True``.
    This is the second line of defence behind ``update_user``'s
    ``isSelfUpdate`` check.
    """
    from unittest.mock import Mock

    from modules.interfaces.interfaceDbApp import AppObjects

    # Fields shared by the stored record and the malicious payload.
    victimFields = {
        "id": "victim",
        "username": "victim",
        "email": "victim@example.com",
        "fullName": "Victim",
        "enabled": True,
        "language": "de",
    }
    existing = User(**victimFields, isSysAdmin=False, isPlatformAdmin=False)
    # Attacker payload tries to escalate both flags.
    attackerPayload = User(**victimFields, isSysAdmin=True, isPlatformAdmin=True)

    captured: dict = {}

    def _captureUpdate(_model, _recordId, payload):
        # Normalise the payload to a dict, whatever form it arrives in.
        if hasattr(payload, "model_dump"):
            fields = payload.model_dump()
        elif isinstance(payload, dict):
            fields = payload
        else:
            fields = {
                "isSysAdmin": getattr(payload, "isSysAdmin", None),
                "isPlatformAdmin": getattr(payload, "isPlatformAdmin", None),
            }

        captured["isSysAdmin"] = fields.get("isSysAdmin")
        captured["isPlatformAdmin"] = fields.get("isPlatformAdmin")

        # Return the stored record overlaid with every non-None field.
        merged = existing.model_dump()
        merged.update({key: value for key, value in fields.items() if value is not None})
        return merged

    fakeDb = Mock(recordModify=Mock(side_effect=_captureUpdate))

    # Build the interface without going through __init__ (avoids real DB).
    interface = AppObjects.__new__(AppObjects)
    wiring = {
        "currentUser": existing,
        "userId": existing.id,
        "mandateId": None,
        "featureInstanceId": None,
        "db": fakeDb,
        "rbac": Mock(checkRbacPermission=Mock(return_value=True)),
        "getUser": Mock(return_value=existing),
    }
    for attribute, value in wiring.items():
        setattr(interface, attribute, value)

    interface.updateUser("victim", attackerPayload, allowAdminFlagChange=False)

    assert captured.get("isSysAdmin") is False, (
        "isSysAdmin must remain False when allowAdminFlagChange=False, "
        f"got {captured.get('isSysAdmin')!r}"
    )
    assert captured.get("isPlatformAdmin") is False, (
        "isPlatformAdmin must remain False when allowAdminFlagChange=False, "
        f"got {captured.get('isPlatformAdmin')!r}"
    )