#!/usr/bin/env python3
"""
Adapt the database schema to the Pydantic models.

Simple script that:
1. Adds columns that are missing in the DB (according to the Pydantic models)
2. Corrects wrong column data types (existing data in such a column is
   discarded, because the conversion uses ``USING NULL``)
3. Special case: migrates UserInDB.privilege into roleLabels

Usage:
    python script_db_adapt_to_models.py [--dry-run] [--db NAME[,NAME...]]
"""

import argparse
import ast
import json
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional

# Put the gateway root on sys.path and make it the working directory so the
# project-local import below resolves when the script is run directly.
scriptPath = Path(__file__).resolve()
gatewayPath = scriptPath.parent.parent
sys.path.insert(0, str(gatewayPath))
os.chdir(str(gatewayPath))

# Configure logging FIRST, before the remaining imports run.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True  # replaces any logging configuration that already exists
)
logger = logging.getLogger(__name__)

import psycopg2
import psycopg2.extras

from modules.shared.configuration import APP_CONFIG

# Database configuration: DB name -> (config prefix, Pydantic model modules)
DATABASE_CONFIG = {
    "poweron_app": ("DB_APP", ["datamodelUam", "datamodelRbac", "datamodelSecurity"]),
    "poweron_chat": ("DB_CHAT", ["datamodelChat"]),
    "poweron_management": ("DB_MANAGEMENT", ["datamodelWorkflow", "datamodelFiles"]),
}

# Python type name -> PostgreSQL type
TYPE_MAPPING = {
    "str": "text",
    "int": "integer",
    "float": "double precision",
    "bool": "boolean",
    "list": "jsonb",
    "dict": "jsonb",
    "List": "jsonb",
    "Dict": "jsonb",
    "Optional": None,  # handled separately in _extractType
    "datetime": "timestamp",
    "EmailStr": "text",
    "UUID": "uuid",
}


def _getDbConnection(dbName: str):
    """Open a connection to ``dbName`` using the settings in APP_CONFIG.

    Each setting is looked up under the database-specific prefix from
    DATABASE_CONFIG first and falls back to the generic ``DB_*`` key.

    Returns:
        An autocommit psycopg2 connection whose cursors return rows as dicts,
        or None when credentials are missing or the connection fails (the
        reason is logged).
    """
    prefix = DATABASE_CONFIG.get(dbName, ("DB", []))[0]

    host = APP_CONFIG.get(f"{prefix}_HOST") or APP_CONFIG.get("DB_HOST", "localhost")
    port = APP_CONFIG.get(f"{prefix}_PORT") or APP_CONFIG.get("DB_PORT", "5432")
    user = APP_CONFIG.get(f"{prefix}_USER") or APP_CONFIG.get("DB_USER")
    password = (
        APP_CONFIG.get(f"{prefix}_PASSWORD_SECRET")
        or APP_CONFIG.get(f"{prefix}_PASSWORD")
        or APP_CONFIG.get("DB_PASSWORD_SECRET")
        or APP_CONFIG.get("DB_PASSWORD")
    )

    if not user or not password:
        logger.error(f"Keine Credentials für {dbName} ({prefix}_*)")
        return None

    try:
        conn = psycopg2.connect(
            host=host,
            port=int(port),
            database=dbName,
            user=user,
            password=password,
            cursor_factory=psycopg2.extras.RealDictCursor
        )
        # Every statement is committed immediately; there is no rollback.
        conn.autocommit = True
        return conn
    except Exception as e:
        logger.error(f"Verbindungsfehler {dbName}: {e}")
        return None


def _getDbTables(conn) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Read all tables of the ``public`` schema together with their columns.

    Returns:
        ``{tableName: {columnName: {'type': data_type, 'nullable': bool}}}``
    """
    tables: Dict[str, Dict[str, Dict[str, Any]]] = {}
    with conn.cursor() as cur:
        cur.execute("""
            SELECT table_name, column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'public'
            ORDER BY table_name, ordinal_position
        """)
        for row in cur.fetchall():
            tables.setdefault(row['table_name'], {})[row['column_name']] = {
                'type': row['data_type'],
                'nullable': row['is_nullable'] == 'YES'
            }
    return tables


def _parsePydanticModels(moduleNames: List[str]) -> Dict[str, Dict[str, str]]:
    """Parse the Pydantic models of the given modules without importing them.

    The source files under ``modules/datamodels`` are read with ``ast``. A
    class counts as a model only when ``BaseModel`` appears by that bare name
    among its direct bases. Fields are the annotated assignments in the class
    body whose name does not start with an underscore.

    Returns:
        ``{className: {fieldName: postgresType}}`` for every model that has at
        least one usable field. Missing module files are logged and skipped.
    """
    models: Dict[str, Dict[str, str]] = {}
    datamodelsPath = gatewayPath / "modules" / "datamodels"

    for moduleName in moduleNames:
        filePath = datamodelsPath / f"{moduleName}.py"
        if not filePath.exists():
            logger.warning(f"Modul nicht gefunden: {filePath}")
            continue

        with open(filePath, 'r', encoding='utf-8') as f:
            tree = ast.parse(f.read())

        for node in ast.walk(tree):
            if not isinstance(node, ast.ClassDef):
                continue

            # Only classes that inherit directly from BaseModel are models.
            isBaseModel = any(
                isinstance(base, ast.Name) and base.id == "BaseModel"
                for base in node.bases
            )
            if not isBaseModel:
                continue

            fields: Dict[str, str] = {}
            for item in node.body:
                if not (isinstance(item, ast.AnnAssign) and isinstance(item.target, ast.Name)):
                    continue
                fieldName = item.target.id
                if fieldName.startswith('_'):
                    continue

                fieldType = _extractType(item.annotation)
                if fieldType:
                    fields[fieldName] = fieldType

            if fields:
                models[node.name] = fields

    return models


def _extractType(annotation) -> Optional[str]:
    """Derive the PostgreSQL type for an AST type annotation.

    Plain names are looked up in TYPE_MAPPING, ``Optional[X]`` resolves to the
    type of ``X``, and ``List``/``list``/``Dict``/``dict`` subscripts become
    ``jsonb``. Anything else falls back to ``"text"``.

    Returns:
        The PostgreSQL type name, or None for a bare ``Optional`` name
        (TYPE_MAPPING maps it to None), which makes the caller skip the field.
    """
    if isinstance(annotation, ast.Name):
        return TYPE_MAPPING.get(annotation.id, "text")

    if isinstance(annotation, ast.Subscript) and isinstance(annotation.value, ast.Name):
        outerType = annotation.value.id
        if outerType == "Optional":
            # The column type is that of the wrapped annotation.
            return _extractType(annotation.slice)
        if outerType in ("List", "list", "Dict", "dict"):
            return "jsonb"

    # String annotations (ast.Constant) and every other form default to text.
    return "text"


def _adaptTable(conn, tableName: str, modelFields: Dict[str, str],
                dbColumns: Dict[str, Any], dryRun: bool) -> bool:
    """Bring one table in line with its Pydantic model.

    Missing columns are added. Columns whose type is not compatible with the
    model are converted with ``USING NULL``, i.e. their content is discarded.

    Args:
        conn: Open database connection.
        tableName: Name of the table as it exists in the DB.
        modelFields: ``{fieldName: postgresType}`` as produced by
            _parsePydanticModels.
        dbColumns: ``{columnName: {'type': ..., ...}}`` for this table.
        dryRun: When True, statements are only logged, not executed.

    Returns:
        True when every statement succeeded (or dry-run), False otherwise.
    """
    success = True

    with conn.cursor() as cur:
        for fieldName, pgType in modelFields.items():
            # Look the column up case-insensitively.
            actualColName = next(
                (colName for colName in dbColumns if colName.lower() == fieldName.lower()),
                None,
            )

            if actualColName is None:
                # Column is missing -> add it.
                sql = f'ALTER TABLE "{tableName}" ADD COLUMN "{fieldName}" {pgType}'
                if dryRun:
                    logger.info(f"[DRY-RUN] {sql}")
                    continue
                try:
                    cur.execute(sql)
                    logger.info(f"Spalte hinzugefügt: {tableName}.{fieldName} ({pgType})")
                except Exception as e:
                    logger.error(f"Fehler beim Hinzufügen von {tableName}.{fieldName}: {e}")
                    success = False
                continue

            # Column exists -> check its type.
            currentType = dbColumns[actualColName]['type']
            if _typesCompatible(currentType, pgType):
                continue

            # USING NULL drops the existing values instead of casting them.
            sql = f'ALTER TABLE "{tableName}" ALTER COLUMN "{actualColName}" TYPE {pgType} USING NULL'
            if dryRun:
                logger.info(f"[DRY-RUN] {sql}")
                continue
            try:
                cur.execute(sql)
                logger.info(f"Typ geändert: {tableName}.{actualColName} ({currentType} → {pgType})")
            except Exception as e:
                logger.error(f"Fehler beim Ändern von {tableName}.{actualColName}: {e}")
                success = False

    return success


def _typesCompatible(dbType: str, targetType: str) -> bool:
    """Return True when the DB column type is acceptable for the target type.

    The comparison is case-insensitive. Types are compatible when they are
    equal or form one of the known equivalent pairs, in either direction.
    """
    dbType = dbType.lower()
    targetType = targetType.lower()

    if dbType == targetType:
        return True

    compatiblePairs = [
        ("character varying", "text"),
        ("varchar", "text"),
        ("integer", "bigint"),
        ("real", "double precision"),
        ("timestamp without time zone", "timestamp"),
        ("timestamp with time zone", "timestamp"),
    ]
    return any(
        (dbType, targetType) in ((a, b), (b, a))
        for a, b in compatiblePairs
    )


def _normalizeRoleLabels(rawValue: Any) -> List[Any]:
    """Turn a stored roleLabels value into a list that can be appended to.

    Strings are decoded as JSON. Anything that does not end up as a list
    (NULL, undecodable text, JSON objects/scalars) yields an empty list, so
    such a value is replaced by the migrated label.
    """
    if isinstance(rawValue, str):
        try:
            rawValue = json.loads(rawValue)
        except ValueError:
            return []
    return list(rawValue) if isinstance(rawValue, list) else []


def _migratePrivilegeToRoleLabels(conn, tableName: str, dryRun: bool) -> bool:
    """Copy each row's ``privilege`` value into ``roleLabels`` (UserInDB only).

    Creates the ``roleLabels`` jsonb column when it is missing, then appends
    the privilege to the list of every row with a non-empty privilege, unless
    the list already contains it. The ``privilege`` column itself is not
    changed.

    Args:
        conn: Open database connection.
        tableName: Table that holds the user rows.
        dryRun: When True, statements are only logged, not executed.

    Returns:
        True on success or when nothing had to be done, False when a
        database statement failed (the error is logged).
    """
    try:
        with conn.cursor() as cur:
            # Check which of the two columns exist. Restricted to the public
            # schema, the same schema _getDbTables reads the table from.
            cur.execute("""
                SELECT column_name FROM information_schema.columns
                WHERE table_schema = 'public'
                  AND table_name = %s
                  AND column_name IN ('privilege', 'roleLabels')
            """, (tableName,))
            columns = [row['column_name'] for row in cur.fetchall()]

            if 'privilege' not in columns:
                logger.info(f"Spalte 'privilege' existiert nicht in {tableName} - keine Migration nötig")
                return True

            if 'roleLabels' not in columns:
                sql = f'ALTER TABLE "{tableName}" ADD COLUMN "roleLabels" jsonb'
                if dryRun:
                    # The column does not exist yet, so the SELECT below
                    # would fail; stop after announcing the ALTER.
                    logger.info(f"[DRY-RUN] {sql}")
                    return True
                cur.execute(sql)
                logger.info(f"Spalte roleLabels in {tableName} erstellt")

            cur.execute(f"""
                SELECT id, privilege, "roleLabels"
                FROM "{tableName}"
                WHERE privilege IS NOT NULL AND privilege != ''
            """)
            users = cur.fetchall()

            if not users:
                logger.info(f"Keine Einträge mit privilege-Wert in {tableName}")
                return True

            logger.info(f"Migriere {len(users)} Einträge: privilege → roleLabels")

            for user in users:
                userId = user['id']
                privilege = user['privilege']
                roleLabels = _normalizeRoleLabels(user['roleLabels'])

                if privilege not in roleLabels:
                    roleLabels.append(privilege)

                if dryRun:
                    logger.info(f"[DRY-RUN] UPDATE {tableName} SET roleLabels = {roleLabels} WHERE id = {userId}")
                else:
                    cur.execute(
                        f'UPDATE "{tableName}" SET "roleLabels" = %s WHERE id = %s',
                        (json.dumps(roleLabels), userId)
                    )

            if not dryRun:
                logger.info("Migration privilege → roleLabels abgeschlossen")
            return True
    except Exception as e:
        # Report failures through the return value, as _adaptTable does,
        # so the remaining databases are still processed.
        logger.error(f"Fehler bei Migration privilege → roleLabels in {tableName}: {e}")
        return False


def runMigration(dbName: str, dryRun: bool) -> bool:
    """Run the schema adaptation for a single database.

    Loads the DB schema and the Pydantic models configured for ``dbName``,
    matches models to tables case-insensitively by name and adapts every
    matched table. Models without a table are logged and skipped.

    Returns:
        True when all steps succeeded, False for an unknown database, a
        failed connection or any failed statement.
    """
    logger.info(f"\n{'='*60}")
    logger.info(f"DATENBANK: {dbName}")
    logger.info(f"{'='*60}")

    if dbName not in DATABASE_CONFIG:
        logger.error(f"Unbekannte Datenbank: {dbName}")
        return False

    conn = _getDbConnection(dbName)
    if not conn:
        return False

    try:
        _, moduleNames = DATABASE_CONFIG[dbName]

        # Load the DB schema.
        dbTables = _getDbTables(conn)
        logger.info(f"Tabellen in DB: {', '.join(dbTables.keys()) if dbTables else 'keine'}")

        # Load the Pydantic models.
        models = _parsePydanticModels(moduleNames)
        logger.info(f"Pydantic-Modelle: {', '.join(models.keys()) if models else 'keine'}")

        success = True

        for modelName, modelFields in models.items():
            # Find the matching table (case-insensitive).
            tableName = next(
                (dbTable for dbTable in dbTables if dbTable.lower() == modelName.lower()),
                None,
            )
            if tableName is None:
                logger.warning(f"Keine Tabelle für Modell {modelName} gefunden - übersprungen")
                continue

            logger.info(f"\nPrüfe {modelName} ↔ {tableName}...")

            if not _adaptTable(conn, tableName, modelFields, dbTables[tableName], dryRun):
                success = False

            # Special case: UserInDB privilege -> roleLabels
            if modelName == "UserInDB":
                if not _migratePrivilegeToRoleLabels(conn, tableName, dryRun):
                    success = False

        return success
    finally:
        # Release the connection even when a step above raised.
        conn.close()


def main():
    """Parse the command line, ask for confirmation and run all migrations.

    Returns:
        Process exit code: 0 when every migration succeeded or the user
        aborted, 1 when at least one migration reported an error.
    """
    parser = argparse.ArgumentParser(description="Passt DB-Struktur an Pydantic-Modelle an")
    parser.add_argument("--dry-run", action="store_true", help="Zeigt nur geplante Änderungen")
    parser.add_argument("--db", help="Nur bestimmte DB(s) migrieren (komma-getrennt)")
    args = parser.parse_args()

    databases = list(DATABASE_CONFIG.keys())
    if args.db:
        # Ignore empty entries such as the one in "a,,b" or a trailing comma.
        databases = [db.strip() for db in args.db.split(",") if db.strip()]

    logger.info("="*60)
    logger.info("DATENBANK-ANPASSUNG AN PYDANTIC-MODELLE")
    logger.info("="*60)
    logger.info(f"Modus: {'DRY-RUN' if args.dry_run else 'LIVE'}")
    logger.info(f"Datenbanken: {', '.join(databases)}")

    if not args.dry_run:
        # Live runs alter the schema and may discard data; require an
        # explicit "yes".
        response = input("\nÄnderungen durchführen? (yes/no): ")
        if response.lower() != "yes":
            logger.info("Abgebrochen")
            return 0

    allSuccess = True
    for dbName in databases:
        if not runMigration(dbName, args.dry_run):
            allSuccess = False

    if allSuccess:
        logger.info("\n" + "="*60)
        logger.info("ALLE MIGRATIONEN ERFOLGREICH")
        logger.info("="*60)
    else:
        logger.error("\n" + "="*60)
        logger.error("EINIGE MIGRATIONEN HATTEN FEHLER")
        logger.error("="*60)

    return 0 if allSuccess else 1


if __name__ == "__main__":
    sys.exit(main())