132 lines
4.3 KiB
Python
132 lines
4.3 KiB
Python
"""Runtime-Check (A5): bestaetigt, dass die ``sysadmin``-Rolle aus der
|
|
Datenbank entfernt wurde und liefert eine kurze Inventur fuer die
|
|
isPlatformAdmin / isSysAdmin Flags.
|
|
|
|
Das Skript verwendet die bestehende ``APP_CONFIG`` (entschluesselt
|
|
``DB_PASSWORD_SECRET``) und fragt direkt via ``psycopg2`` ab, ohne den
|
|
ganzen FastAPI-Stack hochzufahren.
|
|
|
|
Aufruf::
|
|
|
|
python gateway/scripts/check_db_no_sysadmin_role.py
|
|
|
|
Exit-Code:
|
|
|
|
- 0 -> sauber (Role-Count == 0)
|
|
- 1 -> sysadmin-Rolle existiert noch (Migration unvollstaendig)
|
|
- 2 -> Verbindungsfehler (Konfiguration / DB nicht erreichbar)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
_GATEWAY = Path(__file__).resolve().parents[1]
|
|
if str(_GATEWAY) not in sys.path:
|
|
sys.path.insert(0, str(_GATEWAY))
|
|
|
|
import psycopg2 # noqa: E402
|
|
import psycopg2.extras # noqa: E402
|
|
|
|
from modules.shared.configuration import APP_CONFIG # noqa: E402
|
|
|
|
|
|
def _connect():
|
|
host = APP_CONFIG.get("DB_HOST", "localhost")
|
|
user = APP_CONFIG.get("DB_USER")
|
|
password = APP_CONFIG.get("DB_PASSWORD_SECRET")
|
|
port = int(APP_CONFIG.get("DB_PORT", 5432))
|
|
database = APP_CONFIG.get("DB_DATABASE", "poweron_app")
|
|
return psycopg2.connect(
|
|
host=host, port=port, dbname=database, user=user, password=password
|
|
)
|
|
|
|
|
|
def _main() -> int:
|
|
try:
|
|
conn = _connect()
|
|
except Exception as exc:
|
|
print(f"[ERR] Could not connect to database: {exc}")
|
|
return 2
|
|
|
|
try:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
|
|
cur.execute(
|
|
'SELECT COUNT(*)::int AS n FROM "Role" WHERE "roleLabel" = %s',
|
|
("sysadmin",),
|
|
)
|
|
roleCount = cur.fetchone()["n"]
|
|
|
|
cur.execute(
|
|
'SELECT COUNT(*)::int AS n FROM "UserInDB" '
|
|
'WHERE COALESCE("isPlatformAdmin", false) = true'
|
|
)
|
|
platformAdmins = cur.fetchone()["n"]
|
|
|
|
cur.execute(
|
|
'SELECT COUNT(*)::int AS n FROM "UserInDB" '
|
|
'WHERE COALESCE("isSysAdmin", false) = true'
|
|
)
|
|
sysAdmins = cur.fetchone()["n"]
|
|
|
|
cur.execute(
|
|
'SELECT "username", "email", '
|
|
'COALESCE("isSysAdmin", false) AS "isSysAdmin", '
|
|
'COALESCE("isPlatformAdmin", false) AS "isPlatformAdmin" '
|
|
'FROM "UserInDB" '
|
|
'WHERE COALESCE("isSysAdmin", false) = true '
|
|
' OR COALESCE("isPlatformAdmin", false) = true '
|
|
'ORDER BY "username"'
|
|
)
|
|
adminUsers = cur.fetchall()
|
|
|
|
cur.execute(
|
|
'SELECT COUNT(*)::int AS n FROM "AccessRule" ar '
|
|
'JOIN "Role" r ON ar."roleId" = r."id" '
|
|
'WHERE r."roleLabel" = %s',
|
|
("sysadmin",),
|
|
)
|
|
orphanRules = cur.fetchone()["n"]
|
|
|
|
cur.execute(
|
|
'SELECT COUNT(*)::int AS n FROM "UserMandateRole" umr '
|
|
'JOIN "Role" r ON umr."roleId" = r."id" '
|
|
'WHERE r."roleLabel" = %s',
|
|
("sysadmin",),
|
|
)
|
|
orphanGrants = cur.fetchone()["n"]
|
|
finally:
|
|
conn.close()
|
|
|
|
print("=" * 64)
|
|
print("A5 - SysAdmin Migration DB Check")
|
|
print("=" * 64)
|
|
print(f"Role.roleLabel == 'sysadmin' : {roleCount}")
|
|
print(f"AccessRule(s) referencing sysadmin role : {orphanRules}")
|
|
print(f"UserMandateRole(s) granting sysadmin role : {orphanGrants}")
|
|
print(f"User.isSysAdmin = true : {sysAdmins}")
|
|
print(f"User.isPlatformAdmin = true : {platformAdmins}")
|
|
print()
|
|
print("Admin-flagged users:")
|
|
if not adminUsers:
|
|
print(" (none)")
|
|
for row in adminUsers:
|
|
flags = []
|
|
if row["isSysAdmin"]:
|
|
flags.append("isSysAdmin")
|
|
if row["isPlatformAdmin"]:
|
|
flags.append("isPlatformAdmin")
|
|
print(f" - {row['username']:<32} {row.get('email') or '':<40} {','.join(flags)}")
|
|
print("=" * 64)
|
|
|
|
if roleCount == 0 and orphanRules == 0 and orphanGrants == 0:
|
|
print("[OK] Migration verified: no sysadmin role artefacts in DB.")
|
|
return 0
|
|
print("[FAIL] Legacy sysadmin role artefacts still present in DB.")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(_main())
|