# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Datenbank Export-Tool für Migration. Dieses Script exportiert alle Daten aus ALLEN PowerOn PostgreSQL-Datenbanken in eine JSON-Datei, die als Migrationsdatensatz verwendet werden kann. Zusätzlich wird eine separate JSON-Datei mit nur den Strukturen (ohne Daten) erstellt: _structure.json Datenbanken: - poweron_app (User, Mandate, RBAC, Features, etc.) - poweron_chat (Chat-Konversationen und Nachrichten) - poweron_management (Workflows, Prompts, Connections, etc.) - poweron_realestate (Real Estate Daten) - poweron_trustee (Trustee Daten) Verwendung: python script_db_export_migration.py [--output ] [--pretty] Optionen: --output, -o Pfad zur Ausgabedatei (Standard: migration_export_.json) Die Struktur-Datei wird automatisch als _structure.json erstellt --pretty, -p JSON formatiert ausgeben (für bessere Lesbarkeit) --exclude Komma-getrennte Liste von Tabellen, die ausgeschlossen werden sollen --include-meta System-Metadaten (_createdAt, _modifiedAt, etc.) beibehalten --db Nur bestimmte Datenbank(en) exportieren (komma-getrennt) """ import os import sys import json import argparse import logging from datetime import datetime from typing import Dict, List, Any, Optional from pathlib import Path # Add gateway to path for imports and set working directory # Find gateway directory (could be in local/pending/ or gateway/) scriptPath = Path(__file__).resolve() gatewayPath = scriptPath.parent # If we're in scripts/, go up to gateway/ if gatewayPath.name == "scripts": gatewayPath = gatewayPath.parent # If we're in local/pending/, go up to find gateway/ elif gatewayPath.name == "pending": gatewayPath = gatewayPath.parent.parent / "gateway" elif gatewayPath.name == "local": gatewayPath = gatewayPath.parent / "gateway" # If gateway doesn't exist, try current directory if not gatewayPath.exists(): gatewayPath = scriptPath.parent.parent.parent / "gateway" if gatewayPath.exists(): sys.path.insert(0, str(gatewayPath)) # Change working directory to gateway so APP_CONFIG can find .env file os.chdir(str(gatewayPath)) else: # Fallback: assume we're already in gateway/ or add parent sys.path.insert(0, str(scriptPath.parent)) # Try to change to gateway directory if it exists potentialGateway = scriptPath.parent if potentialGateway.exists() and (potentialGateway / "modules" / "shared" / "configuration.py").exists(): os.chdir(str(potentialGateway)) import psycopg2 import psycopg2.extras # Use the real APP_CONFIG which handles encryption and environment-specific configs from modules.shared.configuration import APP_CONFIG # type: ignore # Logging konfigurieren logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Force APP_CONFIG to refresh after changing working directory # This ensures .env file is loaded from the correct location try: APP_CONFIG.refresh() envType = APP_CONFIG.get('APP_ENV_TYPE', 'unknown') keySysVar = APP_CONFIG.get('APP_KEY_SYSVAR', 'not_set') logger.debug(f"APP_CONFIG refreshed. Environment type: {envType}") logger.debug(f"APP_KEY_SYSVAR: {keySysVar}") logger.debug(f"Current working directory: {os.getcwd()}") # Check if master key is available (needed for decrypting secrets) if keySysVar != 'not_set': masterKeyEnv = os.environ.get(keySysVar) if masterKeyEnv: logger.debug(f"Master key found in environment variable: {keySysVar}") else: logger.warning(f"Master key not found in environment variable: {keySysVar}") logger.warning("Encrypted secrets may not be decryptable!") except Exception as e: logger.warning(f"Could not refresh APP_CONFIG: {e}") # Alle PowerOn Datenbanken ALL_DATABASES = [ "poweron_app", # Haupt-App: User, Mandate, RBAC, Features "poweron_chat", # Chat-Konversationen "poweron_management", # Workflows, Prompts, Connections "poweron_realestate", # Real Estate "poweron_trustee", # Trustee ] # Datenbank-Konfiguration: Mapping von DB-Name zu Config-Prefix # Jede Datenbank hat ihre eigenen Variablen: DB_APP_HOST, DB_CHAT_HOST, etc. DATABASE_CONFIG = { "poweron_app": "DB_APP", # DB_APP_HOST, DB_APP_USER, DB_APP_PASSWORD_SECRET, etc. "poweron_chat": "DB_CHAT", # DB_CHAT_HOST, DB_CHAT_USER, etc. "poweron_management": "DB_MANAGEMENT", "poweron_realestate": "DB_REALESTATE", "poweron_trustee": "DB_TRUSTEE", } def _getConfigValue(key: str, default: str = None) -> str: """Holt einen Konfigurationswert über APP_CONFIG (unterstützt Verschlüsselung).""" return APP_CONFIG.get(key, default) def _getDbConfig(dbName: str) -> Dict[str, Any]: """ Holt die Datenbankverbindungs-Konfiguration für eine spezifische Datenbank. Unterstützt sowohl DB-spezifische Variablen (DB_APP_HOST) als auch Fallback (DB_HOST). """ prefix = DATABASE_CONFIG.get(dbName, "DB") # Versuche zuerst DB-spezifische Variablen, dann Fallback auf allgemeine host = _getConfigValue(f"{prefix}_HOST") or _getConfigValue("DB_HOST", "localhost") user = _getConfigValue(f"{prefix}_USER") or _getConfigValue("DB_USER") password = _getConfigValue(f"{prefix}_PASSWORD_SECRET") or _getConfigValue("DB_PASSWORD_SECRET") port = _getConfigValue(f"{prefix}_PORT") or _getConfigValue("DB_PORT", "5432") return { "host": host, "user": user, "password": password, "port": int(port) if port else 5432 } def _databaseExists(dbDatabase: str) -> bool: """Prüft ob eine Datenbank existiert.""" config = _getDbConfig(dbDatabase) if not config["user"]: logger.warning(f"DB-User nicht gesetzt für Datenbank {dbDatabase}") return False if not config["password"]: logger.warning(f"DB-Password nicht gesetzt für Datenbank {dbDatabase}") return False try: # Verbinde zur postgres Datenbank um zu prüfen conn = psycopg2.connect( host=config["host"], port=config["port"], database="postgres", user=config["user"], password=config["password"] ) conn.autocommit = True with conn.cursor() as cursor: cursor.execute( "SELECT 1 FROM pg_database WHERE datname = %s", (dbDatabase,) ) exists = cursor.fetchone() is not None conn.close() return exists except Exception as e: logger.error(f"Fehler beim Prüfen der Datenbank {dbDatabase} auf {config['host']}:{config['port']}: {e}") return False def _getDbConnection(dbDatabase: str): """Erstellt eine Verbindung zu einer spezifischen PostgreSQL-Datenbank.""" config = _getDbConfig(dbDatabase) # Prüfe ob wichtige Konfigurationswerte fehlen if not config["user"]: logger.error(f"DB-User nicht gesetzt für {dbDatabase} - kann keine Verbindung herstellen") return None if not config["password"]: logger.error(f"DB-Password nicht gesetzt für {dbDatabase} - kann keine Verbindung herstellen") return None # Erst prüfen ob Datenbank existiert if not _databaseExists(dbDatabase): logger.warning(f"Datenbank '{dbDatabase}' existiert nicht auf {config['host']}:{config['port']} - übersprungen") return None try: conn = psycopg2.connect( host=config["host"], port=config["port"], database=dbDatabase, user=config["user"], password=config["password"], cursor_factory=psycopg2.extras.RealDictCursor ) # Autocommit muss VOR set_client_encoding gesetzt werden, um Transaction-Konflikte zu vermeiden conn.autocommit = True conn.set_client_encoding('UTF8') logger.debug(f"Erfolgreich verbunden mit {config['host']}:{config['port']}/{dbDatabase}") return conn except Exception as e: logger.error(f"Datenbankverbindung zu {dbDatabase} auf {config['host']}:{config['port']} fehlgeschlagen: {e}") raise def _getTables(conn) -> List[str]: """Gibt alle Tabellennamen in der Datenbank zurück.""" with conn.cursor() as cursor: cursor.execute(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE' ORDER BY table_name """) tables = [row["table_name"] for row in cursor.fetchall()] return tables def _getTableData(conn, tableName: str, includeMeta: bool = False) -> List[Dict[str, Any]]: """Liest alle Daten aus einer Tabelle.""" with conn.cursor() as cursor: cursor.execute(f'SELECT * FROM "{tableName}"') rows = cursor.fetchall() records = [] for row in rows: record = dict(row) # Optional: System-Metadaten entfernen if not includeMeta: metaFields = ["_createdAt", "_modifiedAt", "_createdBy", "_modifiedBy"] for field in metaFields: record.pop(field, None) # Konvertiere JSONB-Felder (sind bereits als Dict/List von psycopg2) for key, value in record.items(): if isinstance(value, (int, float)): record[key] = float(value) if isinstance(value, float) else int(value) records.append(record) return records def _getTableRowCount(conn, tableName: str) -> int: """Zählt die Anzahl der Zeilen in einer Tabelle.""" with conn.cursor() as cursor: cursor.execute(f'SELECT COUNT(*) as count FROM "{tableName}"') result = cursor.fetchone() return result["count"] if result else 0 def _getTableStructure(conn, tableName: str) -> Dict[str, Any]: """Holt die Struktur einer Tabelle (Spalten, Constraints, Indizes) ohne Daten.""" structure = { "columns": [], "primaryKeys": [], "foreignKeys": [], "uniqueConstraints": [], "indexes": [], "checkConstraints": [] } # Connection hat bereits autocommit = True, daher keine Transaction-Probleme with conn.cursor() as cursor: # Spalten-Informationen cursor.execute(""" SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale, is_nullable, column_default, udt_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = %s ORDER BY ordinal_position """, (tableName,)) for row in cursor.fetchall(): colInfo = { "name": row["column_name"], "type": row["data_type"], "udtName": row["udt_name"], "nullable": row["is_nullable"] == "YES", "default": row["column_default"] } if row["character_maximum_length"]: colInfo["maxLength"] = row["character_maximum_length"] if row["numeric_precision"]: colInfo["precision"] = row["numeric_precision"] if row["numeric_scale"]: colInfo["scale"] = row["numeric_scale"] structure["columns"].append(colInfo) # Primary Keys cursor.execute(""" SELECT kcu.column_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name WHERE tc.table_schema = 'public' AND tc.table_name = %s AND tc.constraint_type = 'PRIMARY KEY' ORDER BY kcu.ordinal_position """, (tableName,)) structure["primaryKeys"] = [row["column_name"] for row in cursor.fetchall()] # Foreign Keys cursor.execute(""" SELECT kcu.column_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name, tc.constraint_name FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_schema = 'public' AND tc.table_name = %s """, (tableName,)) for row in cursor.fetchall(): structure["foreignKeys"].append({ "column": row["column_name"], "referencesTable": row["foreign_table_name"], "referencesColumn": row["foreign_column_name"], "constraintName": row["constraint_name"] }) # Unique Constraints - FIX: Tabellen-Aliase verwenden um ambiguous columns zu vermeiden cursor.execute(""" SELECT kcu.column_name, tc.constraint_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema AND tc.table_name = kcu.table_name WHERE tc.table_schema = 'public' AND tc.table_name = %s AND tc.constraint_type = 'UNIQUE' ORDER BY kcu.ordinal_position """, (tableName,)) uniqueGroups = {} for row in cursor.fetchall(): constraintName = row["constraint_name"] if constraintName not in uniqueGroups: uniqueGroups[constraintName] = [] uniqueGroups[constraintName].append(row["column_name"]) structure["uniqueConstraints"] = [ {"columns": cols, "constraintName": name} for name, cols in uniqueGroups.items() ] # Indizes (ohne Primary Key und Unique Constraints) cursor.execute(""" SELECT i.relname AS index_name, a.attname AS column_name, ix.indisunique AS is_unique FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) WHERE t.relkind = 'r' AND t.relname = %s AND NOT ix.indisprimary ORDER BY i.relname, a.attnum """, (tableName,)) indexGroups = {} for row in cursor.fetchall(): indexName = row["index_name"] if indexName not in indexGroups: indexGroups[indexName] = { "name": indexName, "columns": [], "unique": row["is_unique"] } indexGroups[indexName]["columns"].append(row["column_name"]) structure["indexes"] = list(indexGroups.values()) # Check Constraints - FIX: Tabellen-Aliase verwenden cursor.execute(""" SELECT cc.constraint_name, cc.check_clause FROM information_schema.check_constraints cc JOIN information_schema.constraint_column_usage ccu ON cc.constraint_name = ccu.constraint_name WHERE ccu.table_schema = 'public' AND ccu.table_name = %s """, (tableName,)) for row in cursor.fetchall(): structure["checkConstraints"].append({ "constraintName": row["constraint_name"], "checkClause": row["check_clause"] }) return structure def _exportSingleDatabaseStructure( dbDatabase: str, excludeTables: List[str] ) -> Optional[Dict[str, Any]]: """Exportiert nur die Struktur einer einzelnen Datenbank (ohne Daten).""" conn = _getDbConnection(dbDatabase) if conn is None: return None try: allTables = _getTables(conn) # System-Tabellen ausschliessen systemTables = ["_system"] tablesToExport = [ t for t in allTables if t not in systemTables and t not in excludeTables ] dbExport = { "tables": {}, "summary": {}, "tableCount": len(tablesToExport) } for tableName in tablesToExport: try: structure = _getTableStructure(conn, tableName) dbExport["tables"][tableName] = structure dbExport["summary"][tableName] = { "columnCount": len(structure["columns"]), "primaryKeyCount": len(structure["primaryKeys"]), "foreignKeyCount": len(structure["foreignKeys"]), "indexCount": len(structure["indexes"]) } logger.info(f" {tableName}: {len(structure['columns'])} Spalten") except Exception as e: logger.error(f" Fehler bei Tabelle {tableName}: {e}") dbExport["tables"][tableName] = {} dbExport["summary"][tableName] = {"error": str(e)} return dbExport finally: conn.close() def _exportSingleDatabase( dbDatabase: str, excludeTables: List[str], includeMeta: bool ) -> Optional[Dict[str, Any]]: """Exportiert eine einzelne Datenbank.""" conn = _getDbConnection(dbDatabase) if conn is None: return None try: allTables = _getTables(conn) # System-Tabellen ausschliessen systemTables = ["_system"] tablesToExport = [ t for t in allTables if t not in systemTables and t not in excludeTables ] dbExport = { "tables": {}, "summary": {}, "tableCount": len(tablesToExport), "totalRecords": 0 } for tableName in tablesToExport: try: records = _getTableData(conn, tableName, includeMeta) rowCount = len(records) dbExport["totalRecords"] += rowCount dbExport["tables"][tableName] = records dbExport["summary"][tableName] = {"recordCount": rowCount} if rowCount > 0: logger.info(f" {tableName}: {rowCount} Datensätze") except Exception as e: logger.error(f" Fehler bei Tabelle {tableName}: {e}") dbExport["tables"][tableName] = [] dbExport["summary"][tableName] = {"recordCount": 0, "error": str(e)} # Bei autocommit = True ist kein rollback() notwendig return dbExport finally: conn.close() def exportDatabase( outputPath: Optional[str] = None, prettyPrint: bool = False, excludeTables: Optional[List[str]] = None, includeMeta: bool = False, onlyDatabases: Optional[List[str]] = None ) -> str: """ Exportiert alle Datenbanken in eine JSON-Datei. Erstellt zusätzlich eine separate JSON-Datei mit nur den Strukturen (ohne Daten). Args: outputPath: Pfad zur Ausgabedatei (optional) prettyPrint: JSON formatiert ausgeben excludeTables: Liste von Tabellen, die ausgeschlossen werden sollen includeMeta: System-Metadaten beibehalten onlyDatabases: Nur diese Datenbanken exportieren Returns: Pfad zur erstellten Exportdatei """ excludeTables = excludeTables or [] # Welche Datenbanken exportieren? databasesToExport = onlyDatabases if onlyDatabases else ALL_DATABASES # Prüfe ob grundlegende Konfigurationswerte vorhanden sind # APP_CONFIG.get() entschlüsselt automatisch Werte, die mit _SECRET enden # Jede Datenbank hat ihre eigenen Variablen (DB_APP_USER, DB_CHAT_USER, etc.) missingConfigs = [] for dbName in databasesToExport: config = _getDbConfig(dbName) if not config["user"] or not config["password"]: missingConfigs.append(f"{dbName}: User={'gesetzt' if config['user'] else 'FEHLT'}, Password={'gesetzt' if config['password'] else 'FEHLT'}") if missingConfigs: logger.error("WICHTIG: Einige Datenbank-Konfigurationen fehlen!") for missing in missingConfigs: logger.error(f" {missing}") logger.error(" Bitte Umgebungsvariablen setzen oder .env Datei konfigurieren") logger.error(" Hinweis: Verschlüsselte Secrets benötigen APP_KEY_SYSVAR Umgebungsvariable!") logger.error(" Erwartete Variablen: DB_APP_USER, DB_CHAT_USER, DB_MANAGEMENT_USER, etc.") # Standard-Ausgabepfad generieren (im Log-Ordner) if not outputPath: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") logDir = _getConfigValue("APP_LOGGING_LOG_DIR") if logDir and os.path.isabs(logDir): outputDir = logDir else: outputDir = os.path.join(str(gatewayPath), "local", "logs") os.makedirs(outputDir, exist_ok=True) outputPath = os.path.join(outputDir, f"migration_export_{timestamp}.json") logger.info(f"Starte Export von {len(databasesToExport)} Datenbank(en)...") logger.info(f"Datenbanken: {', '.join(databasesToExport)}") # Export-Struktur erstellen exportData = { "meta": { "exportedAt": datetime.utcnow().isoformat() + "Z", "exportedFrom": _getConfigValue("APP_ENV_LABEL", "unknown"), "version": "1.0", "databaseCount": 0, "totalTables": 0, "totalRecords": 0, "excludedTables": excludeTables, "includesMeta": includeMeta }, "databases": {} } # Struktur-Export erstellen structureData = { "meta": { "exportedAt": datetime.utcnow().isoformat() + "Z", "exportedFrom": _getConfigValue("APP_ENV_LABEL", "unknown"), "version": "1.0", "databaseCount": 0, "totalTables": 0, "excludedTables": excludeTables, "note": "Nur Strukturen, keine Daten" }, "databases": {} } # Jede Datenbank exportieren for dbName in databasesToExport: logger.info(f"Exportiere Datenbank: {dbName}") # Daten exportieren dbExport = _exportSingleDatabase(dbName, excludeTables, includeMeta) # Struktur exportieren logger.info(f"Exportiere Struktur für Datenbank: {dbName}") dbStructure = _exportSingleDatabaseStructure(dbName, excludeTables) if dbExport is not None: exportData["databases"][dbName] = dbExport exportData["meta"]["databaseCount"] += 1 exportData["meta"]["totalTables"] += dbExport["tableCount"] exportData["meta"]["totalRecords"] += dbExport["totalRecords"] logger.info(f" -> {dbExport['tableCount']} Tabellen, {dbExport['totalRecords']} Datensätze") else: logger.info(f" -> Übersprungen (existiert nicht)") if dbStructure is not None: structureData["databases"][dbName] = dbStructure structureData["meta"]["databaseCount"] += 1 structureData["meta"]["totalTables"] += dbStructure["tableCount"] # JSON-Datei mit Daten schreiben logger.info(f"Schreibe Exportdatei: {outputPath}") with open(outputPath, "w", encoding="utf-8") as f: if prettyPrint: json.dump(exportData, f, indent=2, ensure_ascii=False, default=str) else: json.dump(exportData, f, ensure_ascii=False, default=str) # JSON-Datei mit Strukturen schreiben structurePath = outputPath.replace(".json", "_structure.json") logger.info(f"Schreibe Struktur-Exportdatei: {structurePath}") with open(structurePath, "w", encoding="utf-8") as f: if prettyPrint: json.dump(structureData, f, indent=2, ensure_ascii=False, default=str) else: json.dump(structureData, f, ensure_ascii=False, default=str) # Dateigrössen berechnen fileSize = os.path.getsize(outputPath) fileSizeStr = _formatFileSize(fileSize) structureFileSize = os.path.getsize(structurePath) structureFileSizeStr = _formatFileSize(structureFileSize) logger.info(f"Export abgeschlossen!") logger.info(f" Datenbanken: {exportData['meta']['databaseCount']}") logger.info(f" Tabellen: {exportData['meta']['totalTables']}") logger.info(f" Datensätze: {exportData['meta']['totalRecords']}") logger.info(f" Daten-Export: {fileSizeStr} - {outputPath}") logger.info(f" Struktur-Export: {structureFileSizeStr} - {structurePath}") return outputPath def _formatFileSize(sizeBytes: int) -> str: """Formatiert Dateigrösse in lesbares Format.""" for unit in ['B', 'KB', 'MB', 'GB']: if sizeBytes < 1024: return f"{sizeBytes:.2f} {unit}" sizeBytes /= 1024 return f"{sizeBytes:.2f} TB" def printDatabaseSummary(): """Zeigt eine Zusammenfassung aller Datenbanken an.""" print("\n" + "=" * 70) print("DATENBANK ZUSAMMENFASSUNG - ALLE POWEREON DATENBANKEN") print("=" * 70) print(f"Umgebung: {_getConfigValue('APP_ENV_LABEL', 'unknown')}") print(f"Host: {_getConfigValue('DB_HOST', 'localhost')}") print("=" * 70) grandTotalRecords = 0 grandTotalTables = 0 for dbName in ALL_DATABASES: print(f"\n{dbName}") print("-" * 70) conn = _getDbConnection(dbName) if conn is None: print(" (Datenbank existiert nicht)") continue try: tables = _getTables(conn) dbTotalRecords = 0 print(f" {'Tabelle':<45} {'Datensätze':>15}") print(f" {'-' * 45} {'-' * 15}") for tableName in tables: if tableName.startswith("_"): continue # System-Tabellen überspringen count = _getTableRowCount(conn, tableName) dbTotalRecords += count if count > 0: # Nur nicht-leere Tabellen anzeigen print(f" {tableName:<45} {count:>15}") print(f" {'-' * 45} {'-' * 15}") print(f" {'Gesamt':<45} {dbTotalRecords:>15}") grandTotalRecords += dbTotalRecords grandTotalTables += len([t for t in tables if not t.startswith("_")]) finally: conn.close() print("\n" + "=" * 70) print(f"GESAMTÜBERSICHT") print(f" Datenbanken: {len(ALL_DATABASES)}") print(f" Tabellen: {grandTotalTables}") print(f" Datensätze: {grandTotalRecords}") print("=" * 70 + "\n") def main(): parser = argparse.ArgumentParser( description="Exportiert alle PowerOn Datenbank-Daten für Migration", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Datenbanken: poweron_app - User, Mandate, RBAC, Features poweron_chat - Chat-Konversationen poweron_management - Workflows, Prompts, Connections poweron_realestate - Real Estate Daten poweron_trustee - Trustee Daten Beispiele: python script_db_export_migration.py python script_db_export_migration.py --pretty python script_db_export_migration.py -o backup.json --pretty python script_db_export_migration.py --db poweron_app,poweron_chat python script_db_export_migration.py --exclude Token,AuthEvent --include-meta python script_db_export_migration.py --summary """ ) parser.add_argument( "-o", "--output", help="Pfad zur Ausgabedatei", type=str, default=None ) parser.add_argument( "-p", "--pretty", help="JSON formatiert ausgeben", action="store_true" ) parser.add_argument( "--exclude", help="Komma-getrennte Liste von Tabellen zum Ausschliessen", type=str, default="" ) parser.add_argument( "--include-meta", help="System-Metadaten (_createdAt, etc.) beibehalten", action="store_true" ) parser.add_argument( "--db", help="Nur bestimmte Datenbank(en) exportieren (komma-getrennt)", type=str, default="" ) parser.add_argument( "--summary", help="Nur Zusammenfassung anzeigen (kein Export)", action="store_true" ) args = parser.parse_args() # Nur Zusammenfassung anzeigen if args.summary: printDatabaseSummary() return # Exclude-Liste parsen excludeTables = [] if args.exclude: excludeTables = [t.strip() for t in args.exclude.split(",") if t.strip()] # Datenbank-Liste parsen onlyDatabases = None if args.db: onlyDatabases = [db.strip() for db in args.db.split(",") if db.strip()] # Export durchführen try: outputPath = exportDatabase( outputPath=args.output, prettyPrint=args.pretty, excludeTables=excludeTables, includeMeta=args.include_meta, onlyDatabases=onlyDatabases ) print(f"\n Export erfolgreich: {outputPath}\n") except Exception as e: logger.error(f"Export fehlgeschlagen: {e}") sys.exit(1) if __name__ == "__main__": main()