# Source: gateway/scripts/check_db_no_sysadmin_role.py (Python, 132 lines, 4.3 KiB)

"""Runtime check (A5): confirms that the ``sysadmin`` role has been removed
from the database and prints a short inventory of the
isPlatformAdmin / isSysAdmin flags.

The script uses the existing ``APP_CONFIG`` (which decrypts
``DB_PASSWORD_SECRET``) and queries the database directly via ``psycopg2``,
without starting the whole FastAPI stack.

Usage::

    python gateway/scripts/check_db_no_sysadmin_role.py

Exit codes:

- 0 -> clean (role count == 0, and no access rules or mandate grants
  reference the role)
- 1 -> sysadmin role still exists (migration incomplete)
- 2 -> connection error (configuration / database unreachable)
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
# Put the gateway root (the directory one level above this script's folder)
# at the front of sys.path, so the ``modules.*`` import below resolves when
# the file is executed directly as a script. Skipped if it is already listed.
_GATEWAY = Path(__file__).resolve().parents[1]
if not sys.path.count(str(_GATEWAY)):
    sys.path.insert(0, str(_GATEWAY))
import psycopg2 # noqa: E402
import psycopg2.extras # noqa: E402
from modules.shared.configuration import APP_CONFIG # noqa: E402
def _connect():
    """Open a psycopg2 connection from the ``DB_*`` entries of ``APP_CONFIG``.

    Defaults used when a key is absent: host ``localhost``, port ``5432``,
    database ``poweron_app``. ``DB_USER`` and ``DB_PASSWORD_SECRET`` have no
    default and are passed through as returned by ``APP_CONFIG.get``.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        Whatever ``psycopg2.connect`` raises, or ``ValueError`` / ``TypeError``
        if ``DB_PORT`` cannot be converted to ``int``.
    """
    # Lookups are kept in a fixed order; the mapping is then splatted into
    # connect(), where keyword order is irrelevant.
    settings = {
        "host": APP_CONFIG.get("DB_HOST", "localhost"),
        "user": APP_CONFIG.get("DB_USER"),
        "password": APP_CONFIG.get("DB_PASSWORD_SECRET"),
        "port": int(APP_CONFIG.get("DB_PORT", 5432)),
        "dbname": APP_CONFIG.get("DB_DATABASE", "poweron_app"),
    }
    return psycopg2.connect(**settings)
def _fetch_count(cur, query, params=None) -> int:
    """Execute a ``SELECT COUNT(*)::int AS n ...`` query and return ``n``.

    ``cur`` must produce dict-like rows, because the count is read by its
    column alias ``n``.
    """
    cur.execute(query, params)
    return cur.fetchone()["n"]


def _collect_inventory(conn) -> dict:
    """Run all read-only checks on ``conn`` and return the results by name.

    Keys of the returned dict:
        roleCount      -- "Role" rows whose "roleLabel" is ``sysadmin``
        platformAdmins -- "UserInDB" rows with "isPlatformAdmin" = true
        sysAdmins      -- "UserInDB" rows with "isSysAdmin" = true
        adminUsers     -- rows (username, email, both flags) of every user
                          carrying at least one flag, ordered by username
        orphanRules    -- "AccessRule" rows joined to a ``sysadmin`` role
        orphanGrants   -- "UserMandateRole" rows joined to a ``sysadmin`` role

    Raises:
        psycopg2.Error: if any of the queries fails.
    """
    inventory = {}
    # RealDictCursor so that columns can be addressed by name.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        inventory["roleCount"] = _fetch_count(
            cur,
            'SELECT COUNT(*)::int AS n FROM "Role" WHERE "roleLabel" = %s',
            ("sysadmin",),
        )
        # COALESCE treats a NULL flag as false.
        inventory["platformAdmins"] = _fetch_count(
            cur,
            'SELECT COUNT(*)::int AS n FROM "UserInDB" '
            'WHERE COALESCE("isPlatformAdmin", false) = true',
        )
        inventory["sysAdmins"] = _fetch_count(
            cur,
            'SELECT COUNT(*)::int AS n FROM "UserInDB" '
            'WHERE COALESCE("isSysAdmin", false) = true',
        )
        cur.execute(
            'SELECT "username", "email", '
            'COALESCE("isSysAdmin", false) AS "isSysAdmin", '
            'COALESCE("isPlatformAdmin", false) AS "isPlatformAdmin" '
            'FROM "UserInDB" '
            'WHERE COALESCE("isSysAdmin", false) = true '
            ' OR COALESCE("isPlatformAdmin", false) = true '
            'ORDER BY "username"'
        )
        inventory["adminUsers"] = cur.fetchall()
        # Both queries below are inner joins on "Role", so they only count
        # rows whose role row still exists with the ``sysadmin`` label.
        inventory["orphanRules"] = _fetch_count(
            cur,
            'SELECT COUNT(*)::int AS n FROM "AccessRule" ar '
            'JOIN "Role" r ON ar."roleId" = r."id" '
            'WHERE r."roleLabel" = %s',
            ("sysadmin",),
        )
        inventory["orphanGrants"] = _fetch_count(
            cur,
            'SELECT COUNT(*)::int AS n FROM "UserMandateRole" umr '
            'JOIN "Role" r ON umr."roleId" = r."id" '
            'WHERE r."roleLabel" = %s',
            ("sysadmin",),
        )
    return inventory


def _print_report(inventory: dict) -> None:
    """Print the human-readable summary for an inventory dict to stdout."""
    banner = "=" * 64
    print(banner)
    print("A5 - SysAdmin Migration DB Check")
    print(banner)
    print(f"Role.roleLabel == 'sysadmin' : {inventory['roleCount']}")
    print(f"AccessRule(s) referencing sysadmin role : {inventory['orphanRules']}")
    print(f"UserMandateRole(s) granting sysadmin role : {inventory['orphanGrants']}")
    print(f"User.isSysAdmin = true : {inventory['sysAdmins']}")
    print(f"User.isPlatformAdmin = true : {inventory['platformAdmins']}")
    print()
    print("Admin-flagged users:")
    if not inventory["adminUsers"]:
        print(" (none)")
    for row in inventory["adminUsers"]:
        flags = [name for name in ("isSysAdmin", "isPlatformAdmin") if row[name]]
        # ``or ''`` guards against NULL columns: formatting None with a
        # width spec such as ``:<32`` raises TypeError.
        username = row["username"] or ""
        email = row.get("email") or ""
        print(f" - {username:<32} {email:<40} {','.join(flags)}")
    print(banner)


def _main() -> int:
    """Check the database for leftovers of the ``sysadmin`` role and report.

    Connects via ``_connect()``, gathers the counts, prints a summary and
    derives the process exit code. The isSysAdmin / isPlatformAdmin user
    flags are reported for information only and do not affect the result.

    Returns:
        0 if no ``sysadmin`` role row exists and no AccessRule or
          UserMandateRole row references one,
        1 if any such row is still present,
        2 if the database could not be reached or a query failed.
    """
    # Broad catch on purpose: this is the script's top-level boundary, and a
    # bad configuration value (e.g. a non-numeric port) should be reported the
    # same way as an unreachable server.
    try:
        conn = _connect()
    except Exception as exc:
        print(f"[ERR] Could not connect to database: {exc}")
        return 2

    try:
        inventory = _collect_inventory(conn)
    except psycopg2.Error as exc:
        # Without this handler a failing query would end the process with a
        # traceback and exit status 1, which is indistinguishable from the
        # "role still exists" result.
        print(f"[ERR] Database query failed: {exc}")
        return 2
    finally:
        conn.close()

    _print_report(inventory)

    leftovers = (
        inventory["roleCount"],
        inventory["orphanRules"],
        inventory["orphanGrants"],
    )
    if all(count == 0 for count in leftovers):
        print("[OK] Migration verified: no sysadmin role artefacts in DB.")
        return 0
    print("[FAIL] Legacy sysadmin role artefacts still present in DB.")
    return 1
if __name__ == "__main__":
    # Hand the result of the check to the shell as the process exit status.
    raise SystemExit(_main())