# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Database health utilities — table statistics and orphan detection/cleanup.

All functions are intended for SysAdmin use only (access control in the
route layer).
"""
import logging
import threading
import time
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional, Set

import psycopg2
import psycopg2.extras

from modules.shared.configuration import APP_CONFIG
from modules.shared.dbRegistry import _getRegisteredDatabases
from modules.shared.fkRegistry import _getFkRelationships, FkRelationship

logger = logging.getLogger(__name__)

_ORPHAN_CACHE_TTL = 300  # seconds (5 minutes)
_orphanCacheLock = threading.Lock()
# Guarded by _orphanCacheLock.  Shape: {"ts": float, "results": List[dict]}.
# Only ever holds the results of a FULL (unfiltered) scan — see _scanOrphans.
_orphanCache: Optional[Dict] = None


# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------

@dataclass
class TableStats:
    """Size/maintenance statistics for one table in one registered DB."""
    db: str
    table: str
    estimatedRows: int        # pg_stat_user_tables.n_live_tup (estimate)
    totalSizeBytes: int       # table + indexes + TOAST
    indexSizeBytes: int
    lastVacuum: Optional[str]   # timestamp rendered as text, or None
    lastAnalyze: Optional[str]  # timestamp rendered as text, or None


@dataclass
class OrphanResult:
    """Orphan count for a single FK relationship (source -> target)."""
    sourceDb: str
    sourceTable: str
    sourceColumn: str
    targetDb: str
    targetTable: str
    targetColumn: str
    orphanCount: int


# ---------------------------------------------------------------------------
# Low-level DB helpers (read-only, lightweight connections)
# ---------------------------------------------------------------------------

def _quoteIdent(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier.

    Table/column names come from the internal FK registry (trusted), but
    doubling any embedded double quote guards against a registry entry that
    would otherwise break out of the quoting.
    """
    return '"' + name.replace('"', '""') + '"'


def _getConnection(dbName: str):
    """Open a psycopg2 connection to the given registered database.

    Connection settings are read from APP_CONFIG using the registry's
    config prefix (for the default prefix "DB" the keys are simply
    DB_HOST, DB_PORT, DB_USER, DB_PASSWORD_SECRET — the same strings the
    f-strings below produce, so no special-casing is needed).

    Raises:
        ValueError: if *dbName* is not in the database registry.
    """
    registeredDbs = _getRegisteredDatabases()
    configPrefix = registeredDbs.get(dbName)
    if configPrefix is None:
        raise ValueError(f"Database '{dbName}' is not registered.")
    return psycopg2.connect(
        host=APP_CONFIG.get(f"{configPrefix}_HOST", "localhost"),
        port=int(APP_CONFIG.get(f"{configPrefix}_PORT", 5432)),
        database=dbName,
        user=APP_CONFIG.get(f"{configPrefix}_USER"),
        password=APP_CONFIG.get(f"{configPrefix}_PASSWORD_SECRET"),
        client_encoding="utf8",
        cursor_factory=psycopg2.extras.RealDictCursor,
    )


# ---------------------------------------------------------------------------
# Table statistics
# ---------------------------------------------------------------------------

def _getTableStats(dbFilter: Optional[str] = None) -> List[dict]:
    """Query pg_stat_user_tables + size functions for every registered DB.

    Returns a list of TableStats dicts, optionally filtered by database
    name.  Databases that fail to connect or query are logged and skipped.
    """
    registeredDbs = _getRegisteredDatabases()
    if dbFilter:
        registeredDbs = {k: v for k, v in registeredDbs.items() if k == dbFilter}

    results: List[dict] = []
    for dbName in sorted(registeredDbs):
        try:
            conn = _getConnection(dbName)
            try:
                with conn.cursor() as cur:
                    # Size functions take s.relid (the table's OID) rather
                    # than quote_ident(relname): an OID is unambiguous,
                    # whereas a bare name is resolved via search_path and
                    # can miss or mismatch the intended relation.
                    # NOTE: no parameters are passed to execute(), so '%'
                    # needs no doubling; '\_%' skips underscore-prefixed
                    # (internal) tables.
                    cur.execute("""
                        SELECT s.relname AS "table",
                               s.n_live_tup AS "estimatedRows",
                               pg_total_relation_size(s.relid) AS "totalSizeBytes",
                               pg_indexes_size(s.relid) AS "indexSizeBytes",
                               s.last_vacuum::text AS "lastVacuum",
                               s.last_analyze::text AS "lastAnalyze"
                        FROM pg_stat_user_tables s
                        WHERE s.schemaname = 'public'
                          AND s.relname NOT LIKE '\\_%'
                        ORDER BY s.relname
                    """)
                    for row in cur.fetchall():
                        results.append(asdict(TableStats(
                            db=dbName,
                            table=row["table"],
                            estimatedRows=row["estimatedRows"],
                            totalSizeBytes=row["totalSizeBytes"],
                            indexSizeBytes=row["indexSizeBytes"],
                            lastVacuum=row["lastVacuum"],
                            lastAnalyze=row["lastAnalyze"],
                        )))
            finally:
                conn.close()
        except Exception as e:
            logger.error("Failed to get table stats for %s: %s", dbName, e)
    return results


# ---------------------------------------------------------------------------
# Orphan scanning
# ---------------------------------------------------------------------------

def _loadParentIds(conn, tableName: str, columnName: str) -> Set[str]:
    """Load all distinct values of a column from a table (for cross-DB checks).

    Values are stringified so they can be compared as text[] on the source
    side.  NULLs are excluded.  NOTE(review): this materializes every
    parent ID in memory — acceptable for admin tooling, but worth
    revisiting for very large parent tables.
    """
    ids: Set[str] = set()
    with conn.cursor() as cur:
        cur.execute(
            f'SELECT DISTINCT {_quoteIdent(columnName)} FROM {_quoteIdent(tableName)}'
        )
        for row in cur.fetchall():
            val = row[columnName]
            if val is not None:
                ids.add(str(val))
    return ids


def _countOrphansSameDb(
    conn,
    sourceTable: str,
    sourceColumn: str,
    targetTable: str,
    targetColumn: str,
) -> int:
    """Count orphans when source and target live in the same DB.

    Rows whose FK column is NULL or empty-string are treated as
    intentionally unlinked, not orphaned.
    """
    srcT, srcC = _quoteIdent(sourceTable), _quoteIdent(sourceColumn)
    tgtT, tgtC = _quoteIdent(targetTable), _quoteIdent(targetColumn)
    with conn.cursor() as cur:
        cur.execute(f"""
            SELECT COUNT(*) AS cnt
            FROM {srcT} s
            WHERE s.{srcC} IS NOT NULL
              AND s.{srcC} != ''
              AND NOT EXISTS (
                  SELECT 1 FROM {tgtT} t
                  WHERE t.{tgtC} = s.{srcC}
              )
        """)
        return cur.fetchone()["cnt"]


def _countOrphansCrossDb(
    sourceConn,
    sourceTable: str,
    sourceColumn: str,
    parentIds: Set[str],
) -> int:
    """Count orphans when parent IDs come from a different DB.

    If *parentIds* is empty, every linked row is an orphan by definition.
    NOT IN is safe here because _loadParentIds never yields NULLs.
    """
    srcT, srcC = _quoteIdent(sourceTable), _quoteIdent(sourceColumn)
    if not parentIds:
        with sourceConn.cursor() as cur:
            cur.execute(f"""
                SELECT COUNT(*) AS cnt
                FROM {srcT}
                WHERE {srcC} IS NOT NULL AND {srcC} != ''
            """)
            return cur.fetchone()["cnt"]

    with sourceConn.cursor() as cur:
        cur.execute(f"""
            SELECT COUNT(*) AS cnt
            FROM {srcT}
            WHERE {srcC} IS NOT NULL
              AND {srcC} != ''
              AND {srcC} NOT IN (
                  SELECT unnest(%(ids)s::text[])
              )
        """, {"ids": list(parentIds)})
        return cur.fetchone()["cnt"]


def _scanOrphans(dbFilter: Optional[str] = None) -> List[dict]:
    """Scan for orphaned records across all FK relationships.

    Uses a 5-minute cache to avoid repeated heavy scans.  Only an
    UNFILTERED scan may populate the cache: previously a filtered scan
    stored its subset as the global cache, so later unfiltered calls
    within the TTL silently returned only that subset (bug fix).
    A filtered request can still be served FROM a cached full scan.
    """
    global _orphanCache
    with _orphanCacheLock:
        if _orphanCache and (time.time() - _orphanCache["ts"]) < _ORPHAN_CACHE_TTL:
            cached = _orphanCache["results"]
            if dbFilter:
                return [r for r in cached if r["sourceDb"] == dbFilter]
            return list(cached)

    relationships = _getFkRelationships()
    if dbFilter:
        relationships = [r for r in relationships if r.sourceDb == dbFilter]

    connCache: Dict[str, Any] = {}          # dbName -> open connection
    tableCache: Dict[str, Set[str]] = {}    # dbName -> existing public tables
    parentIdCache: Dict[str, Set[str]] = {} # "db.table.col" -> parent ID set
    results: List[dict] = []

    def _ensureConn(dbName: str):
        """Open (and memoize) one connection per database for this scan."""
        if dbName not in connCache:
            connCache[dbName] = _getConnection(dbName)
        return connCache[dbName]

    def _existingTables(dbName: str) -> Set[str]:
        """Cached lookup of physically existing public tables in a DB."""
        if dbName not in tableCache:
            try:
                conn = _ensureConn(dbName)
                with conn.cursor() as cur:
                    cur.execute("""
                        SELECT table_name
                        FROM information_schema.tables
                        WHERE table_schema = 'public'
                    """)
                    tableCache[dbName] = {row["table_name"] for row in cur.fetchall()}
            except Exception:
                # Unreachable DB: treat as having no tables so its
                # relationships are skipped rather than aborting the scan.
                tableCache[dbName] = set()
        return tableCache[dbName]

    try:
        for rel in relationships:
            try:
                # Skip relationships whose tables don't physically exist.
                sourceTables = _existingTables(rel.sourceDb)
                if rel.sourceTable not in sourceTables:
                    continue
                if rel.sourceDb == rel.targetDb:
                    if rel.targetTable not in sourceTables:
                        continue
                else:
                    targetTables = _existingTables(rel.targetDb)
                    if rel.targetTable not in targetTables:
                        continue

                sourceConn = _ensureConn(rel.sourceDb)
                if rel.sourceDb == rel.targetDb:
                    count = _countOrphansSameDb(
                        sourceConn,
                        rel.sourceTable, rel.sourceColumn,
                        rel.targetTable, rel.targetColumn,
                    )
                else:
                    # Parent ID sets are shared across relationships that
                    # point at the same target column.
                    parentKey = f"{rel.targetDb}.{rel.targetTable}.{rel.targetColumn}"
                    if parentKey not in parentIdCache:
                        targetConn = _ensureConn(rel.targetDb)
                        parentIdCache[parentKey] = _loadParentIds(
                            targetConn, rel.targetTable, rel.targetColumn,
                        )
                    count = _countOrphansCrossDb(
                        sourceConn,
                        rel.sourceTable, rel.sourceColumn,
                        parentIdCache[parentKey],
                    )

                results.append(asdict(OrphanResult(
                    sourceDb=rel.sourceDb,
                    sourceTable=rel.sourceTable,
                    sourceColumn=rel.sourceColumn,
                    targetDb=rel.targetDb,
                    targetTable=rel.targetTable,
                    targetColumn=rel.targetColumn,
                    orphanCount=count,
                )))
            except Exception as e:
                logger.warning(
                    "Orphan scan failed for %s.%s.%s: %s",
                    rel.sourceDb, rel.sourceTable, rel.sourceColumn, e,
                )
                # A failed statement poisons the transaction; roll back so
                # the cached connections stay usable for later rels.
                for dbKey in (rel.sourceDb, rel.targetDb):
                    if dbKey in connCache:
                        try:
                            connCache[dbKey].rollback()
                        except Exception:
                            pass
    finally:
        for conn in connCache.values():
            try:
                conn.close()
            except Exception:
                pass

    # Only cache full scans (see docstring).
    if dbFilter is None:
        with _orphanCacheLock:
            _orphanCache = {"ts": time.time(), "results": results}
    return results


# ---------------------------------------------------------------------------
# Orphan cleanup
# ---------------------------------------------------------------------------

def _cleanOrphans(db: str, table: str, column: str) -> int:
    """Delete orphaned records for a single FK relationship.

    Re-checks orphanhood at delete time (the scan cache may be stale).
    Commits on success, rolls back and re-raises on failure, and
    invalidates the orphan-scan cache afterwards.

    Returns:
        Number of rows deleted.

    Raises:
        ValueError: if no FK relationship matches (db, table, column).
    """
    relationships = _getFkRelationships()
    rel = next(
        (r for r in relationships
         if r.sourceDb == db and r.sourceTable == table and r.sourceColumn == column),
        None,
    )
    if rel is None:
        raise ValueError(f"No FK relationship found for {db}.{table}.{column}")

    srcT, srcC = _quoteIdent(rel.sourceTable), _quoteIdent(rel.sourceColumn)
    conn = _getConnection(rel.sourceDb)
    try:
        if rel.sourceDb == rel.targetDb:
            tgtT, tgtC = _quoteIdent(rel.targetTable), _quoteIdent(rel.targetColumn)
            with conn.cursor() as cur:
                cur.execute(f"""
                    DELETE FROM {srcT}
                    WHERE {srcC} IS NOT NULL
                      AND {srcC} != ''
                      AND NOT EXISTS (
                          SELECT 1 FROM {tgtT} t
                          WHERE t.{tgtC} = {srcT}.{srcC}
                      )
                """)
                deleted = cur.rowcount
        else:
            # Cross-DB: snapshot the parent IDs, then delete anything not
            # referencing one of them.
            targetConn = _getConnection(rel.targetDb)
            try:
                parentIds = _loadParentIds(targetConn, rel.targetTable, rel.targetColumn)
            finally:
                targetConn.close()

            if not parentIds:
                # No parents exist at all: every linked child is an orphan.
                with conn.cursor() as cur:
                    cur.execute(f"""
                        DELETE FROM {srcT}
                        WHERE {srcC} IS NOT NULL AND {srcC} != ''
                    """)
                    deleted = cur.rowcount
            else:
                with conn.cursor() as cur:
                    cur.execute(f"""
                        DELETE FROM {srcT}
                        WHERE {srcC} IS NOT NULL
                          AND {srcC} != ''
                          AND {srcC} NOT IN (
                              SELECT unnest(%(ids)s::text[])
                          )
                    """, {"ids": list(parentIds)})
                    deleted = cur.rowcount
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    _invalidateOrphanCache()
    logger.info("Cleaned %s orphans from %s.%s.%s", deleted, db, table, column)
    return deleted


def _cleanAllOrphans() -> List[dict]:
    """Clean all detected orphans.

    Relationships the scan reports with zero orphans are skipped — running
    a DELETE (plus a cache invalidation) per clean relationship was pure
    overhead.  Per-relationship failures are logged and reported in the
    result instead of aborting the batch.

    Returns:
        List of {db, table, column, deleted[, error]} dicts, one per
        relationship that had orphans.
    """
    orphans = _scanOrphans()
    results = []
    for orphan in orphans:
        if orphan["orphanCount"] <= 0:
            continue  # nothing to delete for this relationship
        try:
            deleted = _cleanOrphans(
                orphan["sourceDb"], orphan["sourceTable"], orphan["sourceColumn"]
            )
            results.append({
                "db": orphan["sourceDb"],
                "table": orphan["sourceTable"],
                "column": orphan["sourceColumn"],
                "deleted": deleted,
            })
        except Exception as e:
            logger.error(
                "Failed to clean orphans for %s.%s.%s: %s",
                orphan["sourceDb"], orphan["sourceTable"], orphan["sourceColumn"], e,
            )
            results.append({
                "db": orphan["sourceDb"],
                "table": orphan["sourceTable"],
                "column": orphan["sourceColumn"],
                "deleted": 0,
                "error": str(e),
            })
    return results


def _invalidateOrphanCache() -> None:
    """Drop the cached orphan-scan results (called after any cleanup)."""
    global _orphanCache
    with _orphanCacheLock:
        _orphanCache = None