308 lines
12 KiB
Python
308 lines
12 KiB
Python
"""Export the database schema from Pydantic MODEL_REGISTRY + fk_target metadata.
|
|
|
|
Usage (run from gateway/):
|
|
python scripts/exportDbSchemaFromModels.py
|
|
python scripts/exportDbSchemaFromModels.py --validate
|
|
python scripts/exportDbSchemaFromModels.py --output ../wiki/b-reference/database-schema.md
|
|
|
|
The Pydantic classes are the single source of truth. The optional --validate
|
|
flag cross-checks against the live database and reports mismatches.
|
|
"""
|
|
|
|
import argparse
|
|
import importlib
|
|
import os
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
|
|
def _getArgs():
|
|
p = argparse.ArgumentParser(description="Export DB schema from Pydantic models")
|
|
p.add_argument("--output", default="../wiki/b-reference/database-schema.md")
|
|
p.add_argument("--validate", action="store_true",
|
|
help="Cross-check against live DB and report mismatches")
|
|
return p.parse_args()
|
|
|
|
|
|
def _loadAllModels():
|
|
"""Import all datamodel and interface modules to populate MODEL_REGISTRY + dbRegistry."""
|
|
for root, _dirs, files in os.walk("modules"):
|
|
for f in files:
|
|
if not f.endswith(".py") or f.startswith("__"):
|
|
continue
|
|
isDatamodel = f.startswith("datamodel")
|
|
isInterface = f.startswith("interface") and ("Db" in f or "Feature" in f)
|
|
if not isDatamodel and not isInterface:
|
|
continue
|
|
modPath = os.path.join(root, f).replace(os.sep, ".").replace(".py", "")
|
|
try:
|
|
importlib.import_module(modPath)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def _buildCompleteTableToDbMap() -> Dict[str, str]:
|
|
"""Build tableName -> dbName by querying every registered DB's catalog.
|
|
|
|
More reliable than fkRegistry._buildTableToDbMap() for the schema script
|
|
because it catches ALL tables, not just FK targets.
|
|
"""
|
|
from modules.shared.dbRegistry import getRegisteredDatabases
|
|
from modules.system.databaseHealth import _getConnection
|
|
|
|
mapping: Dict[str, str] = {}
|
|
for dbName in getRegisteredDatabases():
|
|
try:
|
|
conn = _getConnection(dbName)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
SELECT table_name FROM information_schema.tables
|
|
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
|
|
AND table_name NOT LIKE '\\_%'
|
|
""")
|
|
for row in cur.fetchall():
|
|
tbl = row["table_name"] if isinstance(row, dict) else row[0]
|
|
if tbl not in mapping:
|
|
mapping[tbl] = dbName
|
|
finally:
|
|
conn.close()
|
|
except Exception as e:
|
|
print(f" Warning: could not query {dbName}: {e}")
|
|
return mapping
|
|
|
|
|
|
def _buildSchema() -> Tuple[Dict[str, List[dict]], Dict[str, str]]:
|
|
"""Build {dbName: [tableInfo, ...]} from MODEL_REGISTRY + fk_target.
|
|
|
|
Returns (schema, tableToDb).
|
|
"""
|
|
from modules.datamodels.datamodelBase import MODEL_REGISTRY
|
|
|
|
tableToDb = _buildCompleteTableToDbMap()
|
|
schema: Dict[str, List[dict]] = defaultdict(list)
|
|
|
|
for tableName, modelCls in sorted(MODEL_REGISTRY.items()):
|
|
dbName = tableToDb.get(tableName)
|
|
if not dbName:
|
|
continue
|
|
|
|
fields = []
|
|
fkRefs = []
|
|
pkField = None
|
|
|
|
for fieldName, fieldInfo in modelCls.model_fields.items():
|
|
annotation = modelCls.__annotations__.get(fieldName)
|
|
typeName = _resolveTypeName(annotation)
|
|
isOptional = typeName.startswith("Optional[")
|
|
extra = fieldInfo.json_schema_extra or {}
|
|
fkTarget = extra.get("fk_target")
|
|
|
|
if fieldName == "id":
|
|
pkField = {"name": fieldName, "type": typeName}
|
|
continue
|
|
|
|
if fkTarget:
|
|
fkRefs.append({
|
|
"column": fieldName,
|
|
"targetDb": fkTarget.get("db", ""),
|
|
"targetTable": fkTarget.get("table", ""),
|
|
"targetColumn": fkTarget.get("column", "id"),
|
|
"labelField": fkTarget.get("labelField"),
|
|
"softFk": fkTarget.get("softFk", False),
|
|
})
|
|
|
|
fields.append({
|
|
"name": fieldName,
|
|
"type": typeName,
|
|
"optional": isOptional,
|
|
"description": fieldInfo.description or "",
|
|
})
|
|
|
|
schema[dbName].append({
|
|
"tableName": tableName,
|
|
"pk": pkField,
|
|
"fields": fields,
|
|
"fks": fkRefs,
|
|
"modelClass": f"{modelCls.__module__}.{modelCls.__name__}",
|
|
})
|
|
|
|
return dict(schema), tableToDb
|
|
|
|
|
|
def _resolveTypeName(annotation) -> str:
|
|
"""Best-effort stringification of a type annotation."""
|
|
if annotation is None:
|
|
return "Any"
|
|
origin = getattr(annotation, "__origin__", None)
|
|
if origin is not None:
|
|
args = getattr(annotation, "__args__", ())
|
|
if str(origin) == "typing.Union" or getattr(origin, "__name__", "") == "Union":
|
|
nonNone = [a for a in args if a is not type(None)]
|
|
if len(nonNone) == 1:
|
|
return f"Optional[{_resolveTypeName(nonNone[0])}]"
|
|
return f"Union[{', '.join(_resolveTypeName(a) for a in args)}]"
|
|
argStr = ", ".join(_resolveTypeName(a) for a in args)
|
|
name = getattr(origin, "__name__", str(origin))
|
|
return f"{name}[{argStr}]" if argStr else name
|
|
return getattr(annotation, "__name__", str(annotation))
|
|
|
|
|
|
def _renderMarkdown(schema: Dict[str, List[dict]]) -> str:
|
|
"""Render the schema as markdown."""
|
|
from modules.shared.dbRegistry import getRegisteredDatabases
|
|
|
|
registeredDbs = getRegisteredDatabases()
|
|
now = datetime.now().strftime("%Y-%m-%d %H:%M")
|
|
|
|
totalTables = sum(len(tables) for tables in schema.values())
|
|
totalFks = sum(len(t["fks"]) for tables in schema.values() for t in tables)
|
|
|
|
lines = [
|
|
"# PowerOn Database Schema\n",
|
|
f"> **Generated from**: Pydantic MODEL_REGISTRY + fk_target",
|
|
f"> **Date**: {now}",
|
|
f"> **Registered databases**: {len(registeredDbs)}",
|
|
f"> **Tables**: {totalTables}",
|
|
f"> **FK relationships**: {totalFks}\n",
|
|
"---\n",
|
|
]
|
|
|
|
for dbName in sorted(schema.keys()):
|
|
tables = schema[dbName]
|
|
lines.append(f"## {dbName}\n")
|
|
|
|
for tbl in sorted(tables, key=lambda t: t["tableName"]):
|
|
lines.append(f"### {tbl['tableName']}\n")
|
|
|
|
if tbl["pk"]:
|
|
lines.append(f"- **PK**: `{tbl['pk']['name']}` ({tbl['pk']['type']})")
|
|
|
|
for fk in tbl["fks"]:
|
|
crossDb = ""
|
|
if fk["targetDb"] != dbName:
|
|
crossDb = f" [cross-db: {fk['targetDb']}]"
|
|
soft = " **(soft)**" if fk["softFk"] else ""
|
|
lines.append(
|
|
f"- **FK**: `{fk['column']}` -> `{fk['targetTable']}.{fk['targetColumn']}`{crossDb}{soft}"
|
|
)
|
|
|
|
nonFkFields = []
|
|
fkCols = {fk["column"] for fk in tbl["fks"]}
|
|
for f in tbl["fields"]:
|
|
if f["name"] in fkCols or f["name"].startswith("sys"):
|
|
continue
|
|
opt = " (optional)" if f["optional"] else ""
|
|
nonFkFields.append(f"`{f['name']}` {f['type']}{opt}")
|
|
|
|
if nonFkFields:
|
|
lines.append(f"- **Fields**: {', '.join(nonFkFields)}")
|
|
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _validateAgainstLiveDb(schema: Dict[str, List[dict]], tableToDb: Dict[str, str]) -> List[str]:
|
|
"""Compare Pydantic schema against live PostgreSQL and return mismatch warnings."""
|
|
from modules.shared.configuration import APP_CONFIG
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
host = APP_CONFIG.get("DB_HOST", "localhost")
|
|
port = int(APP_CONFIG.get("DB_PORT", 5432))
|
|
user = APP_CONFIG.get("DB_USER", "poweron_dev")
|
|
password = APP_CONFIG.get("DB_PASSWORD_SECRET")
|
|
if not password:
|
|
return ["ERROR: DB_PASSWORD_SECRET not available for validation"]
|
|
|
|
warnings = []
|
|
|
|
for dbName, tables in sorted(schema.items()):
|
|
try:
|
|
conn = psycopg2.connect(
|
|
host=host, port=port, user=user, password=password,
|
|
database=dbName, client_encoding="utf8",
|
|
cursor_factory=psycopg2.extras.RealDictCursor,
|
|
)
|
|
except Exception as e:
|
|
warnings.append(f" {dbName}: connection failed ({e})")
|
|
continue
|
|
|
|
try:
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
SELECT table_name FROM information_schema.tables
|
|
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
|
|
""")
|
|
liveTables = {row["table_name"] for row in cur.fetchall()}
|
|
|
|
for tbl in tables:
|
|
name = tbl["tableName"]
|
|
if name not in liveTables:
|
|
warnings.append(f" {dbName}.{name}: model exists but NO table in DB")
|
|
continue
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute("""
|
|
SELECT column_name FROM information_schema.columns
|
|
WHERE table_schema = 'public' AND table_name = %s
|
|
""", (name,))
|
|
liveCols = {row["column_name"] for row in cur.fetchall()}
|
|
|
|
modelCols = {"id"} | {f["name"] for f in tbl["fields"]}
|
|
missingInDb = modelCols - liveCols
|
|
legacyAuditCols = {
|
|
"_createdAt", "_createdBy", "_modifiedAt", "_modifiedBy",
|
|
"sysCreatedAt", "sysCreatedBy", "sysModifiedAt", "sysModifiedBy",
|
|
"createdAt", "updatedAt", "creationDate", "lastModified",
|
|
}
|
|
extraInDb = liveCols - modelCols - legacyAuditCols
|
|
if missingInDb:
|
|
warnings.append(f" {dbName}.{name}: columns in model but not in DB: {sorted(missingInDb)}")
|
|
if extraInDb:
|
|
warnings.append(f" {dbName}.{name}: columns in DB but not in model: {sorted(extraInDb)}")
|
|
|
|
modelTableNames = {t["tableName"] for t in tables}
|
|
for lt in sorted(liveTables):
|
|
if lt not in modelTableNames and not lt.startswith("_"):
|
|
warnings.append(f" {dbName}.{lt}: table in DB but no Pydantic model (legacy?)")
|
|
finally:
|
|
conn.close()
|
|
|
|
return warnings
|
|
|
|
|
|
def main():
|
|
args = _getArgs()
|
|
_loadAllModels()
|
|
|
|
print("Building schema from MODEL_REGISTRY...")
|
|
schema, tableToDb = _buildSchema()
|
|
|
|
totalTables = sum(len(t) for t in schema.values())
|
|
totalFks = sum(len(t["fks"]) for tables in schema.values() for t in tables)
|
|
print(f" {len(schema)} databases, {totalTables} tables, {totalFks} FK relationships")
|
|
|
|
md = _renderMarkdown(schema)
|
|
with open(args.output, "w", encoding="utf-8") as f:
|
|
f.write(md)
|
|
print(f"\nSchema written to {args.output}")
|
|
|
|
if args.validate:
|
|
print("\nValidating against live database...")
|
|
warnings = _validateAgainstLiveDb(schema, tableToDb)
|
|
if warnings:
|
|
print(f"\n{len(warnings)} mismatches found:")
|
|
for w in warnings:
|
|
print(w)
|
|
else:
|
|
print(" No mismatches - live DB matches Pydantic models perfectly.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|