gateway/tool_db_import_migration.py
2026-01-19 09:18:37 +01:00

612 lines
19 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Datenbank Import-Tool für Migration.
Dieses Script importiert Daten aus einer JSON-Migrationsdatei
in ALLE PowerOn PostgreSQL-Datenbanken.
ACHTUNG: Dieses Script kann bestehende Daten überschreiben!
Bitte vor dem Import ein Backup erstellen.
Datenbanken:
- poweron_app (User, Mandate, RBAC, Features, etc.)
- poweron_chat (Chat-Konversationen und Nachrichten)
- poweron_management (Workflows, Prompts, Connections, etc.)
- poweron_realestate (Real Estate Daten)
- poweron_trustee (Trustee Daten)
Verwendung:
python tool_db_import_migration.py <import_file.json> [--dry-run] [--force]
Optionen:
--dry-run Simuliert den Import ohne Änderungen
--force Bestätigung überspringen
--clear-first Tabellen vor dem Import leeren
--only-tables Komma-getrennte Liste von Tabellen (nur diese importieren)
--only-db Komma-getrennte Liste von Datenbanken (nur diese importieren)
"""
import os
import sys
import json
import argparse
import logging
import time
from datetime import datetime
from typing import Dict, List, Any, Optional
from pathlib import Path
import psycopg2
import psycopg2.extras
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Alle PowerOn Datenbanken
ALL_DATABASES = [
"poweron_app",
"poweron_chat",
"poweron_management",
"poweron_realestate",
"poweron_trustee",
]
def _loadEnvConfig() -> Dict[str, str]:
"""Lädt die Konfiguration direkt aus der .env Datei."""
config = {}
envPath = Path(__file__).parent / '.env'
if not envPath.exists():
logger.warning(f"Environment file not found at {envPath}")
return config
# Versuche verschiedene Encodings
encodings = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']
for encoding in encodings:
try:
with open(envPath, 'r', encoding=encoding) as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
# Erfolgreich geladen
return config
except UnicodeDecodeError:
continue
except Exception as e:
logger.error(f"Error loading .env file with {encoding}: {e}")
continue
logger.error(f"Could not load .env file with any encoding")
return config
# Globale Konfiguration laden
_ENV_CONFIG = _loadEnvConfig()
def _getConfigValue(key: str, default: str = None) -> str:
"""Holt einen Konfigurationswert."""
return _ENV_CONFIG.get(key, os.environ.get(key, default))
def _getUtcTimestamp() -> float:
"""Gibt den aktuellen UTC-Timestamp zurück."""
return time.time()
def _databaseExists(dbDatabase: str) -> bool:
"""Prüft ob eine Datenbank existiert."""
dbHost = _getConfigValue("DB_HOST", "localhost")
dbUser = _getConfigValue("DB_USER")
dbPassword = _getConfigValue("DB_PASSWORD_SECRET")
dbPort = int(_getConfigValue("DB_PORT", "5432"))
try:
conn = psycopg2.connect(
host=dbHost,
port=dbPort,
database="postgres",
user=dbUser,
password=dbPassword
)
conn.autocommit = True
with conn.cursor() as cursor:
cursor.execute(
"SELECT 1 FROM pg_database WHERE datname = %s",
(dbDatabase,)
)
exists = cursor.fetchone() is not None
conn.close()
return exists
except Exception as e:
logger.error(f"Fehler beim Prüfen der Datenbank {dbDatabase}: {e}")
return False
def _getDbConnection(dbDatabase: str, autocommit: bool = False):
"""Erstellt eine Verbindung zu einer spezifischen PostgreSQL-Datenbank."""
# Erst prüfen ob Datenbank existiert
if not _databaseExists(dbDatabase):
logger.warning(f"Datenbank '{dbDatabase}' existiert nicht")
return None
dbHost = _getConfigValue("DB_HOST", "localhost")
dbUser = _getConfigValue("DB_USER")
dbPassword = _getConfigValue("DB_PASSWORD_SECRET")
dbPort = int(_getConfigValue("DB_PORT", "5432"))
try:
conn = psycopg2.connect(
host=dbHost,
port=dbPort,
database=dbDatabase,
user=dbUser,
password=dbPassword,
cursor_factory=psycopg2.extras.RealDictCursor
)
conn.set_client_encoding('UTF8')
conn.autocommit = autocommit
return conn
except Exception as e:
logger.error(f"Datenbankverbindung zu {dbDatabase} fehlgeschlagen: {e}")
raise
def _getExistingTables(conn) -> List[str]:
"""Gibt alle Tabellennamen in der Datenbank zurück."""
with conn.cursor() as cursor:
cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
ORDER BY table_name
""")
tables = [row["table_name"] for row in cursor.fetchall()]
return tables
def _getTableColumns(conn, tableName: str) -> List[str]:
"""Gibt alle Spalten einer Tabelle zurück."""
with conn.cursor() as cursor:
cursor.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = %s AND table_schema = 'public'
""", (tableName,))
columns = [row["column_name"] for row in cursor.fetchall()]
return columns
def _clearTable(conn, tableName: str):
"""Löscht alle Daten aus einer Tabelle."""
with conn.cursor() as cursor:
cursor.execute(f'DELETE FROM "{tableName}"')
def _insertRecord(conn, tableName: str, record: Dict[str, Any], existingColumns: List[str]) -> bool:
"""Fügt einen Datensatz in eine Tabelle ein (UPSERT)."""
filteredRecord = {k: v for k, v in record.items() if k in existingColumns}
if not filteredRecord:
return False
# Metadaten hinzufügen falls nicht vorhanden
currentTime = _getUtcTimestamp()
if "_createdAt" not in filteredRecord and "_createdAt" in existingColumns:
filteredRecord["_createdAt"] = currentTime
if "_modifiedAt" in existingColumns:
filteredRecord["_modifiedAt"] = currentTime
columns = list(filteredRecord.keys())
values = []
for col in columns:
value = filteredRecord[col]
if isinstance(value, (dict, list)):
values.append(json.dumps(value))
else:
values.append(value)
colNames = ", ".join([f'"{col}"' for col in columns])
placeholders = ", ".join(["%s"] * len(columns))
updateCols = [col for col in columns if col not in ["id", "_createdAt", "_createdBy"]]
updateClause = ", ".join([f'"{col}" = EXCLUDED."{col}"' for col in updateCols])
if updateClause:
sql = f'''
INSERT INTO "{tableName}" ({colNames})
VALUES ({placeholders})
ON CONFLICT ("id") DO UPDATE SET {updateClause}
'''
else:
sql = f'''
INSERT INTO "{tableName}" ({colNames})
VALUES ({placeholders})
ON CONFLICT ("id") DO NOTHING
'''
try:
with conn.cursor() as cursor:
cursor.execute(sql, values)
return True
except Exception as e:
logger.error(f"Fehler beim Einfügen in {tableName}: {e}")
return False
def loadMigrationFile(filePath: str) -> Dict[str, Any]:
"""Lädt die Migrationsdatei."""
logger.info(f"Lade Migrationsdatei: {filePath}")
if not os.path.exists(filePath):
raise FileNotFoundError(f"Datei nicht gefunden: {filePath}")
with open(filePath, "r", encoding="utf-8") as f:
data = json.load(f)
# Validierung - unterstütze beide Formate (alt: tables, neu: databases)
if "databases" not in data and "tables" not in data:
raise ValueError("Ungültiges Migrationsformat: 'databases' oder 'tables' erforderlich")
return data
def _importSingleDatabase(
dbName: str,
dbData: Dict[str, Any],
dryRun: bool,
clearFirst: bool,
onlyTables: Optional[List[str]]
) -> Dict[str, Any]:
"""Importiert Daten in eine einzelne Datenbank."""
stats = {
"imported": {},
"skipped": {},
"errors": {},
"totalImported": 0,
"totalSkipped": 0,
"totalErrors": 0
}
conn = _getDbConnection(dbName)
if conn is None:
logger.warning(f" Datenbank '{dbName}' existiert nicht - übersprungen")
return stats
try:
existingTables = _getExistingTables(conn)
tables = dbData.get("tables", {})
tablesToImport = list(tables.keys())
if onlyTables:
tablesToImport = [t for t in tablesToImport if t in onlyTables]
for tableName in tablesToImport:
records = tables[tableName]
if tableName not in existingTables:
logger.warning(f" Tabelle '{tableName}' existiert nicht - übersprungen")
stats["skipped"][tableName] = len(records)
stats["totalSkipped"] += len(records)
continue
if dryRun:
stats["imported"][tableName] = len(records)
stats["totalImported"] += len(records)
continue
if clearFirst:
_clearTable(conn, tableName)
existingColumns = _getTableColumns(conn, tableName)
imported = 0
errors = 0
for record in records:
if _insertRecord(conn, tableName, record, existingColumns):
imported += 1
else:
errors += 1
stats["imported"][tableName] = imported
stats["totalImported"] += imported
if errors > 0:
stats["errors"][tableName] = errors
stats["totalErrors"] += errors
if imported > 0:
logger.info(f" {tableName}: {imported} importiert, {errors} Fehler")
if not dryRun:
conn.commit()
else:
conn.rollback()
return stats
except Exception as e:
conn.rollback()
logger.error(f" Import fehlgeschlagen: {e}")
raise
finally:
conn.close()
def importDatabase(
filePath: str,
dryRun: bool = False,
clearFirst: bool = False,
onlyTables: Optional[List[str]] = None,
onlyDatabases: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Importiert Daten aus einer Migrationsdatei.
Args:
filePath: Pfad zur Migrationsdatei
dryRun: Nur simulieren
clearFirst: Tabellen vor Import leeren
onlyTables: Nur diese Tabellen importieren
onlyDatabases: Nur diese Datenbanken importieren
Returns:
Import-Statistiken
"""
migrationData = loadMigrationFile(filePath)
meta = migrationData.get("meta", {})
logger.info(f"Migrationsdatei geladen:")
logger.info(f" Exportiert am: {meta.get('exportedAt', 'unbekannt')}")
logger.info(f" Quelle: {meta.get('exportedFrom', 'unbekannt')}")
stats = {
"databases": {},
"totalImported": 0,
"totalSkipped": 0,
"totalErrors": 0
}
# Neues Format (mehrere Datenbanken)
if "databases" in migrationData:
databases = migrationData["databases"]
logger.info(f" Datenbanken: {len(databases)}")
logger.info(f" Tabellen: {meta.get('totalTables', 'unbekannt')}")
logger.info(f" Datensätze: {meta.get('totalRecords', 'unbekannt')}")
for dbName, dbData in databases.items():
if onlyDatabases and dbName not in onlyDatabases:
continue
logger.info(f"Importiere Datenbank: {dbName}")
dbStats = _importSingleDatabase(dbName, dbData, dryRun, clearFirst, onlyTables)
stats["databases"][dbName] = dbStats
stats["totalImported"] += dbStats["totalImported"]
stats["totalSkipped"] += dbStats["totalSkipped"]
stats["totalErrors"] += dbStats["totalErrors"]
# Altes Format (einzelne Datenbank - poweron_app)
elif "tables" in migrationData:
logger.info(" Format: Legacy (einzelne Datenbank)")
dbName = "poweron_app"
dbData = {"tables": migrationData["tables"]}
if not onlyDatabases or dbName in onlyDatabases:
logger.info(f"Importiere Datenbank: {dbName}")
dbStats = _importSingleDatabase(dbName, dbData, dryRun, clearFirst, onlyTables)
stats["databases"][dbName] = dbStats
stats["totalImported"] = dbStats["totalImported"]
stats["totalSkipped"] = dbStats["totalSkipped"]
stats["totalErrors"] = dbStats["totalErrors"]
if dryRun:
logger.info("Dry-Run: Keine Änderungen vorgenommen")
return stats
def printImportPreview(filePath: str):
"""Zeigt eine Vorschau der zu importierenden Daten."""
migrationData = loadMigrationFile(filePath)
meta = migrationData.get("meta", {})
print("\n" + "=" * 70)
print("IMPORT VORSCHAU")
print("=" * 70)
print(f"Datei: {filePath}")
print(f"Exportiert am: {meta.get('exportedAt', 'unbekannt')}")
print(f"Quelle: {meta.get('exportedFrom', 'unbekannt')}")
# Neues Format
if "databases" in migrationData:
databases = migrationData["databases"]
print(f"Datenbanken: {len(databases)}")
print("=" * 70)
grandTotal = 0
for dbName, dbData in databases.items():
tables = dbData.get("tables", {})
dbTotal = sum(len(records) for records in tables.values())
grandTotal += dbTotal
print(f"\n{dbName} ({dbTotal} Datensätze)")
print("-" * 70)
print(f" {'Tabelle':<45} {'Datensätze':>15}")
print(f" {'-' * 45} {'-' * 15}")
for tableName, records in sorted(tables.items()):
if len(records) > 0:
print(f" {tableName:<45} {len(records):>15}")
print("\n" + "=" * 70)
print(f"GESAMT: {grandTotal} Datensätze")
# Altes Format
elif "tables" in migrationData:
tables = migrationData["tables"]
print(f"Format: Legacy (poweron_app)")
print("-" * 70)
print(f"{'Tabelle':<45} {'Datensätze':>15}")
print("-" * 70)
totalRecords = 0
for tableName, records in sorted(tables.items()):
count = len(records)
totalRecords += count
if count > 0:
print(f"{tableName:<45} {count:>15}")
print("-" * 70)
print(f"{'GESAMT':<45} {totalRecords:>15}")
print("=" * 70 + "\n")
def main():
parser = argparse.ArgumentParser(
description="Importiert Datenbank-Daten aus einer Migrationsdatei",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Datenbanken:
poweron_app - User, Mandate, RBAC, Features
poweron_chat - Chat-Konversationen
poweron_management - Workflows, Prompts, Connections
poweron_realestate - Real Estate Daten
poweron_trustee - Trustee Daten
Beispiele:
python tool_db_import_migration.py migration_export.json --dry-run
python tool_db_import_migration.py migration_export.json --preview
python tool_db_import_migration.py migration_export.json --force
python tool_db_import_migration.py migration_export.json --clear-first --force
python tool_db_import_migration.py migration_export.json --only-db poweron_app
python tool_db_import_migration.py migration_export.json --only-tables UserInDB,Mandate
"""
)
parser.add_argument(
"import_file",
help="Pfad zur Migrationsdatei (JSON)",
type=str
)
parser.add_argument(
"--dry-run",
help="Simuliert den Import ohne Änderungen",
action="store_true"
)
parser.add_argument(
"--force",
help="Bestätigung überspringen",
action="store_true"
)
parser.add_argument(
"--clear-first",
help="Tabellen vor dem Import leeren",
action="store_true"
)
parser.add_argument(
"--only-tables",
help="Nur diese Tabellen importieren (komma-getrennt)",
type=str,
default=""
)
parser.add_argument(
"--only-db",
help="Nur diese Datenbank(en) importieren (komma-getrennt)",
type=str,
default=""
)
parser.add_argument(
"--preview",
help="Nur Vorschau anzeigen (kein Import)",
action="store_true"
)
args = parser.parse_args()
# Nur Vorschau anzeigen
if args.preview:
printImportPreview(args.import_file)
return
# Listen parsen
onlyTables = None
if args.only_tables:
onlyTables = [t.strip() for t in args.only_tables.split(",") if t.strip()]
onlyDatabases = None
if args.only_db:
onlyDatabases = [db.strip() for db in args.only_db.split(",") if db.strip()]
# Bestätigung einholen
if not args.dry_run and not args.force:
printImportPreview(args.import_file)
if args.clear_first:
print("WARNUNG: --clear-first wird ALLE bestehenden Daten in den Zieltabellen löschen!")
response = input("\nMöchten Sie den Import starten? [y/N]: ")
if response.lower() not in ["y", "yes", "j", "ja"]:
print("Import abgebrochen.")
return
# Import durchführen
try:
if args.dry_run:
logger.info("=== DRY-RUN MODUS ===")
stats = importDatabase(
filePath=args.import_file,
dryRun=args.dry_run,
clearFirst=args.clear_first,
onlyTables=onlyTables,
onlyDatabases=onlyDatabases
)
print("\n" + "=" * 70)
print("IMPORT ERGEBNIS")
print("=" * 70)
print(f"Importiert: {stats['totalImported']} Datensätze")
print(f"Übersprungen: {stats['totalSkipped']} Datensätze")
print(f"Fehler: {stats['totalErrors']} Datensätze")
if args.dry_run:
print("\n(Dry-Run: Keine tatsächlichen Änderungen vorgenommen)")
else:
print("\n Import erfolgreich abgeschlossen!")
print("=" * 70 + "\n")
except Exception as e:
logger.error(f"Import fehlgeschlagen: {e}")
sys.exit(1)
if __name__ == "__main__":
main()