gateway/modules/shared/fkRegistry.py
2026-04-26 18:11:42 +02:00

272 lines
9.4 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
FK-Discovery — scans the Model-Registry for `fk_target` annotations and
builds a cached list of foreign-key relationships.

Each relationship describes one directed edge:

    sourceTable.sourceColumn → targetTable.targetColumn

(possibly across databases).

The table→db mapping is derived automatically from the `fk_target`
annotations themselves: every `fk_target` declares `{"db": "...", "table": "..."}`
for the *target* side. By collecting all such declarations we know which DB
each referenced table lives in — no extra per-table registration step is needed.
Tables that are never the target of an `fk_target` are looked up in the
catalogs (information_schema) of the registered databases instead.

Usage:

    from modules.shared.fkRegistry import getFkRelationships
    rels = getFkRelationships()
"""
import importlib
import logging
import os
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional
from modules.datamodels.datamodelBase import MODEL_REGISTRY
logger = logging.getLogger(__name__)
_modelsLoaded = False


def _ensureModelsLoaded() -> None:
    """Import all datamodel modules so that __init_subclass__ fills MODEL_REGISTRY.

    In a running server the interfaces import the datamodels automatically.
    This function makes FK-Discovery work in standalone / test contexts too.

    Two locations are scanned for ``datamodel*.py`` files:
      * ``<gatewayRoot>/modules/datamodels``            -> ``modules.datamodels.<name>``
      * ``<gatewayRoot>/modules/features/<feature>/``   -> ``modules.features.<feature>.<name>``

    The scan is best effort: a missing directory is skipped and a module that
    fails to import is logged at debug level. The scan runs once per process;
    later calls return immediately.
    """
    global _modelsLoaded
    if _modelsLoaded:
        return

    # This file lives in <gatewayRoot>/modules/shared/, hence three dirname() steps.
    gatewayRoot = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

    def _importDatamodels(directory: str, package: str) -> None:
        """Import every ``datamodel*.py`` module located directly in *directory*."""
        if not os.path.isdir(directory):
            # Nothing to scan (e.g. a plain file inside features/, or a layout
            # without this directory); the models may already be imported.
            return
        # os.listdir() returns entries in arbitrary order; sort so that the
        # import (and therefore registration) order is reproducible.
        for fname in sorted(os.listdir(directory)):
            if not (fname.startswith("datamodel") and fname.endswith(".py")):
                continue
            modName = f"{package}.{fname[:-3]}"
            try:
                importlib.import_module(modName)
            except Exception as e:
                # Best effort: one broken module must not block discovery.
                logger.debug(f"Could not import {modName}: {e}")

    _importDatamodels(os.path.join(gatewayRoot, "modules", "datamodels"), "modules.datamodels")

    featuresDir = os.path.join(gatewayRoot, "modules", "features")
    if os.path.isdir(featuresDir):
        for featureDir in sorted(os.listdir(featuresDir)):
            _importDatamodels(
                os.path.join(featuresDir, featureDir),
                f"modules.features.{featureDir}",
            )

    _modelsLoaded = True
# Guards every read and write of the two module-level caches below.
_lock = threading.Lock()
# Result of the FK scan; None until getFkRelationships() has populated it
# (and again after _invalidateFkCache()).
_cachedRelationships: Optional[List["FkRelationship"]] = None
# Table-name -> database-name mapping; None until _getTableToDbMap() has
# populated it (and again after _invalidateFkCache()).
_cachedTableToDb: Optional[Dict[str, str]] = None
@dataclass(frozen=True)
class FkRelationship:
    """One directed foreign-key edge.

    Reads as ``sourceDb.sourceTable.sourceColumn → targetDb.targetTable.targetColumn``.
    The dataclass is frozen, so instances are immutable, compare by value and
    are hashable.
    """

    # Referencing side: the table/column that holds the foreign key.
    sourceDb: str
    sourceTable: str
    sourceColumn: str

    # Referenced side: the table/column the foreign key points to.
    targetDb: str
    targetTable: str
    targetColumn: str
def _buildTableToDbMap() -> Dict[str, str]:
    """Build the ``{tableName: dbName}`` mapping for the models in MODEL_REGISTRY.

    Works in two passes:
      1. Every ``fk_target`` annotation names the database of the table it
         points to; those declarations are collected first. If a table is
         declared more than once, the declaration seen last wins.
      2. Registry entries that are still without a database are looked up in
         the catalogs of the registered databases. A failure in this step is
         logged as a warning and the mapping gathered so far is returned.
    """
    _ensureModelsLoaded()

    tableToDb: Dict[str, str] = {}

    # Pass 1: harvest the target-side declarations.
    for model in MODEL_REGISTRY.values():
        for field in model.model_fields.values():
            schemaExtra = field.json_schema_extra
            fkTarget = schemaExtra.get("fk_target") if isinstance(schemaExtra, dict) else None
            if isinstance(fkTarget, dict):
                targetTable = fkTarget.get("table", "")
                targetDb = fkTarget.get("db", "")
                if targetTable and targetDb:
                    tableToDb[targetTable] = targetDb

    # Pass 2: catalog lookup for whatever pass 1 did not cover.
    pending = [name for name in MODEL_REGISTRY if name not in tableToDb]
    if not pending:
        return tableToDb

    try:
        from modules.shared.dbRegistry import getRegisteredDatabases
        _resolveUnmappedTablesFromCatalog(tableToDb, pending, getRegisteredDatabases())
    except Exception as e:
        logger.warning(f"Could not resolve unmapped tables from catalog: {e}")
    return tableToDb
def _resolveUnmappedTablesFromCatalog(
    mapping: Dict[str, str],
    unmapped: List[str],
    registeredDbs: Dict[str, str],
) -> None:
    """Query information_schema in each registered DB for unmapped table names.

    Args:
        mapping: table→db mapping; updated in place with every table found.
        unmapped: table names that still need a database. Not modified.
        registeredDbs: ``{dbName: configPrefix}``. Connection settings are read
            from APP_CONFIG under ``<configPrefix>_HOST``, ``_PORT``, ``_USER``
            and ``_PASSWORD_SECRET``.

    Returns:
        None. Results are written into *mapping*.

    The lookup is best effort: a database that cannot be queried is logged at
    debug level and skipped. A table is assigned to the first database (in
    iteration order of *registeredDbs*) whose ``public`` schema contains it.
    """
    remaining = set(unmapped)
    if not remaining or not registeredDbs:
        # Nothing to do: avoid importing the DB driver and configuration.
        return

    # Imported lazily so the module can be loaded without a DB driver.
    import psycopg2
    from modules.shared.configuration import APP_CONFIG

    for dbName, configPrefix in registeredDbs.items():
        if not remaining:
            break
        try:
            conn = psycopg2.connect(
                host=APP_CONFIG.get(f"{configPrefix}_HOST", "localhost"),
                port=int(APP_CONFIG.get(f"{configPrefix}_PORT", 5432)),
                database=dbName,
                user=APP_CONFIG.get(f"{configPrefix}_USER"),
                password=APP_CONFIG.get(f"{configPrefix}_PASSWORD_SECRET"),
                client_encoding="utf8",
            )
            try:
                with conn.cursor() as cur:
                    # NOTE(review): execute() is called without parameters, so
                    # the doubled %% is presumably sent to the server as-is;
                    # LIKE treats consecutive % as a single wildcard, so the
                    # filter still skips names starting with an underscore.
                    cur.execute("""
                        SELECT table_name FROM information_schema.tables
                        WHERE table_schema = 'public'
                        AND table_name NOT LIKE '\\_%%'
                    """)
                    dbTables = {row[0] for row in cur.fetchall()}
            finally:
                conn.close()
        except Exception as e:
            # Best effort: an unreachable DB must not abort the whole lookup.
            logger.debug(f"Catalog query for {dbName} failed: {e}")
            continue

        found = remaining & dbTables
        for tableName in found:
            mapping[tableName] = dbName
        remaining -= found

    if remaining:
        # These tables stay without a database in the mapping.
        logger.debug(f"Tables not found in any registered DB catalog: {sorted(remaining)}")
def _discoverFkRelationships() -> List[FkRelationship]:
    """Collect one FkRelationship per ``fk_target``-annotated field in MODEL_REGISTRY.

    The registry keys are used as source table names. A model whose table has
    no known database is skipped, as is any ``fk_target`` without a ``db`` or
    ``table`` value. The target column defaults to ``"id"``.

    Returns:
        The relationships sorted by (sourceDb, sourceTable, sourceColumn).
    """
    dbByTable = _buildTableToDbMap()
    edges: List[FkRelationship] = []

    for srcTable, model in MODEL_REGISTRY.items():
        srcDb = dbByTable.get(srcTable)
        if srcDb is None:
            # Source database unknown, so no edge can be described.
            continue

        for column, field in model.model_fields.items():
            schemaExtra = field.json_schema_extra
            fkTarget = schemaExtra.get("fk_target") if isinstance(schemaExtra, dict) else None
            if not isinstance(fkTarget, dict):
                continue

            dstDb = fkTarget.get("db", "")
            dstTable = fkTarget.get("table", "")
            dstColumn = fkTarget.get("column", "id")
            if dstDb and dstTable:
                edges.append(
                    FkRelationship(
                        sourceDb=srcDb,
                        sourceTable=srcTable,
                        sourceColumn=column,
                        targetDb=dstDb,
                        targetTable=dstTable,
                        targetColumn=dstColumn,
                    )
                )

    return sorted(edges, key=lambda edge: (edge.sourceDb, edge.sourceTable, edge.sourceColumn))
def getFkRelationships() -> List[FkRelationship]:
    """Return the FK relationships, running discovery if nothing is cached yet.

    The lock is held only while the cache is read or written, not during
    discovery itself, so concurrent first callers may each run discovery.
    Callers receive the cached list object itself, not a copy.
    """
    global _cachedRelationships

    with _lock:
        cached = _cachedRelationships

    if cached is None:
        cached = _discoverFkRelationships()
        with _lock:
            _cachedRelationships = cached

    return cached
def _getTableToDbMap() -> Dict[str, str]:
    """Return the table→db mapping, building it if nothing is cached yet.

    The lock is held only while the cache is read or written; the build
    itself runs outside the lock. The cached dict itself is returned.
    """
    global _cachedTableToDb

    with _lock:
        cached = _cachedTableToDb

    if cached is None:
        cached = _buildTableToDbMap()
        with _lock:
            _cachedTableToDb = cached

    return cached
def _invalidateFkCache() -> None:
    """Drop both cached results so the next lookup re-scans (useful for testing)."""
    global _cachedRelationships, _cachedTableToDb
    with _lock:
        # Reset the relationship list and the table→db map together.
        _cachedRelationships = _cachedTableToDb = None
# Keys every ``fk_target`` dict has to provide.
_FK_TARGET_REQUIRED_KEYS = {"db", "table", "labelField"}


def validateFkTargets() -> List[str]:
    """Check the ``fk_target`` annotation of every field in MODEL_REGISTRY.

    An ``fk_target`` must be a dict that provides the keys ``db``, ``table``
    and ``labelField``. Only the presence of these keys is checked: their
    values (``labelField`` may be ``None``) and any additional keys are not
    inspected.

    Returns:
        One error string per offending field; an empty list means all good.
    """
    _ensureModelsLoaded()
    errors: List[str] = []

    for tableName, modelCls in MODEL_REGISTRY.items():
        for fieldName, fieldInfo in modelCls.model_fields.items():
            schemaExtra = fieldInfo.json_schema_extra
            if not isinstance(schemaExtra, dict):
                continue

            fkTarget = schemaExtra.get("fk_target")
            if fkTarget is None:
                # Field carries no FK annotation.
                continue

            if isinstance(fkTarget, dict):
                missing = _FK_TARGET_REQUIRED_KEYS - fkTarget.keys()
                if missing:
                    errors.append(f"{tableName}.{fieldName}: fk_target missing keys {sorted(missing)}")
            else:
                errors.append(f"{tableName}.{fieldName}: fk_target is not a dict ({type(fkTarget).__name__})")

    return errors