274 lines
9.3 KiB
Python
274 lines
9.3 KiB
Python
#!/usr/bin/env python3
|
|
"""One-time migration: Reassign all DB references from an old user UID to a new UID.
|
|
|
|
When a user is re-created in PORTA (same username, new UUID), all existing records
still reference the old UUID. This script scans every database listed in ALL_DATABASES
and, for each base table in the public schema, rewrites VARCHAR/TEXT columns whose value
equals the old UID and JSONB columns whose text contains the old UID to the new UID.
|
|
|
|
Affected columns include:
|
|
- sysCreatedBy / sysModifiedBy (on every table via PowerOnModel)
|
|
- userId, revokedBy, createdByUserId, publishedBy, triggeredBy, assignedTo, etc.
|
|
|
|
The script auto-detects the new UID from the UserInDB table by username.
|
|
|
|
Usage:
|
|
# Dry-run (default) — shows what would change, no writes:
|
|
python scripts/script_migrate_user_uid.py --username patrick.helvetia --old-uid <OLD_UUID>
|
|
|
|
# Execute for real:
|
|
python scripts/script_migrate_user_uid.py --username patrick.helvetia --old-uid <OLD_UUID> --execute
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import List, Optional, Tuple
|
|
|
|
# Locate the gateway root (the directory two levels above this script) and make
# it both the first import location and the working directory, so that the
# project-local ``modules`` package imported further down can be found.
scriptPath = Path(__file__).resolve()
gatewayPath = scriptPath.parents[1]
sys.path.insert(0, str(gatewayPath))
os.chdir(gatewayPath)

# force=True replaces any handlers already attached to the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)
logger = logging.getLogger(__name__)
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from modules.shared.configuration import APP_CONFIG
|
|
|
|
|
|
# Every database that migrate() scans for references to the old UID.
# Names are kept as full literals so they stay greppable.
ALL_DATABASES = [
    "poweron_app",
    "poweron_chat",
    "poweron_management",
    "poweron_knowledge",
    "poweron_billing",
    "poweron_workspace",
    "poweron_graphicaleditor",
    "poweron_chatbot",
    "poweron_trustee",
    "poweron_commcoach",
    "poweron_neutralization",
    "poweron_realestate",
    "poweron_teamsbot",
]
|
|
|
|
|
|
def _getConnection(dbName: str):
    """Open a new psycopg2 connection to the database ``dbName``.

    Host, port, user and password are read from APP_CONFIG; host falls back
    to "localhost" and port to 5432 when not configured.
    """
    connectArgs = {
        "host": APP_CONFIG.get("DB_HOST", "localhost"),
        "port": int(APP_CONFIG.get("DB_PORT", "5432")),
        "database": dbName,
        "user": APP_CONFIG.get("DB_USER"),
        "password": APP_CONFIG.get("DB_PASSWORD_SECRET"),
        "client_encoding": "utf8",
    }
    return psycopg2.connect(**connectArgs)
|
|
|
|
|
|
def _getTablesInDb(conn) -> List[str]:
    """Return the names of all base tables in the ``public`` schema, sorted by name.

    Tables whose name starts with an underscore are filtered out by the
    ``NOT LIKE`` pattern.
    """
    query = """
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public'
        AND table_type = 'BASE TABLE'
        AND table_name NOT LIKE '\\_%%'
        ORDER BY table_name
    """
    with conn.cursor() as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
    return [record[0] for record in rows]
|
|
|
|
|
|
def _getVarcharColumns(conn, tableName: str) -> List[str]:
    """List the VARCHAR/TEXT columns of ``tableName`` in the ``public`` schema.

    Columns are returned in their ordinal position order; these are the
    columns that may hold a user ID as a plain string.
    """
    query = """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = 'public'
        AND table_name = %s
        AND data_type IN ('character varying', 'text')
        ORDER BY ordinal_position
    """
    with conn.cursor() as cursor:
        cursor.execute(query, (tableName,))
        rows = cursor.fetchall()
    return [record[0] for record in rows]
|
|
|
|
|
|
def _countMatches(conn, tableName: str, columnName: str, oldUid: str) -> int:
    """Count the rows of ``tableName`` whose ``columnName`` equals ``oldUid``.

    Table and column names are interpolated as double-quoted identifiers;
    the UID itself is passed as a bound parameter.
    """
    query = f'SELECT COUNT(*) FROM "{tableName}" WHERE "{columnName}" = %s'
    with conn.cursor() as cursor:
        cursor.execute(query, (oldUid,))
        row = cursor.fetchone()
    return row[0]
|
|
|
|
|
|
def _updateColumn(conn, tableName: str, columnName: str, oldUid: str, newUid: str) -> int:
    """Set ``columnName`` to ``newUid`` on every row where it equals ``oldUid``.

    Returns the cursor's rowcount for the UPDATE. Nothing is committed here;
    the caller controls the transaction.
    """
    statement = f'UPDATE "{tableName}" SET "{columnName}" = %s WHERE "{columnName}" = %s'
    with conn.cursor() as cursor:
        cursor.execute(statement, (newUid, oldUid))
        affected = cursor.rowcount
    return affected
|
|
|
|
|
|
def _lookupNewUid(username: str) -> Optional[str]:
    """Return the ``id`` stored for ``username`` in poweron_app's UserInDB table.

    Opens its own connection and always closes it. Returns None when no row
    matches the username.
    """
    query = 'SELECT "id" FROM "UserInDB" WHERE "username" = %s'
    conn = _getConnection("poweron_app")
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, (username,))
            match = cursor.fetchone()
    finally:
        conn.close()

    if match:
        return match[0]
    return None
|
|
|
|
|
|
def _scanJsonbForUid(conn, tableName: str, columnName: str, oldUid: str) -> int:
    """Count rows whose JSONB ``columnName``, cast to text, contains ``oldUid``.

    The match is a substring LIKE on the text form of the column, so the UID
    is found wherever it appears inside the JSON document.
    """
    likePattern = f"%{oldUid}%"
    query = f"""SELECT COUNT(*) FROM "{tableName}"
        WHERE "{columnName}"::text LIKE %s"""
    with conn.cursor() as cursor:
        cursor.execute(query, (likePattern,))
        row = cursor.fetchone()
    return row[0]
|
|
|
|
|
|
def _updateJsonbColumn(conn, tableName: str, columnName: str, oldUid: str, newUid: str) -> int:
    """Replace ``oldUid`` with ``newUid`` inside the JSONB column ``columnName``.

    The column is cast to text, every occurrence of the old UID is replaced
    with SQL REPLACE, and the result is cast back to jsonb. Only rows whose
    text form contains the old UID are touched. Returns the cursor's rowcount;
    nothing is committed here.
    """
    likePattern = f"%{oldUid}%"
    statement = f"""UPDATE "{tableName}"
        SET "{columnName}" = REPLACE("{columnName}"::text, %s, %s)::jsonb
        WHERE "{columnName}"::text LIKE %s"""
    with conn.cursor() as cursor:
        cursor.execute(statement, (oldUid, newUid, likePattern))
        affected = cursor.rowcount
    return affected
|
|
|
|
|
|
def _getJsonbColumns(conn, tableName: str) -> List[str]:
    """List the JSONB columns of ``tableName`` in the ``public`` schema.

    Columns are returned in their ordinal position order.
    """
    query = """
        SELECT column_name FROM information_schema.columns
        WHERE table_schema = 'public'
        AND table_name = %s
        AND data_type = 'jsonb'
        ORDER BY ordinal_position
    """
    with conn.cursor() as cursor:
        cursor.execute(query, (tableName,))
        rows = cursor.fetchall()
    return [record[0] for record in rows]
|
|
|
|
|
|
def _migrateDatabase(
    conn, dbName: str, oldUid: str, newUid: str, execute: bool
) -> Tuple[List[Tuple[str, str, str, int]], int]:
    """Scan one database for the old UID and, if ``execute`` is set, rewrite it.

    Walks every table returned by _getTablesInDb and checks its VARCHAR/TEXT
    columns (exact match) and JSONB columns (substring match on the text form).
    This helper never commits or rolls back; the caller owns the transaction.

    Returns:
        A tuple ``(findings, updatedRows)`` where ``findings`` holds one
        ``(dbName, tableName, columnLabel, matchCount)`` entry per column that
        references the old UID, and ``updatedRows`` is the sum of the rowcounts
        reported by the UPDATE statements (0 in dry-run mode).
    """
    dbFindings: List[Tuple[str, str, str, int]] = []
    dbUpdated = 0

    for tableName in _getTablesInDb(conn):
        for col in _getVarcharColumns(conn, tableName):
            count = _countMatches(conn, tableName, col, oldUid)
            if count <= 0:
                continue
            dbFindings.append((dbName, tableName, col, count))
            if execute:
                updated = _updateColumn(conn, tableName, col, oldUid, newUid)
                dbUpdated += updated
                logger.info(f" [UPDATED] {dbName}.{tableName}.{col}: {updated} rows")
            else:
                logger.info(f" [DRY-RUN] {dbName}.{tableName}.{col}: {count} rows would be updated")

        for col in _getJsonbColumns(conn, tableName):
            count = _scanJsonbForUid(conn, tableName, col, oldUid)
            if count <= 0:
                continue
            dbFindings.append((dbName, tableName, f"{col} (JSONB)", count))
            if execute:
                # Report the rowcount of the UPDATE itself, not the earlier scan.
                updated = _updateJsonbColumn(conn, tableName, col, oldUid, newUid)
                dbUpdated += updated
                logger.info(f" [UPDATED] {dbName}.{tableName}.{col} (JSONB): {updated} rows")
            else:
                logger.info(f" [DRY-RUN] {dbName}.{tableName}.{col} (JSONB): {count} rows would be updated")

    return dbFindings, dbUpdated


def migrate(username: str, oldUid: str, execute: bool = False) -> None:
    """Reassign every reference to ``oldUid`` to the user's current UID.

    The new UID is looked up by ``username`` via _lookupNewUid. Each database
    in ALL_DATABASES is then processed in its own transaction: committed when
    ``execute`` is True, rolled back otherwise (dry-run). A database that
    cannot be reached or that raises while being processed is skipped; its
    partial results are left out of the totals and it is listed in the summary.

    Args:
        username: Username used to look up the new UID.
        oldUid: The UID currently stored in the data that should be replaced.
        execute: When False (default) nothing is written; matches are only logged.

    Exits the process with status 1 when the user cannot be found or when the
    looked-up UID is identical to ``oldUid``.
    """
    newUid = _lookupNewUid(username)
    if not newUid:
        logger.error(f"User '{username}' not found in UserInDB. Cannot determine new UID.")
        sys.exit(1)

    if newUid == oldUid:
        logger.error(f"Old UID and new UID are identical ({oldUid}). Nothing to migrate.")
        sys.exit(1)

    logger.info(f"Migration: user '{username}'")
    logger.info(f" Old UID: {oldUid}")
    logger.info(f" New UID: {newUid}")
    logger.info(f" Mode: {'EXECUTE' if execute else 'DRY-RUN'}")
    logger.info("")

    totalUpdated = 0
    findings: List[Tuple[str, str, str, int]] = []
    unreachableDatabases: List[str] = []
    failedDatabases: List[str] = []

    for dbName in ALL_DATABASES:
        try:
            conn = _getConnection(dbName)
        except Exception as e:
            logger.warning(f" Cannot connect to {dbName}: {e}")
            unreachableDatabases.append(dbName)
            continue

        try:
            conn.autocommit = False
            dbFindings, dbUpdated = _migrateDatabase(conn, dbName, oldUid, newUid, execute)
            if execute:
                conn.commit()
            else:
                conn.rollback()
        except Exception as e:
            logger.error(f" Error processing {dbName}: {e}")
            failedDatabases.append(dbName)
            # A failing rollback (e.g. a dropped connection) must not stop the
            # remaining databases from being processed.
            try:
                conn.rollback()
            except Exception as rollbackError:
                logger.error(f" Rollback failed for {dbName}: {rollbackError}")
        else:
            # Only count results whose transaction finished cleanly, so the
            # summary never reports rows that were rolled back.
            findings.extend(dbFindings)
            totalUpdated += dbUpdated
        finally:
            conn.close()

    logger.info("")
    logger.info("=" * 70)
    logger.info("SUMMARY")
    logger.info("=" * 70)
    if not findings:
        logger.info(" No references to old UID found in any database.")
    else:
        logger.info(f" Found {len(findings)} column(s) with references to old UID:")
        for dbName, tableName, col, count in findings:
            logger.info(f" {dbName}.{tableName}.{col}: {count} rows")
        logger.info("")
        if execute:
            logger.info(f" Total rows updated: {totalUpdated}")
        else:
            logger.info(f" Total rows that would be updated: {sum(c for _, _, _, c in findings)}")
            logger.info("")
            logger.info(" To apply changes, re-run with --execute")

    if unreachableDatabases:
        logger.warning(f" Databases skipped (connection failed): {', '.join(unreachableDatabases)}")
    if failedDatabases:
        logger.error(
            f" Databases with errors (rolled back, not included above): {', '.join(failedDatabases)}"
        )
|
|
|
|
|
|
def main():
    """Parse the command-line options and hand them to migrate()."""
    optionSpecs = (
        (
            "--username",
            {
                "required": True,
                "help": "Username to migrate (e.g. 'patrick.helvetia'). Used to look up the new UID.",
            },
        ),
        (
            "--old-uid",
            {
                "required": True,
                "help": "The old UUID that is orphaned in the database.",
            },
        ),
        (
            "--execute",
            {
                "action": "store_true",
                "default": False,
                "help": "Actually perform the migration. Without this flag, only a dry-run is done.",
            },
        ),
    )

    parser = argparse.ArgumentParser(
        description="Migrate all DB references from old user UID to new UID."
    )
    for flag, options in optionSpecs:
        parser.add_argument(flag, **options)

    cliArgs = parser.parse_args()
    migrate(username=cliArgs.username, oldUid=cliArgs.old_uid, execute=cliArgs.execute)


if __name__ == "__main__":
    main()
|