# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Lightweight Bootstrap-Telemetrie fuer entfernte Migrationsroutinen. Wenn eine idempotente Bootstrap-Migration (z.B. ``_migrateAndDropSysAdminRole``) aus dem Boot-Pfad entfernt wird, koennte ein theoretischer Edge-Case (alte DB-Restore, manueller INSERT) wieder Legacy-Daten ins System bringen. Damit das nicht still bleibt, ruft ``initBootstrap`` nach Abschluss aller Init-Schritte einmalig ``runLegacyDataChecks`` auf -- das logged WARN bei Restbestand. Designprinzipien: - KEINE Schreibzugriffe (rein lesend). - Process-lokal gecached (``_cache``), damit identische Boots/Reloads den Check nur einmal laufen lassen. - Pro Check eine Recordset-Abfrage; Ausnahmen werden als WARN geloggt, nicht re-raised, damit Telemetrie den Boot nie crasht. """ from __future__ import annotations import logging from typing import Any from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.datamodels.datamodelRbac import Role from modules.datamodels.datamodelUam import Mandate from modules.shared.mandateNameUtils import isValidMandateName logger = logging.getLogger(__name__) _alreadyRan: bool = False def runLegacyDataChecks(db: DatabaseConnector) -> None: """Logged WARN, falls noch Legacy-Daten existieren, die durch entfernte Migrationsroutinen behandelt wurden. Prozessweit nur einmal aktiv. Aufruf: am Ende von ``initBootstrap``. """ global _alreadyRan if _alreadyRan: return _alreadyRan = True _checkMandateDescription(db) _checkMandateSlugRules(db) _checkLegacyRootMandate(db) _checkSysadminRole(db) _backfillTargetFeatureInstanceId() def _safe(checkName: str, fn) -> Any: try: return fn() except Exception as exc: logger.warning( "Legacy-data telemetry check '%s' failed: %s: %s", checkName, type(exc).__name__, exc, ) return None def _checkMandateDescription(db: DatabaseConnector) -> None: def _do() -> None: rows = db.getRecordset(Mandate) bad = [ r.get("id") for r in rows if r.get("description") and not r.get("label") ] if bad: logger.warning( "Legacy-data check: %d Mandate row(s) still have description " "but empty label (removed migration: _migrateMandateDescriptionToLabel). " "Run scripts/script_db_audit_legacy_state.py for details. IDs: %s", len(bad), bad[:5], ) _safe("mandate-description", _do) def _checkMandateSlugRules(db: DatabaseConnector) -> None: def _do() -> None: rows = db.getRecordset(Mandate) seen: set[str] = set() bad: list[str] = [] for r in sorted(rows, key=lambda x: str(x.get("id", ""))): mid = r.get("id") if not mid: continue name = (r.get("name") or "").strip() labelRaw = r.get("label") labelEmpty = not (labelRaw or "").strip() if labelRaw is not None else True invalid = not isValidMandateName(name) collides = name in seen if not invalid and not collides: seen.add(name) if labelEmpty or invalid or collides: bad.append(str(mid)) if bad: logger.warning( "Legacy-data check: %d Mandate row(s) violate slug/label rules " "(removed migration: _migrateMandateNameLabelSlugRules). " "Run scripts/script_db_audit_legacy_state.py for details. IDs: %s", len(bad), bad[:5], ) _safe("mandate-slug-rules", _do) def _checkLegacyRootMandate(db: DatabaseConnector) -> None: def _do() -> None: legacy = db.getRecordset(Mandate, recordFilter={"name": "Root"}) rootRows = db.getRecordset(Mandate, recordFilter={"name": "root"}) legacyByFlag = [r for r in rootRows if not r.get("isSystem")] all_ = list(legacy) + legacyByFlag if all_: logger.warning( "Legacy-data check: %d Root-Mandate row(s) still in legacy form " "(removed migration: initRootMandate-legacy-branch). IDs: %s", len(all_), [r.get("id") for r in all_][:5], ) _safe("root-mandate-legacy", _do) def _checkSysadminRole(db: DatabaseConnector) -> None: def _do() -> None: rootMandates = db.getRecordset( Mandate, recordFilter={"name": "root", "isSystem": True} ) if not rootMandates: return rootId = str(rootMandates[0].get("id")) rows = db.getRecordset( Role, recordFilter={ "roleLabel": "sysadmin", "mandateId": rootId, "featureInstanceId": None, }, ) if rows: logger.warning( "Legacy-data check: %d 'sysadmin' role(s) still present in root mandate " "(removed migration: _migrateAndDropSysAdminRole). " "Authority is now User.isPlatformAdmin -- migrate manually. IDs: %s", len(rows), [r.get("id") for r in rows], ) _safe("sysadmin-role", _do) def _backfillTargetFeatureInstanceId() -> None: """Idempotent backfill: set targetFeatureInstanceId = featureInstanceId for all non-template AutoWorkflow rows where it is still NULL. Connects to ``poweron_graphicaleditor`` independently. """ def _do() -> None: from modules.shared.configuration import APP_CONFIG from modules.features.graphicalEditor.datamodelFeatureGraphicalEditor import AutoWorkflow dbHost = APP_CONFIG.get("DB_HOST", "localhost") dbUser = APP_CONFIG.get("DB_USER") dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET") or APP_CONFIG.get("DB_PASSWORD") dbPort = int(APP_CONFIG.get("DB_PORT", 5432)) geDb = DatabaseConnector( dbHost=dbHost, dbDatabase="poweron_graphicaleditor", dbUser=dbUser, dbPassword=dbPassword, dbPort=dbPort, userId=None, ) if not geDb._ensureTableExists(AutoWorkflow): return rows = geDb.getRecordset(AutoWorkflow) or [] backfilled = 0 for r in rows: if r.get("isTemplate"): continue if r.get("targetFeatureInstanceId"): continue srcId = r.get("featureInstanceId") if not srcId: continue geDb.recordModify(AutoWorkflow, r["id"], {"targetFeatureInstanceId": srcId}) backfilled += 1 if backfilled: logger.info( "targetFeatureInstanceId backfill: set %d non-template AutoWorkflow row(s) " "to their featureInstanceId", backfilled, ) _safe("backfill-targetFeatureInstanceId", _do)