428 lines
14 KiB
Python
428 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Datenbank-Anpassung an Pydantic-Modelle.
|
|
|
|
Einfaches Script das:
|
|
1. Fehlende Felder in DB ergänzt (gemäss Pydantic-Modellen)
|
|
2. Falsche Datentypen korrigiert (Daten werden ggf. gelöscht)
|
|
3. Spezialfall: UserInDB.privilege → roleLabels migriert
|
|
|
|
Verwendung:
|
|
python script_db_adapt_to_models.py [--dry-run] [--db <database>]
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
# Put the gateway root on sys.path and make it the current working directory
# so that `modules.*` imports and relative paths resolve from anywhere.
scriptPath = Path(__file__).resolve()
gatewayPath = scriptPath.parent.parent
sys.path.insert(0, str(gatewayPath))
os.chdir(str(gatewayPath))

# Configure logging FIRST (before the project imports below, which may set
# up their own logging configuration).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True  # overrides any pre-existing logging configuration
)
logger = logging.getLogger(__name__)
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from modules.shared.configuration import APP_CONFIG
|
|
|
|
# Database configuration: DB name → (config key prefix, Pydantic model modules)
DATABASE_CONFIG = {
    "poweron_app": ("DB_APP", ["datamodelUam", "datamodelRbac", "datamodelSecurity"]),
    "poweron_chat": ("DB_CHAT", ["datamodelChat"]),
    "poweron_management": ("DB_MANAGEMENT", ["datamodelWorkflow", "datamodelFiles", "datamodelUiLanguage"]),
}

# Python annotation name → PostgreSQL column type mapping
TYPE_MAPPING = {
    "str": "text",
    "int": "integer",
    "float": "double precision",
    "bool": "boolean",
    "list": "jsonb",
    "dict": "jsonb",
    "List": "jsonb",
    "Dict": "jsonb",
    "Optional": None,  # handled separately (only affects nullability)
    "datetime": "timestamp",
    "EmailStr": "text",
    "UUID": "uuid",
}
|
|
|
|
|
|
def _getDbConnection(dbName: str):
    """Open a connection to *dbName* using credentials from APP_CONFIG.

    Prefix-specific settings (<prefix>_HOST, ...) take precedence over the
    generic DB_* settings. Returns an autocommit connection whose cursors
    yield dict rows, or None when credentials are missing or connecting fails.
    """
    prefix = DATABASE_CONFIG.get(dbName, ("DB", []))[0]

    host = APP_CONFIG.get(f"{prefix}_HOST") or APP_CONFIG.get("DB_HOST", "localhost")
    port = APP_CONFIG.get(f"{prefix}_PORT") or APP_CONFIG.get("DB_PORT", "5432")
    user = APP_CONFIG.get(f"{prefix}_USER") or APP_CONFIG.get("DB_USER")
    password = (
        APP_CONFIG.get(f"{prefix}_PASSWORD_SECRET")
        or APP_CONFIG.get(f"{prefix}_PASSWORD")
        or APP_CONFIG.get("DB_PASSWORD_SECRET")
        or APP_CONFIG.get("DB_PASSWORD")
    )

    if not (user and password):
        logger.error(f"Keine Credentials für {dbName} ({prefix}_*)")
        return None

    try:
        connection = psycopg2.connect(
            host=host,
            port=int(port),
            database=dbName,
            user=user,
            password=password,
            cursor_factory=psycopg2.extras.RealDictCursor,
        )
    except Exception as e:
        logger.error(f"Verbindungsfehler {dbName}: {e}")
        return None

    connection.autocommit = True
    return connection
|
|
|
|
|
|
def _getDbTables(conn) -> Dict[str, Dict[str, str]]:
|
|
"""Holt alle Tabellen und deren Spalten aus der DB."""
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
SELECT table_name, column_name, data_type, is_nullable
|
|
FROM information_schema.columns
|
|
WHERE table_schema = 'public'
|
|
ORDER BY table_name, ordinal_position
|
|
""")
|
|
|
|
tables = {}
|
|
for row in cur.fetchall():
|
|
tableName = row['table_name']
|
|
if tableName not in tables:
|
|
tables[tableName] = {}
|
|
tables[tableName][row['column_name']] = {
|
|
'type': row['data_type'],
|
|
'nullable': row['is_nullable'] == 'YES'
|
|
}
|
|
cur.close()
|
|
return tables
|
|
|
|
|
|
def _parsePydanticModels(moduleNames: List[str]) -> Dict[str, Dict[str, str]]:
    """Statically parse datamodel modules into {className: {field: pgType}}.

    Reads each modules/datamodels/<name>.py via the ast module (no import),
    collects annotated fields of classes and maps their annotations to
    PostgreSQL types. NOTE(review): only classes listing `BaseModel`
    literally among their bases are picked up — subclasses of other models
    are skipped.
    """
    import ast

    models = {}
    datamodelsPath = gatewayPath / "modules" / "datamodels"

    for moduleName in moduleNames:
        filePath = datamodelsPath / f"{moduleName}.py"
        if not filePath.exists():
            logger.warning(f"Modul nicht gefunden: {filePath}")
            continue

        tree = ast.parse(filePath.read_text(encoding='utf-8'))

        for node in ast.walk(tree):
            if not isinstance(node, ast.ClassDef):
                continue
            # Only direct BaseModel subclasses are treated as Pydantic models.
            if not any(isinstance(base, ast.Name) and base.id == "BaseModel"
                       for base in node.bases):
                continue

            fields = {}
            for stmt in node.body:
                # Annotated assignments (name: Type [= default]) define fields.
                if not (isinstance(stmt, ast.AnnAssign)
                        and isinstance(stmt.target, ast.Name)):
                    continue
                fieldName = stmt.target.id
                if fieldName.startswith('_'):
                    continue  # private attributes are not DB columns

                pgType = _extractType(stmt.annotation)
                if pgType:
                    fields[fieldName] = pgType

            if fields:
                models[node.name] = fields

    return models
|
|
|
|
|
|
def _extractType(annotation) -> Optional[str]:
    """Map an AST type annotation to a PostgreSQL column type.

    Handles plain names (str, int, ...), Optional[...]/List[...]/Dict[...]
    subscripts, dotted names such as ``datetime.datetime``, and PEP 604
    unions (``X | None``). Anything unrecognised falls back to "text".
    Returns None only for a bare ``Optional`` (TYPE_MAPPING maps it to None),
    which makes the caller skip the field.
    """
    import ast

    if isinstance(annotation, ast.Name):
        return TYPE_MAPPING.get(annotation.id, "text")

    if isinstance(annotation, ast.Attribute):
        # Dotted annotation, e.g. datetime.datetime → map by the last segment
        # (previously fell through to "text").
        return TYPE_MAPPING.get(annotation.attr, "text")

    if isinstance(annotation, ast.Subscript):
        # Optional[X], List[X], Dict[K, V]
        if isinstance(annotation.value, ast.Name):
            outerType = annotation.value.id
            if outerType == "Optional":
                # Optional only affects nullability — recurse into the inner type.
                return _extractType(annotation.slice) or "text"
            if outerType in ("List", "list", "Dict", "dict"):
                return "jsonb"

    if isinstance(annotation, ast.BinOp) and isinstance(annotation.op, ast.BitOr):
        # PEP 604 union, e.g. ``str | None`` → use the first non-None side
        # (previously fell through to "text").
        for side in (annotation.left, annotation.right):
            if not (isinstance(side, ast.Constant) and side.value is None):
                return _extractType(side)

    # ast.Constant (string forward refs) and anything else default to text.
    return "text"
|
|
|
|
|
|
def _adaptTable(conn, tableName: str, modelFields: Dict[str, str], dbColumns: Dict[str, Any], dryRun: bool) -> bool:
    """Align one table with its Pydantic model.

    Adds missing columns and retypes incompatible ones. Retyping uses
    ``USING NULL``, i.e. existing values in that column are discarded.
    In dry-run mode the SQL is only logged. Returns False if any ALTER
    statement failed.
    """
    cur = conn.cursor()
    success = True

    # Case-insensitive column lookup; keep the FIRST name on (unlikely)
    # lowercase collisions, matching the original linear scan.
    columnsByLower = {}
    for colName, colInfo in dbColumns.items():
        columnsByLower.setdefault(colName.lower(), (colName, colInfo))

    def _run(sql: str, okMsg: str, errMsg: str) -> bool:
        """Execute one ALTER statement (or just log it in dry-run mode)."""
        if dryRun:
            logger.info(f"[DRY-RUN] {sql}")
            return True
        try:
            cur.execute(sql)
            logger.info(okMsg)
            return True
        except Exception as e:
            logger.error(f"{errMsg}: {e}")
            return False

    for fieldName, pgType in modelFields.items():
        # pgType already is a PostgreSQL type (produced by _extractType).
        found = columnsByLower.get(fieldName.lower())

        if found is None:
            # Column missing → add it with the model's type.
            ok = _run(
                f'ALTER TABLE "{tableName}" ADD COLUMN "{fieldName}" {pgType}',
                f"Spalte hinzugefügt: {tableName}.{fieldName} ({pgType})",
                f"Fehler beim Hinzufügen von {tableName}.{fieldName}",
            )
        else:
            actualColName, dbCol = found
            currentType = dbCol['type']
            if _typesCompatible(currentType, pgType):
                continue
            # Incompatible type → retype; USING NULL wipes existing values.
            ok = _run(
                f'ALTER TABLE "{tableName}" ALTER COLUMN "{actualColName}" TYPE {pgType} USING NULL',
                f"Typ geändert: {tableName}.{actualColName} ({currentType} → {pgType})",
                f"Fehler beim Ändern von {tableName}.{actualColName}",
            )

        if not ok:
            success = False

    cur.close()
    return success
|
|
|
|
|
|
def _typesCompatible(dbType: str, targetType: str) -> bool:
|
|
"""Prüft ob DB-Typ mit Ziel-Typ kompatibel ist."""
|
|
dbType = dbType.lower()
|
|
targetType = targetType.lower()
|
|
|
|
# Gleiche Typen
|
|
if dbType == targetType:
|
|
return True
|
|
|
|
# Kompatible Typen
|
|
compatiblePairs = [
|
|
("character varying", "text"),
|
|
("varchar", "text"),
|
|
("integer", "bigint"),
|
|
("real", "double precision"),
|
|
("timestamp without time zone", "timestamp"),
|
|
("timestamp with time zone", "timestamp"),
|
|
]
|
|
|
|
for a, b in compatiblePairs:
|
|
if (dbType == a and targetType == b) or (dbType == b and targetType == a):
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def _migratePrivilegeToRoleLabels(conn, tableName: str, dryRun: bool) -> bool:
    """Migrate the legacy single 'privilege' value into the jsonb "roleLabels"
    list (special case for UserInDB).

    Creates "roleLabels" if missing, then appends each row's privilege to its
    roleLabels list (skipping duplicates). The privilege column itself is left
    in place. Returns True on success; nothing to migrate also counts as
    success.
    """
    cur = conn.cursor()

    # Determine which of the two columns exist on this table.
    cur.execute("""
        SELECT column_name FROM information_schema.columns
        WHERE table_name = %s AND column_name IN ('privilege', 'roleLabels')
    """, (tableName,))
    columns = [row['column_name'] for row in cur.fetchall()]

    if 'privilege' not in columns:
        logger.info(f"Spalte 'privilege' existiert nicht in {tableName} - keine Migration nötig")
        cur.close()
        return True

    if 'roleLabels' not in columns:
        sql = f'ALTER TABLE "{tableName}" ADD COLUMN "roleLabels" jsonb'
        if dryRun:
            # Without the real column the SELECT below would fail → stop here.
            logger.info(f"[DRY-RUN] {sql}")
            cur.close()
            return True
        else:
            cur.execute(sql)
            logger.info(f"Spalte roleLabels in {tableName} erstellt")

    # Fetch all rows that still carry a privilege value.
    cur.execute(f"""
        SELECT id, privilege, "roleLabels" FROM "{tableName}"
        WHERE privilege IS NOT NULL AND privilege != ''
    """)
    users = cur.fetchall()

    if not users:
        logger.info(f"Keine Einträge mit privilege-Wert in {tableName}")
        cur.close()
        return True

    logger.info(f"Migriere {len(users)} Einträge: privilege → roleLabels")

    for user in users:
        userId = user['id']
        privilege = user['privilege']
        roleLabels = user['roleLabels'] or []

        # jsonb may come back as a raw JSON string depending on driver setup.
        if isinstance(roleLabels, str):
            try:
                roleLabels = json.loads(roleLabels)
            except ValueError:
                # was a bare `except:` — only swallow JSON parse errors
                # (json.JSONDecodeError is a ValueError subclass)
                roleLabels = []
        if not isinstance(roleLabels, list):
            # Unexpected jsonb shape (object/scalar) would crash on .append —
            # start with a fresh list instead.
            roleLabels = []

        if privilege not in roleLabels:
            roleLabels.append(privilege)

        if dryRun:
            logger.info(f"[DRY-RUN] UPDATE {tableName} SET roleLabels = {roleLabels} WHERE id = {userId}")
        else:
            cur.execute(
                f'UPDATE "{tableName}" SET "roleLabels" = %s WHERE id = %s',
                (json.dumps(roleLabels), userId)
            )

    if not dryRun:
        logger.info(f"Migration privilege → roleLabels abgeschlossen")

    cur.close()
    return True
|
|
|
|
|
|
def runMigration(dbName: str, dryRun: bool) -> bool:
    """Run the schema adaptation for one configured database.

    Loads the live schema and the statically parsed Pydantic models, aligns
    each model with its (case-insensitively) matching table, and applies the
    UserInDB privilege→roleLabels migration. Returns True when everything
    succeeded.
    """
    logger.info(f"\n{'='*60}")
    logger.info(f"DATENBANK: {dbName}")
    logger.info(f"{'='*60}")

    if dbName not in DATABASE_CONFIG:
        logger.error(f"Unbekannte Datenbank: {dbName}")
        return False

    conn = _getDbConnection(dbName)
    if not conn:
        return False

    prefix, moduleNames = DATABASE_CONFIG[dbName]

    # Live schema and model definitions.
    dbTables = _getDbTables(conn)
    logger.info(f"Tabellen in DB: {', '.join(dbTables.keys()) if dbTables else 'keine'}")

    models = _parsePydanticModels(moduleNames)
    logger.info(f"Pydantic-Modelle: {', '.join(models.keys()) if models else 'keine'}")

    success = True

    for modelName, modelFields in models.items():
        # Locate the table matching this model, ignoring case.
        match = next(
            ((name, cols) for name, cols in dbTables.items()
             if name.lower() == modelName.lower()),
            None,
        )
        if match is None:
            logger.warning(f"Keine Tabelle für Modell {modelName} gefunden - übersprungen")
            continue
        tableName, tableColumns = match

        logger.info(f"\nPrüfe {modelName} ↔ {tableName}...")

        if not _adaptTable(conn, tableName, modelFields, tableColumns, dryRun):
            success = False

        # Special case: UserInDB carries the legacy privilege column.
        if modelName == "UserInDB" and not _migratePrivilegeToRoleLabels(conn, tableName, dryRun):
            success = False

    conn.close()
    return success
|
|
|
|
|
|
def main():
    """CLI entry point.

    Parses --dry-run/--db, asks for confirmation before live changes, runs
    the migration for every selected database and returns a process exit
    code (0 = all succeeded, 1 = at least one failure).
    """
    parser = argparse.ArgumentParser(description="Passt DB-Struktur an Pydantic-Modelle an")
    parser.add_argument("--dry-run", action="store_true", help="Zeigt nur geplante Änderungen")
    parser.add_argument("--db", help="Nur bestimmte DB(s) migrieren (komma-getrennt)")
    args = parser.parse_args()

    if args.db:
        databases = [db.strip() for db in args.db.split(",")]
    else:
        databases = list(DATABASE_CONFIG.keys())

    banner = "=" * 60
    logger.info(banner)
    logger.info("DATENBANK-ANPASSUNG AN PYDANTIC-MODELLE")
    logger.info(banner)
    logger.info(f"Modus: {'DRY-RUN' if args.dry_run else 'LIVE'}")
    logger.info(f"Datenbanken: {', '.join(databases)}")

    # Live runs can drop data (USING NULL retypes) → require confirmation.
    if not args.dry_run:
        response = input("\nÄnderungen durchführen? (yes/no): ")
        if response.lower() != "yes":
            logger.info("Abgebrochen")
            return 0

    # Run every migration; collect which databases failed.
    failures = [db for db in databases if not runMigration(db, args.dry_run)]
    allSuccess = not failures

    if allSuccess:
        logger.info("\n" + banner)
        logger.info("ALLE MIGRATIONEN ERFOLGREICH")
        logger.info(banner)
    else:
        logger.error("\n" + banner)
        logger.error("EINIGE MIGRATIONEN HATTEN FEHLER")
        logger.error(banner)

    return 0 if allSuccess else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value (0 = success, 1 = failures) as the
    # process exit code.
    sys.exit(main())
|