gateway/scripts/script_db_audit_legacy_state.py
2026-04-29 21:27:08 +02:00

382 lines
12 KiB
Python

#!/usr/bin/env python3
"""Audit-Skript fuer Legacy-Bestaende vor Bootstrap-Cleanup (Plan C).
Prueft fuer jede der 5 Bootstrap-Migrationsroutinen, ob noch Restbestand
existiert. Wenn alle Checks 0 / GREEN liefern, kann die jeweilige Routine
sicher aus ``interfaceBootstrap.py`` / ``interfaceDbKnowledge.py`` entfernt
werden.
Checks:
1. Mandate.description != NULL und Mandate.label leer
-> _migrateMandateDescriptionToLabel
2. Mandate.label leer ODER Mandate.name verstoesst gegen Slug-Regeln
-> _migrateMandateNameLabelSlugRules
3. Mandate mit name='Root' ODER Mandate mit name='root' und isSystem=False
-> initRootMandate Legacy-Zweig
4. Role mit roleLabel='sysadmin' im Root-Mandat
-> _migrateAndDropSysAdminRole
5. FileContentIndex mit leerem mandateId UND leerem featureInstanceId
-> aggregateMandateRagTotalBytes Fallback-Block
Verwendung:
python -m scripts.script_db_audit_legacy_state # text-output
python -m scripts.script_db_audit_legacy_state --json # JSON-output
python -m scripts.script_db_audit_legacy_state --purge-rag-orphans
# loescht FileContentIndex-Rows ohne mandateId UND ohne featureInstanceId
# (Voraussetzung fuer Removal des aggregateMandateRagTotalBytes-Fallback)
Exit-Code:
0 alle Checks GREEN (Removal sicher)
1 mind. ein Check RED (erst Daten bereinigen)
2 Skript-Fehler (DB nicht erreichbar etc.)
Lese-Zugriffe sind der Default. Schreibzugriffe NUR mit explizitem
``--purge-*``-Flag.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
_gatewayDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _gatewayDir not in sys.path:
sys.path.insert(0, _gatewayDir)
from dotenv import load_dotenv
_envPath = os.path.join(_gatewayDir, "env_dev.env")
if os.path.exists(_envPath):
load_dotenv(_envPath)
from modules.datamodels.datamodelUam import Mandate
from modules.datamodels.datamodelRbac import Role
from modules.datamodels.datamodelKnowledge import FileContentIndex
from modules.security.rootAccess import getRootDbAppConnector
from modules.interfaces.interfaceDbKnowledge import KnowledgeObjects
from modules.shared.mandateNameUtils import isValidMandateName
logging.basicConfig(level=logging.WARNING, format="%(message)s")
logger = logging.getLogger(__name__)
@dataclass
class _CheckResult:
"""Ergebnis eines einzelnen Audit-Checks."""
name: str
routine: str
location: str
count: int
status: str
samples: List[Dict[str, Any]] = field(default_factory=list)
error: Optional[str] = None
def toDict(self) -> Dict[str, Any]:
return {
"name": self.name,
"routine": self.routine,
"location": self.location,
"count": self.count,
"status": self.status,
"samples": self.samples,
"error": self.error,
}
def _getAppDb():
    """Return the connector produced by ``getRootDbAppConnector()``."""
    connector = getRootDbAppConnector()
    return connector
def _getKnowledgeDb():
    """Return the ``db`` attribute of a newly constructed ``KnowledgeObjects``."""
    knowledge = KnowledgeObjects()
    return knowledge.db
def _checkMandateDescription(db) -> _CheckResult:
    """Report mandates that still carry a description but have no label.

    Args:
        db: Object exposing ``getRecordset(Mandate)``; rows are read via ``.get``.

    Returns:
        A result that is GREEN when no such mandate exists and RED otherwise,
        with up to five sample rows (description truncated to 60 characters).
    """
    hits = []
    for row in db.getRecordset(Mandate):
        # Only rows with a truthy description AND a falsy label are legacy.
        if not row.get("description") or row.get("label"):
            continue
        hits.append(
            {
                "id": row.get("id"),
                "name": row.get("name"),
                "description": str(row.get("description"))[:60],
                "label": row.get("label"),
            }
        )

    return _CheckResult(
        name="mandate-description-to-label",
        routine="_migrateMandateDescriptionToLabel",
        location="interfaces/interfaceBootstrap.py:422-445",
        count=len(hits),
        status="RED" if hits else "GREEN",
        samples=hits[:5],
    )
def _checkMandateSlugRules(db) -> _CheckResult:
    """Report mandates with an empty label, an invalid name or a duplicate name.

    Rows are visited in order of ``str(id)``. A name counts as a collision when
    an earlier row in that order already claimed the same valid name.

    Args:
        db: Object exposing ``getRecordset(Mandate)``; rows are read via ``.get``.

    Returns:
        A result that is GREEN when no row is flagged and RED otherwise, with
        up to five samples including the three individual flags.
    """
    ordered = sorted(db.getRecordset(Mandate), key=lambda rec: str(rec.get("id", "")))
    claimedNames: set[str] = set()
    findings = []

    for rec in ordered:
        slug = (rec.get("name") or "").strip()
        rawLabel = rec.get("label")
        # None, "" and whitespace-only labels all count as empty.
        labelEmpty = not (rawLabel or "").strip()
        nameInvalid = not isValidMandateName(slug)
        nameCollides = slug in claimedNames

        if nameInvalid or nameCollides:
            flagged = True
        else:
            # Only valid, first-seen names are claimed for later collision checks.
            claimedNames.add(slug)
            flagged = labelEmpty

        if flagged:
            findings.append(
                {
                    "id": rec.get("id"),
                    "name": slug,
                    "label": rec.get("label"),
                    "labelEmpty": labelEmpty,
                    "nameInvalid": nameInvalid,
                    "nameCollides": nameCollides,
                }
            )

    return _CheckResult(
        name="mandate-name-slug-rules",
        routine="_migrateMandateNameLabelSlugRules",
        location="interfaces/interfaceBootstrap.py:448-511",
        count=len(findings),
        status="RED" if findings else "GREEN",
        samples=findings[:5],
    )
def _checkRootMandateLegacy(db) -> _CheckResult:
    """Report legacy root mandates.

    A mandate is flagged when its name is exactly ``"Root"``, or when its name
    is ``"root"`` and its ``isSystem`` value is falsy.

    Args:
        db: Object exposing ``getRecordset(Mandate, recordFilter=...)``.

    Returns:
        A result that is GREEN when nothing is flagged and RED otherwise, with
        up to five samples (id, name, isSystem).
    """
    capitalised = db.getRecordset(Mandate, recordFilter={"name": "Root"})
    lowercase = db.getRecordset(Mandate, recordFilter={"name": "root"})

    offenders = list(capitalised)
    offenders.extend(rec for rec in lowercase if not rec.get("isSystem"))

    samples = []
    for rec in offenders:
        samples.append(
            {
                "id": rec.get("id"),
                "name": rec.get("name"),
                "isSystem": rec.get("isSystem"),
            }
        )

    return _CheckResult(
        name="root-mandate-legacy",
        routine="initRootMandate-legacy-branch",
        location="interfaces/interfaceBootstrap.py:406-412",
        count=len(samples),
        status="RED" if samples else "GREEN",
        samples=samples[:5],
    )
def _checkSysadminRole(db) -> _CheckResult:
    """Report 'sysadmin' roles attached to the root mandate.

    The root mandate is looked up as name ``"root"`` with ``isSystem=True``;
    only the first match is used. When none is found the check is skipped and
    reported GREEN with an explanatory note in ``error``.

    Args:
        db: Object exposing ``getRecordset(model, recordFilter=...)``.

    Returns:
        A result that is GREEN when no matching role exists and RED otherwise,
        with up to five samples (id, roleLabel).
    """
    # Identification fields shared by both possible results.
    meta = {
        "name": "sysadmin-role",
        "routine": "_migrateAndDropSysAdminRole",
        "location": "interfaces/interfaceBootstrap.py:840-932",
    }

    roots = db.getRecordset(Mandate, recordFilter={"name": "root", "isSystem": True})
    if not roots:
        return _CheckResult(
            count=0,
            status="GREEN",
            samples=[],
            error="kein Root-Mandat gefunden -- Check uebersprungen (kann nicht relevant sein)",
            **meta,
        )

    roleFilter = {
        "roleLabel": "sysadmin",
        "mandateId": str(roots[0].get("id")),
        "featureInstanceId": None,
    }
    samples = []
    for role in db.getRecordset(Role, recordFilter=roleFilter):
        samples.append({"id": role.get("id"), "roleLabel": role.get("roleLabel")})

    return _CheckResult(
        count=len(samples),
        status="RED" if samples else "GREEN",
        samples=samples[:5],
        **meta,
    )
def _checkRagFallback(knowDb) -> _CheckResult:
    """Report FileContentIndex rows with neither a mandateId nor a featureInstanceId.

    A value counts as missing when it is falsy or whitespace-only.

    Args:
        knowDb: Object exposing ``getRecordset(FileContentIndex)``.

    Returns:
        A result that is GREEN when no such row exists and RED otherwise, with
        up to five samples (id, fileName, totalSize).
    """

    def _isBlank(value) -> bool:
        return not (value or "").strip()

    orphans = []
    for row in knowDb.getRecordset(FileContentIndex):
        if _isBlank(row.get("mandateId")) and _isBlank(row.get("featureInstanceId")):
            orphans.append(
                {
                    "id": row.get("id"),
                    "fileName": row.get("fileName"),
                    "totalSize": row.get("totalSize"),
                }
            )

    return _CheckResult(
        name="rag-fallback-orphan-index",
        routine="aggregateMandateRagTotalBytes-fallback",
        location="interfaces/interfaceDbKnowledge.py:609-635",
        count=len(orphans),
        status="RED" if orphans else "GREEN",
        samples=orphans[:5],
    )
def _runChecks() -> List[_CheckResult]:
    """Run every audit check and collect the results.

    Both DB handles are obtained up front; a failure there propagates to the
    caller. An exception raised inside an individual check is converted into a
    result with status "ERROR" and count -1 so the remaining checks still run.

    Returns:
        One result per check: the four app-DB checks in order, followed by the
        RAG fallback check.
    """
    appDb = _getAppDb()
    knowDb = _getKnowledgeDb()

    appChecks: List[Callable[[Any], _CheckResult]] = [
        _checkMandateDescription,
        _checkMandateSlugRules,
        _checkRootMandateLegacy,
        _checkSysadminRole,
    ]

    results: List[_CheckResult] = []

    for check in appChecks:
        try:
            outcome = check(appDb)
        except Exception as exc:
            # The check's own metadata is unavailable here, so the function
            # name stands in for the check name.
            outcome = _CheckResult(
                name=check.__name__,
                routine="?",
                location="?",
                count=-1,
                status="ERROR",
                error=f"{type(exc).__name__}: {exc}",
            )
        results.append(outcome)

    try:
        ragOutcome = _checkRagFallback(knowDb)
    except Exception as exc:
        ragOutcome = _CheckResult(
            name="rag-fallback-orphan-index",
            routine="aggregateMandateRagTotalBytes-fallback",
            location="interfaces/interfaceDbKnowledge.py:609-635",
            count=-1,
            status="ERROR",
            error=f"{type(exc).__name__}: {exc}",
        )
    results.append(ragOutcome)

    return results
def _printText(results: List[_CheckResult]) -> None:
print("=" * 78)
print("BOOTSTRAP-MIGRATIONS LEGACY-STATE-AUDIT")
print("=" * 78)
for r in results:
marker = {
"GREEN": "[OK]",
"RED": "[!!]",
"ERROR": "[ERR]",
}.get(r.status, "[?]")
print(f"\n{marker} {r.name}")
print(f" Routine : {r.routine}")
print(f" Location: {r.location}")
print(f" Count : {r.count}")
print(f" Status : {r.status}")
if r.error:
print(f" Note : {r.error}")
if r.samples:
print(f" Samples : (max 5)")
for s in r.samples:
print(f" {s}")
print("\n" + "=" * 78)
greens = sum(1 for r in results if r.status == "GREEN")
reds = sum(1 for r in results if r.status == "RED")
errs = sum(1 for r in results if r.status == "ERROR")
print(f"SUMMARY: {greens} GREEN {reds} RED {errs} ERROR ({len(results)} total)")
if reds == 0 and errs == 0:
print("VERDICT: alle Migrationsroutinen koennen entfernt werden.")
elif errs > 0:
print("VERDICT: Audit unvollstaendig (Fehler) -- bitte Skript fixen.")
else:
print("VERDICT: erst Daten bereinigen, dann Routinen entfernen.")
print("=" * 78)
def _purgeRagOrphans() -> int:
    """Delete every FileContentIndex row with neither mandateId nor featureInstanceId.

    A value counts as missing when it is falsy or whitespace-only. Progress is
    printed to stdout; a failure for an individual row is printed to stderr
    and does not stop the remaining deletions.

    Returns:
        Number of rows that were deleted.
    """
    knowDb = _getKnowledgeDb()

    orphans = []
    for row in knowDb.getRecordset(FileContentIndex):
        hasMandate = bool((row.get("mandateId") or "").strip())
        hasInstance = bool((row.get("featureInstanceId") or "").strip())
        if not (hasMandate or hasInstance):
            orphans.append(row)

    total = len(orphans)
    if total == 0:
        print("Keine RAG-Orphans gefunden -- nichts zu purgen.")
        return 0

    print(f"Purge {total} RAG-Orphan(s):")
    deleted = 0
    for row in orphans:
        rid = row.get("id")
        try:
            knowDb.recordDelete(FileContentIndex, str(rid))
            deleted += 1
            print(f" geloescht: {rid} {row.get('fileName')}")
        except Exception as exc:
            print(f" FEHLER {rid}: {type(exc).__name__}: {exc}", file=sys.stderr)

    print(f"Purge abgeschlossen: {deleted}/{total} geloescht.")
    return deleted
def main() -> int:
    """Parse the command line, optionally purge RAG orphans, then run the audit.

    With ``--purge-rag-orphans`` the purge runs before the audit. With
    ``--json`` the results are printed as a JSON array, otherwise as a text
    report. In JSON mode the purge progress is written to stderr so that
    stdout contains nothing but the JSON document.

    Returns:
        Process exit code: 0 when every check is GREEN, 1 when at least one
        check is RED, 2 when a check reported ERROR or the purge/audit could
        not be run at all.
    """
    # Function-scope import: only this entry point needs it.
    import contextlib

    parser = argparse.ArgumentParser(
        description="Audit-Skript fuer Legacy-Bestaende (Bootstrap-Cleanup Plan C)"
    )
    parser.add_argument("--json", action="store_true", help="JSON-Output statt Text")
    parser.add_argument(
        "--purge-rag-orphans",
        action="store_true",
        help="WRITE: loescht FileContentIndex-Rows ohne mandateId UND featureInstanceId",
    )
    args = parser.parse_args()

    if args.purge_rag_orphans:
        # Purge progress must not precede the JSON document on stdout,
        # otherwise consumers of --json cannot parse the output.
        purgeOut = sys.stderr if args.json else sys.stdout
        try:
            with contextlib.redirect_stdout(purgeOut):
                _purgeRagOrphans()
        except Exception as exc:
            print(f"FATAL: Purge fehlgeschlagen -- {type(exc).__name__}: {exc}", file=sys.stderr)
            return 2
        # Blank separator between purge output and the report.
        print(file=purgeOut)

    try:
        results = _runChecks()
    except Exception as exc:
        print(f"FATAL: konnte Audit nicht starten -- {type(exc).__name__}: {exc}", file=sys.stderr)
        return 2

    if args.json:
        print(json.dumps([r.toDict() for r in results], indent=2, default=str))
    else:
        _printText(results)

    # ERROR outranks RED: an incomplete audit must not look like a data finding.
    if any(r.status == "ERROR" for r in results):
        return 2
    if any(r.status == "RED" for r in results):
        return 1
    return 0
# Run the audit when executed as a script; main()'s return value becomes the exit code.
if __name__ == "__main__":
    raise SystemExit(main())