# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""Export the database schema from Pydantic MODEL_REGISTRY + fk_target metadata.

Usage (run from gateway/):
    python scripts/exportDbSchemaFromModels.py
    python scripts/exportDbSchemaFromModels.py --validate
    python scripts/exportDbSchemaFromModels.py --output ../wiki/b-reference/database-schema.md

The Pydantic classes are the single source of truth. The optional --validate
flag cross-checks against the live database and reports mismatches.
"""

import argparse
import importlib
import os
import sys
import types
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Runtime type of PEP 604 unions (``X | None``); absent before Python 3.10.
_UNION_TYPE = getattr(types, "UnionType", None)

# Column names that are tolerated in the live DB even when the model does not
# declare them (reported neither as "extra" nor as a mismatch).
_LEGACY_AUDIT_COLS = frozenset({
    "_createdAt", "_createdBy", "_modifiedAt", "_modifiedBy",
    "sysCreatedAt", "sysCreatedBy", "sysModifiedAt", "sysModifiedBy",
    "createdAt", "updatedAt", "creationDate", "lastModified",
})


def _getArgs():
    """Parse the command line (``--output`` path and ``--validate`` flag)."""
    p = argparse.ArgumentParser(description="Export DB schema from Pydantic models")
    p.add_argument("--output", default="../wiki/b-reference/database-schema.md")
    p.add_argument(
        "--validate",
        action="store_true",
        help="Cross-check against live DB and report mismatches",
    )
    return p.parse_args()


def _toModulePath(root: str, fileName: str) -> str:
    """Turn a walked directory + file name into a dotted module path.

    Only the file's own extension is removed, so package directories whose
    name starts with "py" (e.g. ``modules/pyhelpers``) are left intact.
    """
    stem, _ext = os.path.splitext(fileName)
    return os.path.join(root, stem).replace(os.sep, ".")


def _loadAllModels():
    """Import all datamodel and interface modules to populate MODEL_REGISTRY + dbRegistry.

    Walks the ``modules`` directory relative to the current working directory
    and imports every ``datamodel*.py`` file plus every ``interface*.py`` file
    whose name contains "Db" or "Feature". Import failures do not abort the
    run, but they are reported on stderr because a module that fails to import
    leaves its models out of the exported schema.
    """
    for root, _dirs, files in os.walk("modules"):
        for f in files:
            if not f.endswith(".py") or f.startswith("__"):
                continue
            isDatamodel = f.startswith("datamodel")
            isInterface = f.startswith("interface") and ("Db" in f or "Feature" in f)
            if not isDatamodel and not isInterface:
                continue
            modPath = _toModulePath(root, f)
            try:
                importlib.import_module(modPath)
            except Exception as e:
                # Best-effort: keep going, but make the gap visible.
                print(f" Warning: could not import {modPath}: {e}", file=sys.stderr)


def _buildCompleteTableToDbMap() -> Dict[str, str]:
    """Build tableName -> dbName by querying every registered DB's catalog.

    More reliable than fkRegistry._buildTableToDbMap() for the schema script
    because it catches ALL tables, not just FK targets.

    When the same table name exists in several databases, the first database
    returned by getRegisteredDatabases() wins. Databases that cannot be
    queried are skipped with a printed warning.
    """
    from modules.dbHelpers.dbRegistry import getRegisteredDatabases
    from modules.system.databaseHealth import getConnection

    mapping: Dict[str, str] = {}
    for dbName in getRegisteredDatabases():
        try:
            conn = getConnection(dbName)
            try:
                with conn.cursor() as cur:
                    cur.execute("""
                        SELECT table_name
                        FROM information_schema.tables
                        WHERE table_schema = 'public'
                          AND table_type = 'BASE TABLE'
                          AND table_name NOT LIKE '\\_%'
                    """)
                    for row in cur.fetchall():
                        # Rows may be dict-like or plain tuples depending on the cursor.
                        tbl = row["table_name"] if isinstance(row, dict) else row[0]
                        if tbl not in mapping:
                            mapping[tbl] = dbName
            finally:
                conn.close()
        except Exception as e:
            print(f" Warning: could not query {dbName}: {e}")
    return mapping


def _buildSchema() -> Tuple[Dict[str, List[dict]], Dict[str, str]]:
    """Build {dbName: [tableInfo, ...]} from MODEL_REGISTRY + fk_target.

    Models whose table name is not found in any registered database are
    skipped. The ``id`` field is recorded as the primary key and is not listed
    among the regular fields.

    Returns (schema, tableToDb).
    """
    from modules.datamodels.datamodelBase import MODEL_REGISTRY

    tableToDb = _buildCompleteTableToDbMap()
    schema: Dict[str, List[dict]] = defaultdict(list)

    for tableName, modelCls in sorted(MODEL_REGISTRY.items()):
        dbName = tableToDb.get(tableName)
        if not dbName:
            continue

        fields = []
        fkRefs = []
        pkField = None
        ownAnnotations = getattr(modelCls, "__annotations__", {})

        for fieldName, fieldInfo in modelCls.model_fields.items():
            # Prefer the annotation stored on the field: unlike the class's own
            # __annotations__, it is also available for inherited fields.
            annotation = getattr(fieldInfo, "annotation", None)
            if annotation is None:
                annotation = ownAnnotations.get(fieldName)
            typeName = _resolveTypeName(annotation)
            isOptional = typeName.startswith("Optional[")

            # json_schema_extra is not guaranteed to be a dict; ignore other shapes.
            extra = fieldInfo.json_schema_extra
            if not isinstance(extra, dict):
                extra = {}
            fkTarget = extra.get("fk_target")

            if fieldName == "id":
                pkField = {"name": fieldName, "type": typeName}
                continue

            if fkTarget:
                fkRefs.append({
                    "column": fieldName,
                    "targetDb": fkTarget.get("db", ""),
                    "targetTable": fkTarget.get("table", ""),
                    "targetColumn": fkTarget.get("column", "id"),
                    "labelField": fkTarget.get("labelField"),
                    "softFk": fkTarget.get("softFk", False),
                })

            fields.append({
                "name": fieldName,
                "type": typeName,
                "optional": isOptional,
                "description": fieldInfo.description or "",
            })

        schema[dbName].append({
            "tableName": tableName,
            "pk": pkField,
            "fields": fields,
            "fks": fkRefs,
            "modelClass": f"{modelCls.__module__}.{modelCls.__name__}",
        })

    return dict(schema), tableToDb


def _resolveTypeName(annotation) -> str:
    """Best-effort stringification of a type annotation.

    ``None`` becomes "Any". A union with exactly one non-None member is
    rendered as ``Optional[X]``, for both ``typing.Union`` and PEP 604
    ``X | None`` annotations; other unions become ``Union[...]``. Subscripted
    generics are rendered as ``origin[args]``. Anything else falls back to its
    ``__name__`` or ``str()``.
    """
    if annotation is None:
        return "Any"

    origin = getattr(annotation, "__origin__", None)
    args = getattr(annotation, "__args__", ())

    isUnion = origin is Union or (
        _UNION_TYPE is not None and isinstance(annotation, _UNION_TYPE)
    )
    if isUnion:
        nonNone = [a for a in args if a is not type(None)]
        if len(nonNone) == 1:
            return f"Optional[{_resolveTypeName(nonNone[0])}]"
        return f"Union[{', '.join(_resolveTypeName(a) for a in args)}]"

    if origin is not None:
        argStr = ", ".join(_resolveTypeName(a) for a in args)
        name = getattr(origin, "__name__", str(origin))
        return f"{name}[{argStr}]" if argStr else name

    return getattr(annotation, "__name__", str(annotation))


def _renderMarkdown(schema: Dict[str, List[dict]]) -> str:
    """Render the schema as markdown.

    Emits a summary header, then one section per database and one subsection
    per table listing its PK, FKs and remaining fields. Fields that are FK
    columns or whose name starts with "sys" are left out of the field list.
    """
    from modules.dbHelpers.dbRegistry import getRegisteredDatabases

    registeredDbs = getRegisteredDatabases()
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    totalTables = sum(len(tables) for tables in schema.values())
    totalFks = sum(len(t["fks"]) for tables in schema.values() for t in tables)

    lines = [
        "# PowerOn Database Schema\n",
        "> **Generated from**: Pydantic MODEL_REGISTRY + fk_target",
        f"> **Date**: {now}",
        f"> **Registered databases**: {len(registeredDbs)}",
        f"> **Tables**: {totalTables}",
        f"> **FK relationships**: {totalFks}\n",
        "---\n",
    ]

    for dbName in sorted(schema.keys()):
        tables = schema[dbName]
        lines.append(f"## {dbName}\n")

        for tbl in sorted(tables, key=lambda t: t["tableName"]):
            lines.append(f"### {tbl['tableName']}\n")

            if tbl["pk"]:
                lines.append(f"- **PK**: `{tbl['pk']['name']}` ({tbl['pk']['type']})")

            for fk in tbl["fks"]:
                targetDb = fk["targetDb"]
                # Only tag as cross-db when a different target DB is actually
                # named; an empty targetDb would otherwise render "[cross-db: ]".
                crossDb = ""
                if targetDb and targetDb != dbName:
                    crossDb = f" [cross-db: {targetDb}]"
                soft = " **(soft)**" if fk["softFk"] else ""
                lines.append(
                    f"- **FK**: `{fk['column']}` -> `{fk['targetTable']}.{fk['targetColumn']}`{crossDb}{soft}"
                )

            fkCols = {fk["column"] for fk in tbl["fks"]}
            nonFkFields = []
            for f in tbl["fields"]:
                if f["name"] in fkCols or f["name"].startswith("sys"):
                    continue
                opt = " (optional)" if f["optional"] else ""
                nonFkFields.append(f"`{f['name']}` {f['type']}{opt}")

            if nonFkFields:
                lines.append(f"- **Fields**: {', '.join(nonFkFields)}")

            lines.append("")

    return "\n".join(lines)


def _validateAgainstLiveDb(schema: Dict[str, List[dict]], tableToDb: Dict[str, str]) -> List[str]:
    """Compare Pydantic schema against live PostgreSQL and return mismatch warnings.

    Connection settings are read from APP_CONFIG (DB_HOST, DB_PORT, DB_USER,
    DB_PASSWORD_SECRET). For each database in ``schema`` this reports:
    models without a table, columns present on only one side (ignoring the
    legacy audit column names), and tables without a model (ignoring table
    names starting with "_").

    ``tableToDb`` is accepted for interface compatibility but is not used.

    Returns a list of human-readable warning strings; it holds a single error
    entry when no DB password is configured.
    """
    from modules.shared.configuration import APP_CONFIG
    import psycopg2
    import psycopg2.extras

    host = APP_CONFIG.get("DB_HOST", "localhost")
    port = int(APP_CONFIG.get("DB_PORT", 5432))
    user = APP_CONFIG.get("DB_USER", "poweron_dev")
    password = APP_CONFIG.get("DB_PASSWORD_SECRET")

    if not password:
        return ["ERROR: DB_PASSWORD_SECRET not available for validation"]

    warnings = []

    for dbName, tables in sorted(schema.items()):
        try:
            conn = psycopg2.connect(
                host=host,
                port=port,
                user=user,
                password=password,
                database=dbName,
                client_encoding="utf8",
                cursor_factory=psycopg2.extras.RealDictCursor,
            )
        except Exception as e:
            warnings.append(f" {dbName}: connection failed ({e})")
            continue

        try:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT table_name
                    FROM information_schema.tables
                    WHERE table_schema = 'public'
                      AND table_type = 'BASE TABLE'
                """)
                liveTables = {row["table_name"] for row in cur.fetchall()}

            for tbl in tables:
                name = tbl["tableName"]
                if name not in liveTables:
                    warnings.append(f" {dbName}.{name}: model exists but NO table in DB")
                    continue

                with conn.cursor() as cur:
                    cur.execute("""
                        SELECT column_name
                        FROM information_schema.columns
                        WHERE table_schema = 'public'
                          AND table_name = %s
                    """, (name,))
                    liveCols = {row["column_name"] for row in cur.fetchall()}

                # "id" is kept separately as the PK, so add it back here.
                modelCols = {"id"} | {f["name"] for f in tbl["fields"]}
                missingInDb = modelCols - liveCols
                extraInDb = liveCols - modelCols - _LEGACY_AUDIT_COLS

                if missingInDb:
                    warnings.append(
                        f" {dbName}.{name}: columns in model but not in DB: {sorted(missingInDb)}"
                    )
                if extraInDb:
                    warnings.append(
                        f" {dbName}.{name}: columns in DB but not in model: {sorted(extraInDb)}"
                    )

            modelTableNames = {t["tableName"] for t in tables}
            for lt in sorted(liveTables):
                if lt not in modelTableNames and not lt.startswith("_"):
                    warnings.append(f" {dbName}.{lt}: table in DB but no Pydantic model (legacy?)")
        finally:
            conn.close()

    return warnings


def main():
    """Load all models, write the markdown schema, and optionally validate it."""
    args = _getArgs()
    _loadAllModels()

    print("Building schema from MODEL_REGISTRY...")
    schema, tableToDb = _buildSchema()

    totalTables = sum(len(t) for t in schema.values())
    totalFks = sum(len(t["fks"]) for tables in schema.values() for t in tables)
    print(f" {len(schema)} databases, {totalTables} tables, {totalFks} FK relationships")

    md = _renderMarkdown(schema)
    with open(args.output, "w", encoding="utf-8") as f:
        f.write(md)
    print(f"\nSchema written to {args.output}")

    if args.validate:
        print("\nValidating against live database...")
        warnings = _validateAgainstLiveDb(schema, tableToDb)
        if warnings:
            print(f"\n{len(warnings)} mismatches found:")
            for w in warnings:
                print(w)
        else:
            print(" No mismatches - live DB matches Pydantic models perfectly.")


if __name__ == "__main__":
    main()