gateway/modules/system/databaseHealth.py
2026-04-16 23:13:05 +02:00

405 lines
14 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Database health utilities — table statistics and orphan detection/cleanup.
All functions are intended for SysAdmin use only (access control in the route layer).
"""
import logging
import time
import threading
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Set
import psycopg2
import psycopg2.extras
from modules.shared.configuration import APP_CONFIG
from modules.shared.dbRegistry import _getRegisteredDatabases
from modules.shared.fkRegistry import _getFkRelationships, FkRelationship
logger = logging.getLogger(__name__)
# Maximum age, in seconds, at which a cached orphan scan is still served.
_ORPHAN_CACHE_TTL = 300 # 5 minutes
# Held around every read and write of _orphanCache in this module.
_orphanCacheLock = threading.Lock()
# Result of the most recent orphan scan together with its time.time() stamp;
# None means nothing is cached and the next scan has to query the databases.
_orphanCache: Optional[Dict] = None # {"ts": float, "results": [...]}
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class TableStats:
    """Size and maintenance figures for one table of one registered database."""

    # Location of the table.
    db: str
    table: str

    # Row count as reported by the statistics view (n_live_tup), i.e. an estimate.
    estimatedRows: int

    # On-disk footprint in bytes: the whole relation, and its indexes alone.
    totalSizeBytes: int
    indexSizeBytes: int

    # Timestamps rendered as text by the query; None when PostgreSQL reports NULL.
    lastVacuum: Optional[str]
    lastAnalyze: Optional[str]
@dataclass
class OrphanResult:
    """Outcome of checking one FK relationship for orphaned source rows."""

    # Referencing side: the column whose values should exist on the target side.
    sourceDb: str
    sourceTable: str
    sourceColumn: str

    # Referenced side: where a matching value is expected to be found.
    targetDb: str
    targetTable: str
    targetColumn: str

    # Number of source rows whose non-empty reference has no matching target row.
    orphanCount: int
# ---------------------------------------------------------------------------
# Low-level DB helpers (read-only, lightweight connections)
# ---------------------------------------------------------------------------
def _getConnection(dbName: str):
    """Create a new psycopg2 connection to a registered database.

    The database registry maps ``dbName`` to a configuration prefix. Host,
    port, user and password are then read from APP_CONFIG under
    ``<prefix>_HOST``, ``<prefix>_PORT``, ``<prefix>_USER`` and
    ``<prefix>_PASSWORD_SECRET``. Host falls back to "localhost" and port to
    5432. Cursors of the returned connection yield rows as dicts
    (RealDictCursor).

    Raises:
        ValueError: if ``dbName`` is not present in the registry.
    """
    prefix = _getRegisteredDatabases().get(dbName)
    if prefix is None:
        raise ValueError(f"Database '{dbName}' is not registered.")

    # The plain "DB" prefix needs no special case: it expands to DB_HOST,
    # DB_PORT, ... exactly like every other prefix.
    keyFor = {
        suffix: f"{prefix}_{suffix}"
        for suffix in ("HOST", "PORT", "USER", "PASSWORD_SECRET")
    }

    return psycopg2.connect(
        host=APP_CONFIG.get(keyFor["HOST"], "localhost"),
        port=int(APP_CONFIG.get(keyFor["PORT"], 5432)),
        database=dbName,
        user=APP_CONFIG.get(keyFor["USER"]),
        password=APP_CONFIG.get(keyFor["PASSWORD_SECRET"]),
        client_encoding="utf8",
        cursor_factory=psycopg2.extras.RealDictCursor,
    )
# ---------------------------------------------------------------------------
# Table statistics
# ---------------------------------------------------------------------------
def _getTableStats(dbFilter: Optional[str] = None) -> List[dict]:
    """Collect per-table statistics for the public schema of the registered databases.

    For every registered database (or only ``dbFilter`` when given) the
    function reads pg_stat_user_tables and the relation size functions and
    returns one TableStats dict per table. Tables whose name starts with an
    underscore are excluded.

    Collection is best effort: when a database cannot be reached or queried,
    the error is logged and that database is left out of the result.

    Args:
        dbFilter: Name of a single registered database to restrict the report
            to. ``None`` (or an empty string) reports all of them.

    Returns:
        TableStats dicts, ordered by database name and then by table name.
    """
    registeredDbs = _getRegisteredDatabases()
    if dbFilter:
        registeredDbs = {k: v for k, v in registeredDbs.items() if k == dbFilter}

    # Sizes are looked up through the table's OID (s.relid) rather than through
    # its unqualified name. A name is resolved via the session's search_path,
    # so it could point at a same-named table in another schema, or fail the
    # whole query when "public" is not on the path.
    statsQuery = """
        SELECT
            s.relname AS "table",
            s.n_live_tup AS "estimatedRows",
            pg_total_relation_size(s.relid) AS "totalSizeBytes",
            pg_indexes_size(s.relid) AS "indexSizeBytes",
            s.last_vacuum::text AS "lastVacuum",
            s.last_analyze::text AS "lastAnalyze"
        FROM pg_stat_user_tables s
        WHERE s.schemaname = 'public'
          AND s.relname NOT LIKE '\\_%%'
        ORDER BY s.relname
    """

    results: List[dict] = []
    for dbName in sorted(registeredDbs):
        try:
            conn = _getConnection(dbName)
            try:
                with conn.cursor() as cur:
                    cur.execute(statsQuery)
                    for row in cur.fetchall():
                        results.append(asdict(TableStats(
                            db=dbName,
                            table=row["table"],
                            estimatedRows=row["estimatedRows"],
                            totalSizeBytes=row["totalSizeBytes"],
                            indexSizeBytes=row["indexSizeBytes"],
                            lastVacuum=row["lastVacuum"],
                            lastAnalyze=row["lastAnalyze"],
                        )))
            finally:
                conn.close()
        except Exception as e:
            # Deliberately broad: one unreachable database must not hide the
            # statistics of the others.
            logger.error(f"Failed to get table stats for {dbName}: {e}")
    return results
# ---------------------------------------------------------------------------
# Orphan scanning
# ---------------------------------------------------------------------------
def _loadParentIds(conn, tableName: str, columnName: str) -> Set[str]:
"""Load all distinct values of a column from a table (for cross-DB checks)."""
ids: Set[str] = set()
with conn.cursor() as cur:
cur.execute(f'SELECT DISTINCT "{columnName}" FROM "{tableName}"')
for row in cur.fetchall():
val = row[columnName]
if val is not None:
ids.add(str(val))
return ids
def _countOrphansSameDb(
conn, sourceTable: str, sourceColumn: str,
targetTable: str, targetColumn: str,
) -> int:
"""Count orphans when source and target live in the same DB."""
with conn.cursor() as cur:
cur.execute(f"""
SELECT COUNT(*) AS cnt
FROM "{sourceTable}" s
WHERE s."{sourceColumn}" IS NOT NULL
AND s."{sourceColumn}" != ''
AND NOT EXISTS (
SELECT 1 FROM "{targetTable}" t
WHERE t."{targetColumn}" = s."{sourceColumn}"
)
""")
return cur.fetchone()["cnt"]
def _countOrphansCrossDb(
sourceConn, sourceTable: str, sourceColumn: str,
parentIds: Set[str],
) -> int:
"""Count orphans when parent IDs come from a different DB."""
if not parentIds:
with sourceConn.cursor() as cur:
cur.execute(f"""
SELECT COUNT(*) AS cnt
FROM "{sourceTable}"
WHERE "{sourceColumn}" IS NOT NULL
AND "{sourceColumn}" != ''
""")
return cur.fetchone()["cnt"]
with sourceConn.cursor() as cur:
cur.execute(f"""
SELECT COUNT(*) AS cnt
FROM "{sourceTable}"
WHERE "{sourceColumn}" IS NOT NULL
AND "{sourceColumn}" != ''
AND "{sourceColumn}" NOT IN (
SELECT unnest(%(ids)s::text[])
)
""", {"ids": list(parentIds)})
return cur.fetchone()["cnt"]
def _scanOrphans(dbFilter: Optional[str] = None) -> List[dict]:
    """Scan the registered FK relationships for orphaned records.

    Relationships whose source or target table does not physically exist are
    skipped. A relationship whose check fails is logged and left out of the
    result; the remaining relationships are still scanned.

    Caching: the result of a complete (unfiltered) scan is kept for
    ``_ORPHAN_CACHE_TTL`` seconds. While it is fresh, both filtered and
    unfiltered calls are answered from it. A filtered scan is never written
    to the cache, because it only covers one database and would otherwise be
    served as if it were the complete result.

    Args:
        dbFilter: Restrict the scan to relationships whose source database has
            this name. ``None`` (or an empty string) scans everything.

    Returns:
        One OrphanResult dict per successfully checked relationship. The dicts
        are copies, so callers may modify them without affecting the cache.
    """
    global _orphanCache
    with _orphanCacheLock:
        if _orphanCache and (time.time() - _orphanCache["ts"]) < _ORPHAN_CACHE_TTL:
            cached = _orphanCache["results"]
            if dbFilter:
                return [dict(r) for r in cached if r["sourceDb"] == dbFilter]
            return [dict(r) for r in cached]

    relationships = _getFkRelationships()
    if dbFilter:
        relationships = [r for r in relationships if r.sourceDb == dbFilter]

    # Per-scan caches: one connection per database, the set of existing tables
    # per database, and the loaded parent IDs per "db.table.column".
    connCache: Dict[str, "psycopg2.extensions.connection"] = {}
    tableCache: Dict[str, Set[str]] = {}
    parentIdCache: Dict[str, Set[str]] = {}
    results: List[dict] = []

    def _ensureConn(dbName: str):
        """Return the scan's connection to dbName, opening it on first use."""
        if dbName not in connCache:
            connCache[dbName] = _getConnection(dbName)
        return connCache[dbName]

    def _rollbackQuietly(dbName: str) -> None:
        """Reset a possibly aborted transaction so the connection stays usable."""
        conn = connCache.get(dbName)
        if conn is None:
            return
        try:
            conn.rollback()
        except Exception:
            # Best effort only: a dead connection cannot be reset, and the
            # original failure has already been logged by the caller.
            pass

    def _existingTables(dbName: str) -> Set[str]:
        """Cached lookup of physically existing public tables in a DB."""
        if dbName not in tableCache:
            try:
                conn = _ensureConn(dbName)
                with conn.cursor() as cur:
                    cur.execute("""
                        SELECT table_name FROM information_schema.tables
                        WHERE table_schema = 'public'
                    """)
                    tableCache[dbName] = {row["table_name"] for row in cur.fetchall()}
            except Exception as e:
                # An empty set makes every relationship touching this database
                # be skipped; say so instead of skipping silently.
                logger.warning(
                    f"Orphan scan could not list tables of {dbName}, "
                    f"skipping its relationships: {e}"
                )
                _rollbackQuietly(dbName)
                tableCache[dbName] = set()
        return tableCache[dbName]

    try:
        for rel in relationships:
            try:
                sourceTables = _existingTables(rel.sourceDb)
                if rel.sourceTable not in sourceTables:
                    continue
                sameDb = rel.sourceDb == rel.targetDb
                targetTables = sourceTables if sameDb else _existingTables(rel.targetDb)
                if rel.targetTable not in targetTables:
                    continue

                sourceConn = _ensureConn(rel.sourceDb)
                if sameDb:
                    count = _countOrphansSameDb(
                        sourceConn, rel.sourceTable, rel.sourceColumn,
                        rel.targetTable, rel.targetColumn,
                    )
                else:
                    parentKey = f"{rel.targetDb}.{rel.targetTable}.{rel.targetColumn}"
                    if parentKey not in parentIdCache:
                        targetConn = _ensureConn(rel.targetDb)
                        parentIdCache[parentKey] = _loadParentIds(
                            targetConn, rel.targetTable, rel.targetColumn,
                        )
                    count = _countOrphansCrossDb(
                        sourceConn, rel.sourceTable, rel.sourceColumn,
                        parentIdCache[parentKey],
                    )

                results.append(asdict(OrphanResult(
                    sourceDb=rel.sourceDb,
                    sourceTable=rel.sourceTable,
                    sourceColumn=rel.sourceColumn,
                    targetDb=rel.targetDb,
                    targetTable=rel.targetTable,
                    targetColumn=rel.targetColumn,
                    orphanCount=count,
                )))
            except Exception as e:
                logger.warning(
                    f"Orphan scan failed for {rel.sourceDb}.{rel.sourceTable}.{rel.sourceColumn}: {e}"
                )
                # A failed statement leaves the transaction aborted; reset both
                # sides so the following relationships can still be checked.
                for dbKey in (rel.sourceDb, rel.targetDb):
                    _rollbackQuietly(dbKey)
    finally:
        for conn in connCache.values():
            try:
                conn.close()
            except Exception:
                # Closing is best effort; nothing useful can be done here.
                pass

    # Only a complete scan may become the cache content. Caching a filtered
    # scan would make later unfiltered calls (and calls filtered on another
    # database) return an incomplete or empty result until the TTL expires.
    if not dbFilter:
        with _orphanCacheLock:
            _orphanCache = {"ts": time.time(), "results": [dict(r) for r in results]}
    return results
# ---------------------------------------------------------------------------
# Orphan cleanup
# ---------------------------------------------------------------------------
def _cleanOrphans(db: str, table: str, column: str) -> int:
    """Delete the orphaned rows of a single registered FK relationship.

    The relationship is looked up in the FK registry by its source side, and
    only the identifiers stored in the registry entry are put into the SQL.
    Rows are deleted when the source column is neither NULL nor the empty
    string and its value has no counterpart in the target column. For a
    target in another database, the parent IDs are loaded first and passed as
    a parameter; when there are none, every such row is deleted.

    On success the delete is committed and the orphan cache is invalidated.
    On failure the transaction is rolled back and the exception propagates.

    Args:
        db: Source database name.
        table: Source table name.
        column: Source column name.

    Returns:
        Number of rows deleted.

    Raises:
        ValueError: if no registered relationship matches the given source.
    """
    rel = None
    for candidate in _getFkRelationships():
        if (
            candidate.sourceDb == db
            and candidate.sourceTable == table
            and candidate.sourceColumn == column
        ):
            rel = candidate
            break
    if rel is None:
        raise ValueError(f"No FK relationship found for {db}.{table}.{column}")

    conn = _getConnection(rel.sourceDb)
    try:
        # Common part of every variant: only rows that actually carry a reference.
        statement = f"""
            DELETE FROM "{rel.sourceTable}"
            WHERE "{rel.sourceColumn}" IS NOT NULL
              AND "{rel.sourceColumn}" != ''
        """
        params = None

        if rel.sourceDb == rel.targetDb:
            statement += f"""
              AND NOT EXISTS (
                  SELECT 1 FROM "{rel.targetTable}" t
                  WHERE t."{rel.targetColumn}" = "{rel.sourceTable}"."{rel.sourceColumn}"
              )
            """
        else:
            targetConn = _getConnection(rel.targetDb)
            try:
                parentIds = _loadParentIds(targetConn, rel.targetTable, rel.targetColumn)
            finally:
                targetConn.close()
            # No parent IDs at all: the base statement already matches every orphan.
            if parentIds:
                statement += f"""
                  AND "{rel.sourceColumn}" NOT IN (
                      SELECT unnest(%(ids)s::text[])
                  )
                """
                params = {"ids": list(parentIds)}

        with conn.cursor() as cur:
            if params is None:
                cur.execute(statement)
            else:
                cur.execute(statement, params)
            deleted = cur.rowcount
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    _invalidateOrphanCache()
    logger.info(f"Cleaned {deleted} orphans from {db}.{table}.{column}")
    return deleted
def _cleanAllOrphans() -> List[dict]:
    """Run the orphan cleanup for every relationship reported by the scan.

    Each relationship is cleaned independently: a failure is logged and
    recorded in that relationship's entry, and the loop carries on.

    Returns:
        One dict per scanned relationship with the keys ``db``, ``table``,
        ``column`` and ``deleted``. Failed entries have ``deleted`` set to 0
        and an additional ``error`` key holding the exception text.
    """
    outcomes = []
    for found in _scanOrphans():
        srcDb = found["sourceDb"]
        srcTable = found["sourceTable"]
        srcColumn = found["sourceColumn"]
        entry = {"db": srcDb, "table": srcTable, "column": srcColumn, "deleted": 0}
        try:
            entry["deleted"] = _cleanOrphans(srcDb, srcTable, srcColumn)
        except Exception as e:
            logger.error(
                f"Failed to clean orphans for {srcDb}.{srcTable}.{srcColumn}: {e}"
            )
            entry["error"] = str(e)
        outcomes.append(entry)
    return outcomes
def _invalidateOrphanCache() -> None:
    """Discard the cached orphan scan so the next scan queries the databases again."""
    global _orphanCache
    # Same lock that guards the cache where the scan reads and replaces it.
    with _orphanCacheLock:
        _orphanCache = None