gateway/tests/integration/rbac/test_platform_admin_flag.py

290 lines
9.7 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Integration tests for the SysAdmin / PlatformAdmin authority split.
Covers acceptance criteria from
``wiki/c-work/4-done/2026-04-sysadmin-authority-split.md``:
- AC#1 -> User with isSysAdmin only is rejected by ``requirePlatformAdmin``
- AC#2 -> User with isPlatformAdmin only is rejected by ``requireSysAdmin``
- AC#3 -> User with isPlatformAdmin is accepted by ``requirePlatformAdmin``
- AC#5 -> Live-flag check: revoking ``isPlatformAdmin`` immediately blocks
the next request (no token cache).
- AC#6 -> Live-flag check: revoking ``isSysAdmin`` immediately blocks
the next infrastructure request.
- AC#8 -> Self-protection: a user can never change their own admin flags
via ``update_user`` business logic.
Strategy: build a tiny FastAPI app that exposes one route per dependency
and override ``getCurrentUser`` per request. This isolates the gating
logic from database/JWT plumbing and runs without external services.
"""
from __future__ import annotations
import pytest
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient
from modules.auth.authentication import (
getCurrentUser,
requirePlatformAdmin,
requireSysAdmin,
)
from modules.datamodels.datamodelUam import User
def _makeUser(
*,
userId: str = "test-user",
isSysAdmin: bool = False,
isPlatformAdmin: bool = False,
) -> User:
"""Build a minimal in-memory User instance for dependency overrides."""
return User(
id=userId,
username=f"user-{userId}",
email=f"{userId}@example.com",
fullName=f"Test {userId}",
enabled=True,
language="de",
isSysAdmin=isSysAdmin,
isPlatformAdmin=isPlatformAdmin,
)
@pytest.fixture
def appWithDeps() -> tuple[FastAPI, dict]:
"""FastAPI app with one route per authority dependency.
The returned dict allows tests to swap the "current user" between
requests by mutating ``state['user']``.
"""
state: dict = {"user": _makeUser()}
app = FastAPI()
def _overrideCurrentUser() -> User:
return state["user"]
app.dependency_overrides[getCurrentUser] = _overrideCurrentUser
@app.get("/admin/mandates")
def _adminMandates(_: User = Depends(requirePlatformAdmin)) -> dict:
return {"ok": True, "guard": "platform"}
@app.get("/admin/logs")
def _adminLogs(_: User = Depends(requireSysAdmin)) -> dict:
return {"ok": True, "guard": "sysadmin"}
return app, state
# ---------------------------------------------------------------------------
# AC #1, #2, #3 — basic authority gating
# ---------------------------------------------------------------------------
def testSysAdminCannotAccessPlatformRoute(appWithDeps):
"""AC#1: isSysAdmin alone must NOT pass requirePlatformAdmin."""
app, state = appWithDeps
state["user"] = _makeUser(isSysAdmin=True, isPlatformAdmin=False)
with TestClient(app) as client:
response = client.get("/admin/mandates")
assert response.status_code == 403
assert "platform admin" in response.json()["detail"].lower()
def testPlatformAdminCannotAccessInfraRoute(appWithDeps):
"""AC#2: isPlatformAdmin alone must NOT pass requireSysAdmin."""
app, state = appWithDeps
state["user"] = _makeUser(isSysAdmin=False, isPlatformAdmin=True)
with TestClient(app) as client:
response = client.get("/admin/logs")
assert response.status_code == 403
assert "sysadmin" in response.json()["detail"].lower()
def testPlatformAdminCanAccessPlatformRoute(appWithDeps):
"""AC#3: isPlatformAdmin must pass requirePlatformAdmin."""
app, state = appWithDeps
state["user"] = _makeUser(isPlatformAdmin=True)
with TestClient(app) as client:
response = client.get("/admin/mandates")
assert response.status_code == 200
assert response.json() == {"ok": True, "guard": "platform"}
def testSysAdminCanAccessInfraRoute(appWithDeps):
"""Sanity counterpart to AC#3: isSysAdmin passes requireSysAdmin."""
app, state = appWithDeps
state["user"] = _makeUser(isSysAdmin=True)
with TestClient(app) as client:
response = client.get("/admin/logs")
assert response.status_code == 200
assert response.json() == {"ok": True, "guard": "sysadmin"}
def testNoFlagsIsForbiddenForBothGuards(appWithDeps):
"""Regular user (no flags) must be rejected by both guards."""
app, state = appWithDeps
state["user"] = _makeUser()
with TestClient(app) as client:
rPlatform = client.get("/admin/mandates")
rInfra = client.get("/admin/logs")
assert rPlatform.status_code == 403
assert rInfra.status_code == 403
# ---------------------------------------------------------------------------
# AC #5, #6 — live flag check (no client-side cache, next request re-evaluates)
# ---------------------------------------------------------------------------
def testRevokingPlatformAdminBlocksNextRequest(appWithDeps):
"""AC#5: After dropping isPlatformAdmin, the very next request gets 403."""
app, state = appWithDeps
state["user"] = _makeUser(isPlatformAdmin=True)
with TestClient(app) as client:
first = client.get("/admin/mandates")
assert first.status_code == 200
# Admin removes the flag (e.g. via /api/users/{id})
state["user"] = _makeUser(isPlatformAdmin=False)
second = client.get("/admin/mandates")
assert second.status_code == 403
def testRevokingSysAdminBlocksNextRequest(appWithDeps):
"""AC#6: After dropping isSysAdmin, the very next request gets 403."""
app, state = appWithDeps
state["user"] = _makeUser(isSysAdmin=True)
with TestClient(app) as client:
first = client.get("/admin/logs")
assert first.status_code == 200
state["user"] = _makeUser(isSysAdmin=False)
second = client.get("/admin/logs")
assert second.status_code == 403
# ---------------------------------------------------------------------------
# AC #8 — self-protection on update_user
# ---------------------------------------------------------------------------
def testSelfProtectionOnUpdateUserDisallowsAdminFlagChange():
"""AC#8: A platform admin updating themselves cannot change admin flags.
Mirrors the gating logic in ``routeDataUsers.update_user``:
callerIsPlatformAdmin = context.isPlatformAdmin
allowAdminFlagChange = callerIsPlatformAdmin and not isSelfUpdate
When ``isSelfUpdate`` is True the flag must always be ``False``,
regardless of the caller's authority.
"""
callerId = "user-1"
def _allowAdminFlagChange(callerIsPlatformAdmin: bool, isSelfUpdate: bool) -> bool:
return callerIsPlatformAdmin and not isSelfUpdate
# Self-update by a platform admin: still NOT allowed to flip own flags.
assert _allowAdminFlagChange(True, isSelfUpdate=(callerId == callerId)) is False
# Foreign-update by a platform admin: allowed.
assert _allowAdminFlagChange(True, isSelfUpdate=(callerId == "user-2")) is True
# Foreign-update by a non-platform admin: rejected.
assert _allowAdminFlagChange(False, isSelfUpdate=(callerId == "user-2")) is False
def testInterfaceUpdateUserProtectsAdminFlagsWhenForbidden():
"""``interfaceDbApp.AppObjects.updateUser`` must keep the existing
``isSysAdmin``/``isPlatformAdmin`` values when ``allowAdminFlagChange``
is False — even if the request payload tries to escalate them.
This is the second line of defence behind ``update_user``'s
``isSelfUpdate`` check.
"""
from unittest.mock import Mock
from modules.interfaces.interfaceDbApp import AppObjects
existing = User(
id="victim",
username="victim",
email="victim@example.com",
fullName="Victim",
enabled=True,
language="de",
isSysAdmin=False,
isPlatformAdmin=False,
)
# Attacker payload tries to escalate both flags.
attackerPayload = User(
id="victim",
username="victim",
email="victim@example.com",
fullName="Victim",
enabled=True,
language="de",
isSysAdmin=True,
isPlatformAdmin=True,
)
captured: dict = {}
def _captureUpdate(_model, _recordId, payload):
# Whether dict or User: extract flag values for assertion.
if hasattr(payload, "model_dump"):
data = payload.model_dump()
elif isinstance(payload, dict):
data = payload
else:
data = {"isSysAdmin": getattr(payload, "isSysAdmin", None),
"isPlatformAdmin": getattr(payload, "isPlatformAdmin", None)}
captured["isSysAdmin"] = data.get("isSysAdmin")
captured["isPlatformAdmin"] = data.get("isPlatformAdmin")
merged = {**existing.model_dump(), **{k: v for k, v in data.items() if v is not None}}
return merged
fakeDb = Mock()
fakeDb.recordModify = Mock(side_effect=_captureUpdate)
# Build the interface without going through __init__ (avoids real DB).
interface = AppObjects.__new__(AppObjects)
interface.currentUser = existing
interface.userId = existing.id
interface.mandateId = None
interface.featureInstanceId = None
interface.db = fakeDb
interface.rbac = Mock(checkRbacPermission=Mock(return_value=True))
interface.getUser = Mock(return_value=existing)
interface.updateUser("victim", attackerPayload, allowAdminFlagChange=False)
assert captured.get("isSysAdmin") is False, (
"isSysAdmin must remain False when allowAdminFlagChange=False, "
f"got {captured.get('isSysAdmin')!r}"
)
assert captured.get("isPlatformAdmin") is False, (
"isPlatformAdmin must remain False when allowAdminFlagChange=False, "
f"got {captured.get('isPlatformAdmin')!r}"
)