platform-core/scripts/exportDbSchemaFromModels.py
ValueOn AG bc7c6fe27c
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 13s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
elimination of technical issues (imports)
2026-06-06 00:32:45 +02:00

308 lines
12 KiB
Python

"""Export the database schema from Pydantic MODEL_REGISTRY + fk_target metadata.
Usage (run from gateway/):
python scripts/exportDbSchemaFromModels.py
python scripts/exportDbSchemaFromModels.py --validate
python scripts/exportDbSchemaFromModels.py --output ../wiki/b-reference/database-schema.md
The Pydantic classes are the single source of truth. The optional --validate
flag cross-checks against the live database and reports mismatches.
"""
import argparse
import importlib
import os
import sys
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple
# Put the parent of this script's directory at the front of sys.path.
# NOTE(review): presumably this is what lets the function-level
# `modules.*` imports below resolve - confirm against the repo layout.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def _getArgs():
    """Parse the command-line options of the schema export script.

    Returns:
        argparse.Namespace with two attributes:
        ``output``   - path of the markdown file to write,
        ``validate`` - True when ``--validate`` was passed.
    """
    parser = argparse.ArgumentParser(
        description="Export DB schema from Pydantic models",
    )
    parser.add_argument(
        "--output",
        default="../wiki/b-reference/database-schema.md",
    )
    parser.add_argument(
        "--validate",
        action="store_true",
        help="Cross-check against live DB and report mismatches",
    )
    parsed = parser.parse_args()
    return parsed
def _loadAllModels():
    """Import all datamodel and interface modules to populate MODEL_REGISTRY + dbRegistry.

    Walks the ``modules`` directory (relative to the current working
    directory) and imports, purely for their import-time side effects:

    * every ``datamodel*.py`` file, and
    * every ``interface*.py`` file whose name contains ``Db`` or ``Feature``.

    Files starting with ``__`` (e.g. ``__init__.py``) are skipped. A module
    that fails to import does not abort the run; a warning is printed instead
    so that an incomplete schema can be traced back to its cause.
    """
    suffix = ".py"
    for root, _dirs, files in os.walk("modules"):
        for f in files:
            if not f.endswith(suffix) or f.startswith("__"):
                continue
            isDatamodel = f.startswith("datamodel")
            isInterface = f.startswith("interface") and ("Db" in f or "Feature" in f)
            if not isDatamodel and not isInterface:
                continue
            # Cut off only the trailing extension. A global replace of ".py"
            # would also mangle package directories whose name starts with
            # "py" (".pytools" -> "tools") once separators became dots.
            relPath = os.path.join(root, f)[: -len(suffix)]
            modPath = relPath.replace(os.sep, ".")
            try:
                importlib.import_module(modPath)
            except Exception as e:
                # Best effort: keep going, but do not hide the failure.
                print(f" Warning: could not import {modPath}: {e}")
def _buildCompleteTableToDbMap() -> Dict[str, str]:
    """Map every table name to the registered database whose catalog lists it.

    Each database returned by ``getRegisteredDatabases()`` is asked for its
    base tables in the ``public`` schema, leaving out names that start with
    an underscore. When several databases contain a table of the same name,
    the database seen first keeps the entry.

    The docstring of the original notes this is preferred over
    fkRegistry._buildTableToDbMap() for the schema script because it covers
    ALL tables, not just FK targets.

    Returns:
        ``{tableName: dbName}``. A database that cannot be queried is
        reported with a printed warning and contributes nothing.
    """
    from modules.dbHelpers.dbRegistry import getRegisteredDatabases
    from modules.system.databaseHealth import getConnection

    catalogQuery = """
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
AND table_name NOT LIKE '\\_%'
"""
    tableOwner: Dict[str, str] = {}
    for dbName in getRegisteredDatabases():
        try:
            connection = getConnection(dbName)
            try:
                with connection.cursor() as cursor:
                    cursor.execute(catalogQuery)
                    for record in cursor.fetchall():
                        # Rows may be dict-like or plain sequences,
                        # depending on the cursor in use.
                        if isinstance(record, dict):
                            tableName = record["table_name"]
                        else:
                            tableName = record[0]
                        # First database that lists the table wins.
                        tableOwner.setdefault(tableName, dbName)
            finally:
                connection.close()
        except Exception as e:
            print(f" Warning: could not query {dbName}: {e}")
    return tableOwner
def _buildSchema() -> Tuple[Dict[str, List[dict]], Dict[str, str]]:
    """Build {dbName: [tableInfo, ...]} from MODEL_REGISTRY + fk_target.

    Every model in MODEL_REGISTRY whose table name is found in the live
    catalogs (see ``_buildCompleteTableToDbMap``) becomes one tableInfo dict
    with the keys ``tableName``, ``pk``, ``fields``, ``fks`` and
    ``modelClass``. Models whose table is not found in any database are
    skipped. The ``id`` field is recorded as ``pk`` and is not part of
    ``fields``; FK columns appear in both ``fks`` and ``fields``.

    Returns:
        (schema, tableToDb) - the schema grouped by database name and the
        table -> database mapping that was used to group it.
    """
    from modules.datamodels.datamodelBase import MODEL_REGISTRY

    tableToDb = _buildCompleteTableToDbMap()
    schema: Dict[str, List[dict]] = defaultdict(list)
    for tableName, modelCls in sorted(MODEL_REGISTRY.items()):
        dbName = tableToDb.get(tableName)
        if not dbName:
            continue
        fields: List[dict] = []
        fkRefs: List[dict] = []
        pkField: Optional[dict] = None
        for fieldName, fieldInfo in modelCls.model_fields.items():
            annotation = modelCls.__annotations__.get(fieldName)
            if annotation is None:
                # A class's __annotations__ only holds what that class itself
                # declares; fields inherited from a base model are missing
                # there, so take the type pydantic stored on the field.
                annotation = getattr(fieldInfo, "annotation", None)
            typeName = _resolveTypeName(annotation)
            isOptional = typeName.startswith("Optional[")
            extra = fieldInfo.json_schema_extra
            if not isinstance(extra, dict):
                # None, or a callable schema hook: no fk_target to read.
                extra = {}
            fkTarget = extra.get("fk_target")
            if fieldName == "id":
                pkField = {"name": fieldName, "type": typeName}
                continue
            if fkTarget:
                fkRefs.append({
                    "column": fieldName,
                    "targetDb": fkTarget.get("db", ""),
                    "targetTable": fkTarget.get("table", ""),
                    "targetColumn": fkTarget.get("column", "id"),
                    "labelField": fkTarget.get("labelField"),
                    "softFk": fkTarget.get("softFk", False),
                })
            fields.append({
                "name": fieldName,
                "type": typeName,
                "optional": isOptional,
                "description": fieldInfo.description or "",
            })
        schema[dbName].append({
            "tableName": tableName,
            "pk": pkField,
            "fields": fields,
            "fks": fkRefs,
            "modelClass": f"{modelCls.__module__}.{modelCls.__name__}",
        })
    return dict(schema), tableToDb
def _resolveTypeName(annotation) -> str:
    """Best-effort stringification of a type annotation.

    Args:
        annotation: a type, a typing construct, a string annotation or None.

    Returns:
        * ``"Any"`` for None,
        * ``"Optional[X]"`` for a union of exactly one type with None,
        * ``"Union[A, B, ...]"`` for any other union,
        * ``"name[args]"`` for other parameterised generics,
        * otherwise the annotation's ``__name__`` or, lacking one, ``str()``.

    Unions written as ``X | Y`` (PEP 604) are treated like ``typing.Union``
    so that ``X | None`` is also reported as ``Optional[X]``.
    """
    import types  # function scope: only needed for the PEP 604 check below

    if annotation is None:
        return "Any"
    origin = getattr(annotation, "__origin__", None)
    args = getattr(annotation, "__args__", ())

    # `X | Y` builds a types.UnionType (Python 3.10+), which carries __args__
    # but, on 3.10-3.13, no __origin__ - so it needs its own check.
    pep604Union = getattr(types, "UnionType", None)
    isUnion = pep604Union is not None and isinstance(annotation, pep604Union)
    if not isUnion and origin is not None:
        isUnion = str(origin) == "typing.Union" or getattr(origin, "__name__", "") == "Union"

    if isUnion:
        nonNone = [a for a in args if a is not type(None)]
        if len(nonNone) == 1:
            return f"Optional[{_resolveTypeName(nonNone[0])}]"
        return f"Union[{', '.join(_resolveTypeName(a) for a in args)}]"
    if origin is not None:
        argStr = ", ".join(_resolveTypeName(a) for a in args)
        name = getattr(origin, "__name__", str(origin))
        return f"{name}[{argStr}]" if argStr else name
    return getattr(annotation, "__name__", str(annotation))
def _renderMarkdown(schema: Dict[str, List[dict]]) -> str:
    """Render the schema as markdown.

    Args:
        schema: ``{dbName: [tableInfo, ...]}``; each tableInfo is read for
            its ``tableName``, ``pk``, ``fks`` and ``fields`` entries.

    Returns:
        The markdown document: a header with generation date and counts,
        then one ``##`` section per database (sorted by name) containing one
        ``###`` section per table (sorted by name) listing the PK, the FK
        relationships and the remaining fields.
    """
    from modules.dbHelpers.dbRegistry import getRegisteredDatabases

    registeredDbs = getRegisteredDatabases()
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    totalTables = sum(len(tables) for tables in schema.values())
    totalFks = sum(len(t["fks"]) for tables in schema.values() for t in tables)
    lines = [
        "# PowerOn Database Schema\n",
        "> **Generated from**: Pydantic MODEL_REGISTRY + fk_target",
        f"> **Date**: {now}",
        f"> **Registered databases**: {len(registeredDbs)}",
        f"> **Tables**: {totalTables}",
        f"> **FK relationships**: {totalFks}\n",
        "---\n",
    ]
    for dbName in sorted(schema.keys()):
        tables = schema[dbName]
        lines.append(f"## {dbName}\n")
        for tbl in sorted(tables, key=lambda t: t["tableName"]):
            lines.append(f"### {tbl['tableName']}\n")
            if tbl["pk"]:
                lines.append(f"- **PK**: `{tbl['pk']['name']}` ({tbl['pk']['type']})")
            for fk in tbl["fks"]:
                crossDb = ""
                # Only flag a cross-db reference when a target database is
                # actually named; an empty targetDb would otherwise render
                # as the meaningless marker "[cross-db: ]".
                if fk["targetDb"] and fk["targetDb"] != dbName:
                    crossDb = f" [cross-db: {fk['targetDb']}]"
                soft = " **(soft)**" if fk["softFk"] else ""
                lines.append(
                    f"- **FK**: `{fk['column']}` -> `{fk['targetTable']}.{fk['targetColumn']}`{crossDb}{soft}"
                )
            nonFkFields = []
            fkCols = {fk["column"] for fk in tbl["fks"]}
            for f in tbl["fields"]:
                # FK columns were already listed above. Fields whose name
                # starts with "sys" are left out as well.
                # NOTE(review): presumably meant for sysCreatedAt-style audit
                # columns; it also hides any other "sys..." field - confirm.
                if f["name"] in fkCols or f["name"].startswith("sys"):
                    continue
                opt = " (optional)" if f["optional"] else ""
                nonFkFields.append(f"`{f['name']}` {f['type']}{opt}")
            if nonFkFields:
                lines.append(f"- **Fields**: {', '.join(nonFkFields)}")
            # Blank line separates this table from the next heading.
            lines.append("")
    return "\n".join(lines)
def _validateAgainstLiveDb(schema: Dict[str, List[dict]], tableToDb: Dict[str, str]) -> List[str]:
    """Cross-check the model-derived schema against live PostgreSQL databases.

    Connection settings are read from APP_CONFIG (``DB_HOST``, ``DB_PORT``,
    ``DB_USER``, ``DB_PASSWORD_SECRET``). For every database in ``schema``
    the function compares table names and, per table, column names.

    Args:
        schema: ``{dbName: [tableInfo, ...]}`` as built by the schema step.
        tableToDb: table -> database mapping; accepted but not read here.

    Returns:
        A list of human-readable mismatch lines; empty when nothing differs.
        When no password is configured, a single ERROR line is returned.
    """
    from modules.shared.configuration import APP_CONFIG
    import psycopg2
    import psycopg2.extras

    host = APP_CONFIG.get("DB_HOST", "localhost")
    port = int(APP_CONFIG.get("DB_PORT", 5432))
    user = APP_CONFIG.get("DB_USER", "poweron_dev")
    password = APP_CONFIG.get("DB_PASSWORD_SECRET")
    if not password:
        return ["ERROR: DB_PASSWORD_SECRET not available for validation"]

    tablesQuery = """
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
"""
    columnsQuery = """
SELECT column_name FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = %s
"""
    # Audit columns that may exist in the DB without a model field; they are
    # never reported as "in DB but not in model".
    legacyAuditCols = {
        "_createdAt", "_createdBy", "_modifiedAt", "_modifiedBy",
        "sysCreatedAt", "sysCreatedBy", "sysModifiedAt", "sysModifiedBy",
        "createdAt", "updatedAt", "creationDate", "lastModified",
    }

    findings = []
    for dbName, tables in sorted(schema.items()):
        try:
            connection = psycopg2.connect(
                host=host, port=port, user=user, password=password,
                database=dbName, client_encoding="utf8",
                cursor_factory=psycopg2.extras.RealDictCursor,
            )
        except Exception as e:
            findings.append(f" {dbName}: connection failed ({e})")
            continue
        try:
            with connection.cursor() as cursor:
                cursor.execute(tablesQuery)
                liveTables = {record["table_name"] for record in cursor.fetchall()}

            # Model side -> DB side: tables and columns the models expect.
            for tableInfo in tables:
                name = tableInfo["tableName"]
                if name not in liveTables:
                    findings.append(f" {dbName}.{name}: model exists but NO table in DB")
                    continue
                with connection.cursor() as cursor:
                    cursor.execute(columnsQuery, (name,))
                    liveCols = {record["column_name"] for record in cursor.fetchall()}
                modelCols = {"id"}
                modelCols.update(field["name"] for field in tableInfo["fields"])
                missingInDb = modelCols - liveCols
                extraInDb = liveCols - modelCols - legacyAuditCols
                if missingInDb:
                    findings.append(f" {dbName}.{name}: columns in model but not in DB: {sorted(missingInDb)}")
                if extraInDb:
                    findings.append(f" {dbName}.{name}: columns in DB but not in model: {sorted(extraInDb)}")

            # DB side -> model side: live tables no model accounts for.
            modelTableNames = {tableInfo["tableName"] for tableInfo in tables}
            for lt in sorted(liveTables):
                if lt in modelTableNames or lt.startswith("_"):
                    continue
                findings.append(f" {dbName}.{lt}: table in DB but no Pydantic model (legacy?)")
        finally:
            connection.close()
    return findings
def main():
    """Run the export: load models, build the schema, write the markdown.

    With ``--validate`` the schema is additionally compared with the live
    database and every mismatch is printed.
    """
    args = _getArgs()
    _loadAllModels()

    print("Building schema from MODEL_REGISTRY...")
    schema, tableToDb = _buildSchema()
    totalTables = sum(map(len, schema.values()))
    totalFks = 0
    for dbTables in schema.values():
        for tableInfo in dbTables:
            totalFks += len(tableInfo["fks"])
    print(f" {len(schema)} databases, {totalTables} tables, {totalFks} FK relationships")

    markdown = _renderMarkdown(schema)
    with open(args.output, "w", encoding="utf-8") as outFile:
        outFile.write(markdown)
    print(f"\nSchema written to {args.output}")

    if not args.validate:
        return

    print("\nValidating against live database...")
    warnings = _validateAgainstLiveDb(schema, tableToDb)
    if not warnings:
        print(" No mismatches - live DB matches Pydantic models perfectly.")
        return
    print(f"\n{len(warnings)} mismatches found:")
    print(*warnings, sep="\n")


if __name__ == "__main__":
    main()