#!/usr/bin/env python3 """Audit-Skript fuer Legacy-Bestaende vor Bootstrap-Cleanup (Plan C). Prueft fuer jede der 5 Bootstrap-Migrationsroutinen, ob noch Restbestand existiert. Wenn alle Checks 0 / GREEN liefern, kann die jeweilige Routine sicher aus ``interfaceBootstrap.py`` / ``interfaceDbKnowledge.py`` entfernt werden. Checks: 1. Mandate.description != NULL und Mandate.label leer -> _migrateMandateDescriptionToLabel 2. Mandate.label leer ODER Mandate.name verstoesst gegen Slug-Regeln -> _migrateMandateNameLabelSlugRules 3. Mandate mit name='Root' und isSystem=False -> initRootMandate Legacy-Zweig 4. Role mit roleLabel='sysadmin' im Root-Mandat -> _migrateAndDropSysAdminRole 5. FileContentIndex mit leerem mandateId UND leerem featureInstanceId -> aggregateMandateRagTotalBytes Fallback-Block Verwendung: python -m scripts.script_db_audit_legacy_state # text-output python -m scripts.script_db_audit_legacy_state --json # JSON-output python -m scripts.script_db_audit_legacy_state --purge-rag-orphans # loescht FileContentIndex-Rows ohne mandateId UND ohne featureInstanceId # (Voraussetzung fuer Removal des aggregateMandateRagTotalBytes-Fallback) Exit-Code: 0 alle Checks GREEN (Removal sicher) 1 mind. ein Check RED (erst Daten bereinigen) 2 Skript-Fehler (DB nicht erreichbar etc.) Lese-Zugriffe sind die Default. Schreibzugriffe NUR mit explizitem ``--purge-*``-Flag. """ from __future__ import annotations import argparse import json import logging import os import sys from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional _gatewayDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _gatewayDir not in sys.path: sys.path.insert(0, _gatewayDir) from dotenv import load_dotenv _envPath = os.path.join(_gatewayDir, "env_dev.env") if os.path.exists(_envPath): load_dotenv(_envPath) from modules.datamodels.datamodelUam import Mandate from modules.datamodels.datamodelRbac import Role from modules.datamodels.datamodelKnowledge import FileContentIndex from modules.security.rootAccess import getRootDbAppConnector from modules.interfaces.interfaceDbKnowledge import KnowledgeObjects from modules.shared.mandateNameUtils import isValidMandateName logging.basicConfig(level=logging.WARNING, format="%(message)s") logger = logging.getLogger(__name__) @dataclass class _CheckResult: """Ergebnis eines einzelnen Audit-Checks.""" name: str routine: str location: str count: int status: str samples: List[Dict[str, Any]] = field(default_factory=list) error: Optional[str] = None def toDict(self) -> Dict[str, Any]: return { "name": self.name, "routine": self.routine, "location": self.location, "count": self.count, "status": self.status, "samples": self.samples, "error": self.error, } def _getAppDb(): return getRootDbAppConnector() def _getKnowledgeDb(): return KnowledgeObjects().db def _checkMandateDescription(db) -> _CheckResult: """Mandate.description noch vorhanden und label leer?""" rows = db.getRecordset(Mandate) legacy = [ { "id": r.get("id"), "name": r.get("name"), "description": str(r.get("description"))[:60] if r.get("description") else None, "label": r.get("label"), } for r in rows if r.get("description") and not r.get("label") ] return _CheckResult( name="mandate-description-to-label", routine="_migrateMandateDescriptionToLabel", location="interfaces/interfaceBootstrap.py:422-445", count=len(legacy), status="GREEN" if not legacy else "RED", samples=legacy[:5], ) def _checkMandateSlugRules(db) -> _CheckResult: """Mandate.name verletzt Slug-Regeln ODER Mandate.label leer?""" rows = db.getRecordset(Mandate) legacy = [] seen: set[str] = set() for r in sorted(rows, key=lambda x: str(x.get("id", ""))): name = (r.get("name") or "").strip() labelRaw = r.get("label") labelEmpty = not (labelRaw or "").strip() if labelRaw is not None else True nameInvalid = not isValidMandateName(name) nameCollides = name in seen if not nameInvalid and not nameCollides: seen.add(name) if labelEmpty or nameInvalid or nameCollides: legacy.append( { "id": r.get("id"), "name": name, "label": r.get("label"), "labelEmpty": labelEmpty, "nameInvalid": nameInvalid, "nameCollides": nameCollides, } ) return _CheckResult( name="mandate-name-slug-rules", routine="_migrateMandateNameLabelSlugRules", location="interfaces/interfaceBootstrap.py:448-511", count=len(legacy), status="GREEN" if not legacy else "RED", samples=legacy[:5], ) def _checkRootMandateLegacy(db) -> _CheckResult: """Mandate mit name='Root' (case-sensitive) ODER isSystem=False fuer root?""" legacyByName = db.getRecordset(Mandate, recordFilter={"name": "Root"}) rows = db.getRecordset(Mandate, recordFilter={"name": "root"}) legacyByFlag = [r for r in rows if not r.get("isSystem")] combined = list(legacyByName) + legacyByFlag samples = [ { "id": r.get("id"), "name": r.get("name"), "isSystem": r.get("isSystem"), } for r in combined ] return _CheckResult( name="root-mandate-legacy", routine="initRootMandate-legacy-branch", location="interfaces/interfaceBootstrap.py:406-412", count=len(samples), status="GREEN" if not samples else "RED", samples=samples[:5], ) def _checkSysadminRole(db) -> _CheckResult: """Legacy 'sysadmin'-Rolle im Root-Mandat?""" rootMandates = db.getRecordset(Mandate, recordFilter={"name": "root", "isSystem": True}) if not rootMandates: return _CheckResult( name="sysadmin-role", routine="_migrateAndDropSysAdminRole", location="interfaces/interfaceBootstrap.py:840-932", count=0, status="GREEN", samples=[], error="kein Root-Mandat gefunden -- Check uebersprungen (kann nicht relevant sein)", ) rootId = str(rootMandates[0].get("id")) rows = db.getRecordset( Role, recordFilter={"roleLabel": "sysadmin", "mandateId": rootId, "featureInstanceId": None}, ) samples = [{"id": r.get("id"), "roleLabel": r.get("roleLabel")} for r in rows] return _CheckResult( name="sysadmin-role", routine="_migrateAndDropSysAdminRole", location="interfaces/interfaceBootstrap.py:840-932", count=len(samples), status="GREEN" if not samples else "RED", samples=samples[:5], ) def _checkRagFallback(knowDb) -> _CheckResult: """FileContentIndex-Rows ohne mandateId UND ohne featureInstanceId?""" rows = knowDb.getRecordset(FileContentIndex) legacy = [ { "id": r.get("id"), "fileName": r.get("fileName"), "totalSize": r.get("totalSize"), } for r in rows if not (r.get("mandateId") or "").strip() and not (r.get("featureInstanceId") or "").strip() ] return _CheckResult( name="rag-fallback-orphan-index", routine="aggregateMandateRagTotalBytes-fallback", location="interfaces/interfaceDbKnowledge.py:609-635", count=len(legacy), status="GREEN" if not legacy else "RED", samples=legacy[:5], ) def _runChecks() -> List[_CheckResult]: appDb = _getAppDb() knowDb = _getKnowledgeDb() appChecks: List[Callable[[Any], _CheckResult]] = [ _checkMandateDescription, _checkMandateSlugRules, _checkRootMandateLegacy, _checkSysadminRole, ] results: List[_CheckResult] = [] for fn in appChecks: try: results.append(fn(appDb)) except Exception as exc: results.append( _CheckResult( name=fn.__name__, routine="?", location="?", count=-1, status="ERROR", error=f"{type(exc).__name__}: {exc}", ) ) try: results.append(_checkRagFallback(knowDb)) except Exception as exc: results.append( _CheckResult( name="rag-fallback-orphan-index", routine="aggregateMandateRagTotalBytes-fallback", location="interfaces/interfaceDbKnowledge.py:609-635", count=-1, status="ERROR", error=f"{type(exc).__name__}: {exc}", ) ) return results def _printText(results: List[_CheckResult]) -> None: print("=" * 78) print("BOOTSTRAP-MIGRATIONS LEGACY-STATE-AUDIT") print("=" * 78) for r in results: marker = { "GREEN": "[OK]", "RED": "[!!]", "ERROR": "[ERR]", }.get(r.status, "[?]") print(f"\n{marker} {r.name}") print(f" Routine : {r.routine}") print(f" Location: {r.location}") print(f" Count : {r.count}") print(f" Status : {r.status}") if r.error: print(f" Note : {r.error}") if r.samples: print(f" Samples : (max 5)") for s in r.samples: print(f" {s}") print("\n" + "=" * 78) greens = sum(1 for r in results if r.status == "GREEN") reds = sum(1 for r in results if r.status == "RED") errs = sum(1 for r in results if r.status == "ERROR") print(f"SUMMARY: {greens} GREEN {reds} RED {errs} ERROR ({len(results)} total)") if reds == 0 and errs == 0: print("VERDICT: alle Migrationsroutinen koennen entfernt werden.") elif errs > 0: print("VERDICT: Audit unvollstaendig (Fehler) -- bitte Skript fixen.") else: print("VERDICT: erst Daten bereinigen, dann Routinen entfernen.") print("=" * 78) def _purgeRagOrphans() -> int: """Loescht alle FileContentIndex-Rows ohne mandateId UND ohne featureInstanceId. Returns: Anzahl geloeschter Rows. """ knowDb = _getKnowledgeDb() rows = knowDb.getRecordset(FileContentIndex) orphans = [ r for r in rows if not (r.get("mandateId") or "").strip() and not (r.get("featureInstanceId") or "").strip() ] if not orphans: print("Keine RAG-Orphans gefunden -- nichts zu purgen.") return 0 print(f"Purge {len(orphans)} RAG-Orphan(s):") deleted = 0 for r in orphans: rid = r.get("id") try: knowDb.recordDelete(FileContentIndex, str(rid)) deleted += 1 print(f" geloescht: {rid} {r.get('fileName')}") except Exception as exc: print(f" FEHLER {rid}: {type(exc).__name__}: {exc}", file=sys.stderr) print(f"Purge abgeschlossen: {deleted}/{len(orphans)} geloescht.") return deleted def main() -> int: parser = argparse.ArgumentParser( description="Audit-Skript fuer Legacy-Bestaende (Bootstrap-Cleanup Plan C)" ) parser.add_argument("--json", action="store_true", help="JSON-Output statt Text") parser.add_argument( "--purge-rag-orphans", action="store_true", help="WRITE: loescht FileContentIndex-Rows ohne mandateId UND featureInstanceId", ) args = parser.parse_args() if args.purge_rag_orphans: try: _purgeRagOrphans() except Exception as exc: print(f"FATAL: Purge fehlgeschlagen -- {type(exc).__name__}: {exc}", file=sys.stderr) return 2 print() try: results = _runChecks() except Exception as exc: print(f"FATAL: konnte Audit nicht starten -- {type(exc).__name__}: {exc}", file=sys.stderr) return 2 if args.json: print(json.dumps([r.toDict() for r in results], indent=2, default=str)) else: _printText(results) if any(r.status == "ERROR" for r in results): return 2 if any(r.status == "RED" for r in results): return 1 return 0 if __name__ == "__main__": sys.exit(main())