platform-core/scripts/script_migrate_user_uid.py
2026-05-19 16:48:01 +02:00

274 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""One-time migration: Reassign all DB references from an old user UID to a new UID.
When a user is re-created in PORTA (same username, new UUID), all existing records
still reference the old UUID. This script scans ALL registered databases and tables
for VARCHAR columns containing the old UID and updates them to the new UID.
Affected columns include:
- sysCreatedBy / sysModifiedBy (on every table via PowerOnModel)
- userId, revokedBy, createdByUserId, publishedBy, triggeredBy, assignedTo, etc.
The script auto-detects the new UID from the UserInDB table by username.
Usage:
# Dry-run (default) — shows what would change, no writes:
python scripts/script_migrate_user_uid.py --username patrick.helvetia --old-uid <OLD_UUID>
# Execute for real:
python scripts/script_migrate_user_uid.py --username patrick.helvetia --old-uid <OLD_UUID> --execute
"""
import argparse
import logging
import os
import sys
from pathlib import Path
from typing import List, Optional, Tuple
scriptPath = Path(__file__).resolve()
gatewayPath = scriptPath.parent.parent
sys.path.insert(0, str(gatewayPath))
os.chdir(str(gatewayPath))
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", force=True)
logger = logging.getLogger(__name__)
import psycopg2
import psycopg2.extras
from modules.shared.configuration import APP_CONFIG
ALL_DATABASES = [
"poweron_app",
"poweron_chat",
"poweron_management",
"poweron_knowledge",
"poweron_billing",
"poweron_workspace",
"poweron_graphicaleditor",
"poweron_chatbot",
"poweron_trustee",
"poweron_commcoach",
"poweron_neutralization",
"poweron_realestate",
"poweron_teamsbot",
]
def _getConnection(dbName: str):
return psycopg2.connect(
host=APP_CONFIG.get("DB_HOST", "localhost"),
port=int(APP_CONFIG.get("DB_PORT", "5432")),
database=dbName,
user=APP_CONFIG.get("DB_USER"),
password=APP_CONFIG.get("DB_PASSWORD_SECRET"),
client_encoding="utf8",
)
def _getTablesInDb(conn) -> List[str]:
with conn.cursor() as cur:
cur.execute("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
AND table_name NOT LIKE '\\_%%'
ORDER BY table_name
""")
return [row[0] for row in cur.fetchall()]
def _getVarcharColumns(conn, tableName: str) -> List[str]:
"""Get all VARCHAR/TEXT columns for a table (potential user-ID carriers)."""
with conn.cursor() as cur:
cur.execute("""
SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s
AND data_type IN ('character varying', 'text')
ORDER BY ordinal_position
""", (tableName,))
return [row[0] for row in cur.fetchall()]
def _countMatches(conn, tableName: str, columnName: str, oldUid: str) -> int:
with conn.cursor() as cur:
cur.execute(
f'SELECT COUNT(*) FROM "{tableName}" WHERE "{columnName}" = %s',
(oldUid,),
)
return cur.fetchone()[0]
def _updateColumn(conn, tableName: str, columnName: str, oldUid: str, newUid: str) -> int:
with conn.cursor() as cur:
cur.execute(
f'UPDATE "{tableName}" SET "{columnName}" = %s WHERE "{columnName}" = %s',
(newUid, oldUid),
)
return cur.rowcount
def _lookupNewUid(username: str) -> Optional[str]:
"""Find the current UID for a username in poweron_app.UserInDB."""
conn = _getConnection("poweron_app")
try:
with conn.cursor() as cur:
cur.execute(
'SELECT "id" FROM "UserInDB" WHERE "username" = %s',
(username,),
)
row = cur.fetchone()
return row[0] if row else None
finally:
conn.close()
def _scanJsonbForUid(conn, tableName: str, columnName: str, oldUid: str) -> int:
"""Count JSONB fields that contain the old UID as a text value anywhere."""
with conn.cursor() as cur:
cur.execute(
f"""SELECT COUNT(*) FROM "{tableName}"
WHERE "{columnName}"::text LIKE %s""",
(f"%{oldUid}%",),
)
return cur.fetchone()[0]
def _updateJsonbColumn(conn, tableName: str, columnName: str, oldUid: str, newUid: str) -> int:
"""Replace old UID inside JSONB columns using text replacement."""
with conn.cursor() as cur:
cur.execute(
f"""UPDATE "{tableName}"
SET "{columnName}" = REPLACE("{columnName}"::text, %s, %s)::jsonb
WHERE "{columnName}"::text LIKE %s""",
(oldUid, newUid, f"%{oldUid}%"),
)
return cur.rowcount
def _getJsonbColumns(conn, tableName: str) -> List[str]:
"""Get all JSONB columns for a table."""
with conn.cursor() as cur:
cur.execute("""
SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = %s
AND data_type = 'jsonb'
ORDER BY ordinal_position
""", (tableName,))
return [row[0] for row in cur.fetchall()]
def migrate(username: str, oldUid: str, execute: bool = False):
newUid = _lookupNewUid(username)
if not newUid:
logger.error(f"User '{username}' not found in UserInDB. Cannot determine new UID.")
sys.exit(1)
if newUid == oldUid:
logger.error(f"Old UID and new UID are identical ({oldUid}). Nothing to migrate.")
sys.exit(1)
logger.info(f"Migration: user '{username}'")
logger.info(f" Old UID: {oldUid}")
logger.info(f" New UID: {newUid}")
logger.info(f" Mode: {'EXECUTE' if execute else 'DRY-RUN'}")
logger.info("")
totalUpdated = 0
findings: List[Tuple[str, str, str, int]] = []
for dbName in ALL_DATABASES:
try:
conn = _getConnection(dbName)
except Exception as e:
logger.warning(f" Cannot connect to {dbName}: {e}")
continue
try:
conn.autocommit = False
tables = _getTablesInDb(conn)
for tableName in tables:
varcharCols = _getVarcharColumns(conn, tableName)
for col in varcharCols:
count = _countMatches(conn, tableName, col, oldUid)
if count > 0:
findings.append((dbName, tableName, col, count))
if execute:
updated = _updateColumn(conn, tableName, col, oldUid, newUid)
totalUpdated += updated
logger.info(f" [UPDATED] {dbName}.{tableName}.{col}: {updated} rows")
else:
logger.info(f" [DRY-RUN] {dbName}.{tableName}.{col}: {count} rows would be updated")
jsonbCols = _getJsonbColumns(conn, tableName)
for col in jsonbCols:
count = _scanJsonbForUid(conn, tableName, col, oldUid)
if count > 0:
findings.append((dbName, tableName, f"{col} (JSONB)", count))
if execute:
_updateJsonbColumn(conn, tableName, col, oldUid, newUid)
totalUpdated += count
logger.info(f" [UPDATED] {dbName}.{tableName}.{col} (JSONB): {count} rows")
else:
logger.info(f" [DRY-RUN] {dbName}.{tableName}.{col} (JSONB): {count} rows would be updated")
if execute:
conn.commit()
else:
conn.rollback()
except Exception as e:
conn.rollback()
logger.error(f" Error processing {dbName}: {e}")
finally:
conn.close()
logger.info("")
logger.info("=" * 70)
logger.info("SUMMARY")
logger.info("=" * 70)
if not findings:
logger.info(" No references to old UID found in any database.")
else:
logger.info(f" Found {len(findings)} column(s) with references to old UID:")
for dbName, tableName, col, count in findings:
logger.info(f" {dbName}.{tableName}.{col}: {count} rows")
logger.info("")
if execute:
logger.info(f" Total rows updated: {totalUpdated}")
else:
logger.info(f" Total rows that would be updated: {sum(c for _, _, _, c in findings)}")
logger.info("")
logger.info(" To apply changes, re-run with --execute")
def main():
parser = argparse.ArgumentParser(
description="Migrate all DB references from old user UID to new UID."
)
parser.add_argument(
"--username",
required=True,
help="Username to migrate (e.g. 'patrick.helvetia'). Used to look up the new UID.",
)
parser.add_argument(
"--old-uid",
required=True,
help="The old UUID that is orphaned in the database.",
)
parser.add_argument(
"--execute",
action="store_true",
default=False,
help="Actually perform the migration. Without this flag, only a dry-run is done.",
)
args = parser.parse_args()
migrate(username=args.username, oldUid=args.old_uid, execute=args.execute)
if __name__ == "__main__":
main()