290 lines
9.7 KiB
Python
290 lines
9.7 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Integration tests for the SysAdmin / PlatformAdmin authority split.
|
|
|
|
Covers acceptance criteria from
|
|
``wiki/c-work/4-done/2026-04-sysadmin-authority-split.md``:
|
|
|
|
- AC#1 -> User with isSysAdmin only is rejected by ``requirePlatformAdmin``
|
|
- AC#2 -> User with isPlatformAdmin only is rejected by ``requireSysAdmin``
|
|
- AC#3 -> User with isPlatformAdmin is accepted by ``requirePlatformAdmin``
|
|
- AC#5 -> Live-flag check: revoking ``isPlatformAdmin`` immediately blocks
|
|
the next request (no token cache).
|
|
- AC#6 -> Live-flag check: revoking ``isSysAdmin`` immediately blocks
|
|
the next infrastructure request.
|
|
- AC#8 -> Self-protection: a user can never change their own admin flags
|
|
via ``update_user`` business logic.
|
|
|
|
Strategy: build a tiny FastAPI app that exposes one route per dependency
|
|
and override ``getCurrentUser`` per request. This isolates the gating
|
|
logic from database/JWT plumbing and runs without external services.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from fastapi import Depends, FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from modules.auth.authentication import (
|
|
getCurrentUser,
|
|
requirePlatformAdmin,
|
|
requireSysAdmin,
|
|
)
|
|
from modules.datamodels.datamodelUam import User
|
|
|
|
|
|
def _makeUser(
    *,
    userId: str = "test-user",
    isSysAdmin: bool = False,
    isPlatformAdmin: bool = False,
) -> User:
    """Create an in-memory ``User`` to serve as the overridden current user.

    All arguments are keyword-only. The username, e-mail address and full
    name are derived from ``userId``; the account is always enabled and uses
    language ``"de"``. Both admin flags default to ``False``.
    """
    fields = {
        "id": userId,
        "username": f"user-{userId}",
        "email": f"{userId}@example.com",
        "fullName": f"Test {userId}",
        "enabled": True,
        "language": "de",
        "isSysAdmin": isSysAdmin,
        "isPlatformAdmin": isPlatformAdmin,
    }
    return User(**fields)
|
|
|
|
|
|
@pytest.fixture
def appWithDeps() -> tuple[FastAPI, dict]:
    """Provide a throwaway FastAPI app together with a mutable state dict.

    The app exposes ``/admin/mandates`` (guarded by ``requirePlatformAdmin``)
    and ``/admin/logs`` (guarded by ``requireSysAdmin``). ``getCurrentUser``
    is overridden to return ``state['user']``; the lookup happens on every
    call, so a test can switch the acting user between requests simply by
    reassigning that key.
    """
    app = FastAPI()
    state: dict = {"user": _makeUser()}

    # Read the user at call time so later reassignments of state["user"]
    # are picked up by the next request.
    app.dependency_overrides[getCurrentUser] = lambda: state["user"]

    @app.get("/admin/mandates")
    def _adminMandates(_: User = Depends(requirePlatformAdmin)) -> dict:
        return dict(ok=True, guard="platform")

    @app.get("/admin/logs")
    def _adminLogs(_: User = Depends(requireSysAdmin)) -> dict:
        return dict(ok=True, guard="sysadmin")

    return app, state
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC #1, #2, #3 — basic authority gating
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def testSysAdminCannotAccessPlatformRoute(appWithDeps):
    """AC#1: a user holding only isSysAdmin is refused by requirePlatformAdmin."""
    application, userState = appWithDeps
    userState["user"] = _makeUser(isPlatformAdmin=False, isSysAdmin=True)

    with TestClient(application) as http:
        reply = http.get("/admin/mandates")

    # Status first, then the error text, so a wrong status is reported as such.
    assert reply.status_code == 403
    detail = reply.json()["detail"]
    assert "platform admin" in detail.lower()
|
|
|
|
|
|
def testPlatformAdminCannotAccessInfraRoute(appWithDeps):
    """AC#2: a user holding only isPlatformAdmin is refused by requireSysAdmin."""
    application, userState = appWithDeps
    userState["user"] = _makeUser(isPlatformAdmin=True, isSysAdmin=False)

    with TestClient(application) as http:
        reply = http.get("/admin/logs")

    # Status first, then the error text, so a wrong status is reported as such.
    assert reply.status_code == 403
    detail = reply.json()["detail"]
    assert "sysadmin" in detail.lower()
|
|
|
|
|
|
def testPlatformAdminCanAccessPlatformRoute(appWithDeps):
    """AC#3: a user with isPlatformAdmin is let through requirePlatformAdmin."""
    application, userState = appWithDeps
    userState["user"] = _makeUser(isPlatformAdmin=True)

    with TestClient(application) as http:
        reply = http.get("/admin/mandates")

    expectedBody = {"ok": True, "guard": "platform"}
    assert reply.status_code == 200
    assert reply.json() == expectedBody
|
|
|
|
|
|
def testSysAdminCanAccessInfraRoute(appWithDeps):
    """Positive counterpart to AC#3: isSysAdmin is let through requireSysAdmin."""
    application, userState = appWithDeps
    userState["user"] = _makeUser(isSysAdmin=True)

    with TestClient(application) as http:
        reply = http.get("/admin/logs")

    expectedBody = {"ok": True, "guard": "sysadmin"}
    assert reply.status_code == 200
    assert reply.json() == expectedBody
|
|
|
|
|
|
def testNoFlagsIsForbiddenForBothGuards(appWithDeps):
    """A user without any admin flag gets 403 from both guarded routes."""
    application, userState = appWithDeps
    userState["user"] = _makeUser()

    # Both requests go through the same client, platform route first.
    with TestClient(application) as http:
        platformReply = http.get("/admin/mandates")
        infraReply = http.get("/admin/logs")

    assert platformReply.status_code == 403
    assert infraReply.status_code == 403
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC #5, #6 — live flag check (no client-side cache, next request re-evaluates)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def testRevokingPlatformAdminBlocksNextRequest(appWithDeps):
    """AC#5: once isPlatformAdmin is dropped, the following request is a 403."""
    application, userState = appWithDeps
    userState["user"] = _makeUser(isPlatformAdmin=True)

    with TestClient(application) as http:
        beforeRevoke = http.get("/admin/mandates")
        assert beforeRevoke.status_code == 200

        # Simulate an admin clearing the flag (e.g. via /api/users/{id})
        # while the same client session stays open.
        userState["user"] = _makeUser(isPlatformAdmin=False)

        afterRevoke = http.get("/admin/mandates")
        assert afterRevoke.status_code == 403
|
|
|
|
|
|
def testRevokingSysAdminBlocksNextRequest(appWithDeps):
    """AC#6: once isSysAdmin is dropped, the following request is a 403."""
    application, userState = appWithDeps
    userState["user"] = _makeUser(isSysAdmin=True)

    with TestClient(application) as http:
        beforeRevoke = http.get("/admin/logs")
        assert beforeRevoke.status_code == 200

        # Swap in a user without the flag while the same client stays open.
        userState["user"] = _makeUser(isSysAdmin=False)

        afterRevoke = http.get("/admin/logs")
        assert afterRevoke.status_code == 403
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AC #8 — self-protection on update_user
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def testSelfProtectionOnUpdateUserDisallowsAdminFlagChange():
    """AC#8: A platform admin updating themselves cannot change admin flags.

    Mirrors the gating logic in ``routeDataUsers.update_user``:

        callerIsPlatformAdmin = context.isPlatformAdmin
        allowAdminFlagChange = callerIsPlatformAdmin and not isSelfUpdate

    When ``isSelfUpdate`` is True the flag must always be ``False``,
    regardless of the caller's authority.

    NOTE(review): this checks a local replica of the expression, not the
    route itself. If ``update_user`` changes, update the replica too.
    """
    callerId = "user-1"
    otherId = "user-2"

    def _allowAdminFlagChange(callerIsPlatformAdmin: bool, isSelfUpdate: bool) -> bool:
        return callerIsPlatformAdmin and not isSelfUpdate

    # Full truth table: (callerIsPlatformAdmin, targetId, expected result).
    # Explicit target ids replace the former ``callerId == callerId`` check.
    cases = [
        # Self-update by a platform admin: still NOT allowed to flip own flags.
        (True, callerId, False),
        # Foreign-update by a platform admin: allowed.
        (True, otherId, True),
        # Foreign-update by a non-platform admin: rejected.
        (False, otherId, False),
        # Self-update by a non-platform admin: rejected (previously untested).
        (False, callerId, False),
    ]

    for callerIsPlatformAdmin, targetId, expected in cases:
        isSelfUpdate = targetId == callerId
        actual = _allowAdminFlagChange(callerIsPlatformAdmin, isSelfUpdate=isSelfUpdate)
        assert actual is expected, (
            f"callerIsPlatformAdmin={callerIsPlatformAdmin!r}, "
            f"isSelfUpdate={isSelfUpdate!r}: expected {expected!r}, got {actual!r}"
        )
|
|
|
|
|
|
def testInterfaceUpdateUserProtectsAdminFlagsWhenForbidden():
    """``AppObjects.updateUser`` must not write escalated admin flags.

    With ``allowAdminFlagChange=False`` the payload handed to
    ``db.recordModify`` has to carry ``isSysAdmin``/``isPlatformAdmin`` as
    ``False`` (the stored values), even though the incoming payload sets
    both to ``True``.

    This is the second line of defence behind ``update_user``'s
    ``isSelfUpdate`` check.
    """
    from unittest.mock import Mock

    from modules.interfaces.interfaceDbApp import AppObjects

    # Fields shared by the stored record and the malicious payload.
    victimFields = dict(
        id="victim",
        username="victim",
        email="victim@example.com",
        fullName="Victim",
        enabled=True,
        language="de",
    )
    existing = User(**victimFields, isSysAdmin=False, isPlatformAdmin=False)

    # Attacker payload tries to escalate both flags.
    attackerPayload = User(**victimFields, isSysAdmin=True, isPlatformAdmin=True)

    captured: dict = {}

    def _captureUpdate(_model, _recordId, payload):
        # Normalise whatever is handed to recordModify into a plain mapping
        # so the flag values can be recorded for the assertions below.
        if hasattr(payload, "model_dump"):
            data = payload.model_dump()
        elif isinstance(payload, dict):
            data = payload
        else:
            data = {
                "isSysAdmin": getattr(payload, "isSysAdmin", None),
                "isPlatformAdmin": getattr(payload, "isPlatformAdmin", None),
            }
        captured["isSysAdmin"] = data.get("isSysAdmin")
        captured["isPlatformAdmin"] = data.get("isPlatformAdmin")

        # Return the stored record overlaid with every non-None payload value.
        merged = existing.model_dump()
        merged.update((key, value) for key, value in data.items() if value is not None)
        return merged

    fakeDb = Mock()
    fakeDb.recordModify = Mock(side_effect=_captureUpdate)

    rbacStub = Mock(checkRbacPermission=Mock(return_value=True))

    # Build the interface without going through __init__ (avoids real DB),
    # then attach the collaborators in the same order as plain assignments.
    interface = AppObjects.__new__(AppObjects)
    wiring = {
        "currentUser": existing,
        "userId": existing.id,
        "mandateId": None,
        "featureInstanceId": None,
        "db": fakeDb,
        "rbac": rbacStub,
        "getUser": Mock(return_value=existing),
    }
    for attrName, attrValue in wiring.items():
        setattr(interface, attrName, attrValue)

    interface.updateUser("victim", attackerPayload, allowAdminFlagChange=False)

    assert captured.get("isSysAdmin") is False, (
        "isSysAdmin must remain False when allowAdminFlagChange=False, "
        f"got {captured.get('isSysAdmin')!r}"
    )
    assert captured.get("isPlatformAdmin") is False, (
        "isPlatformAdmin must remain False when allowAdminFlagChange=False, "
        f"got {captured.get('isPlatformAdmin')!r}"
    )
|