"""Runtime-Check (A5): bestaetigt, dass die ``sysadmin``-Rolle aus der Datenbank entfernt wurde und liefert eine kurze Inventur fuer die isPlatformAdmin / isSysAdmin Flags. Das Skript verwendet die bestehende ``APP_CONFIG`` (entschluesselt ``DB_PASSWORD_SECRET``) und fragt direkt via ``psycopg2`` ab, ohne den ganzen FastAPI-Stack hochzufahren. Aufruf:: python gateway/scripts/check_db_no_sysadmin_role.py Exit-Code: - 0 -> sauber (Role-Count == 0) - 1 -> sysadmin-Rolle existiert noch (Migration unvollstaendig) - 2 -> Verbindungsfehler (Konfiguration / DB nicht erreichbar) """ from __future__ import annotations import os import sys from pathlib import Path _GATEWAY = Path(__file__).resolve().parents[1] if str(_GATEWAY) not in sys.path: sys.path.insert(0, str(_GATEWAY)) import psycopg2 # noqa: E402 import psycopg2.extras # noqa: E402 from modules.shared.configuration import APP_CONFIG # noqa: E402 def _connect(): host = APP_CONFIG.get("DB_HOST", "localhost") user = APP_CONFIG.get("DB_USER") password = APP_CONFIG.get("DB_PASSWORD_SECRET") port = int(APP_CONFIG.get("DB_PORT", 5432)) database = APP_CONFIG.get("DB_DATABASE", "poweron_app") return psycopg2.connect( host=host, port=port, dbname=database, user=user, password=password ) def _main() -> int: try: conn = _connect() except Exception as exc: print(f"[ERR] Could not connect to database: {exc}") return 2 try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur: cur.execute( 'SELECT COUNT(*)::int AS n FROM "Role" WHERE "roleLabel" = %s', ("sysadmin",), ) roleCount = cur.fetchone()["n"] cur.execute( 'SELECT COUNT(*)::int AS n FROM "UserInDB" ' 'WHERE COALESCE("isPlatformAdmin", false) = true' ) platformAdmins = cur.fetchone()["n"] cur.execute( 'SELECT COUNT(*)::int AS n FROM "UserInDB" ' 'WHERE COALESCE("isSysAdmin", false) = true' ) sysAdmins = cur.fetchone()["n"] cur.execute( 'SELECT "username", "email", ' 'COALESCE("isSysAdmin", false) AS "isSysAdmin", ' 'COALESCE("isPlatformAdmin", false) AS "isPlatformAdmin" ' 'FROM "UserInDB" ' 'WHERE COALESCE("isSysAdmin", false) = true ' ' OR COALESCE("isPlatformAdmin", false) = true ' 'ORDER BY "username"' ) adminUsers = cur.fetchall() cur.execute( 'SELECT COUNT(*)::int AS n FROM "AccessRule" ar ' 'JOIN "Role" r ON ar."roleId" = r."id" ' 'WHERE r."roleLabel" = %s', ("sysadmin",), ) orphanRules = cur.fetchone()["n"] cur.execute( 'SELECT COUNT(*)::int AS n FROM "UserMandateRole" umr ' 'JOIN "Role" r ON umr."roleId" = r."id" ' 'WHERE r."roleLabel" = %s', ("sysadmin",), ) orphanGrants = cur.fetchone()["n"] finally: conn.close() print("=" * 64) print("A5 - SysAdmin Migration DB Check") print("=" * 64) print(f"Role.roleLabel == 'sysadmin' : {roleCount}") print(f"AccessRule(s) referencing sysadmin role : {orphanRules}") print(f"UserMandateRole(s) granting sysadmin role : {orphanGrants}") print(f"User.isSysAdmin = true : {sysAdmins}") print(f"User.isPlatformAdmin = true : {platformAdmins}") print() print("Admin-flagged users:") if not adminUsers: print(" (none)") for row in adminUsers: flags = [] if row["isSysAdmin"]: flags.append("isSysAdmin") if row["isPlatformAdmin"]: flags.append("isPlatformAdmin") print(f" - {row['username']:<32} {row.get('email') or '':<40} {','.join(flags)}") print("=" * 64) if roleCount == 0 and orphanRules == 0 and orphanGrants == 0: print("[OK] Migration verified: no sysadmin role artefacts in DB.") return 0 print("[FAIL] Legacy sysadmin role artefacts still present in DB.") return 1 if __name__ == "__main__": sys.exit(_main())