platform-core/modules/routes/routeAdminDatabaseHealth.py
ValueOn AG 3e2c07a776
All checks were successful
Deploy Plattform-Core / test (push) Successful in 46s
Deploy Plattform-Core / deploy (push) Successful in 4s
streaming export with log
2026-05-27 23:04:58 +02:00

728 lines
23 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
SysAdmin API for database table statistics, FK orphan detection/cleanup,
and database migration (backup / restore).
"""
import json
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from modules.auth import limiter
from modules.auth.authentication import requireSysAdmin
from modules.datamodels.datamodelUam import User
from modules.system.databaseHealth import (
OrphanCleanupRefused,
_cleanAllOrphans,
_cleanOrphans,
_discoverLegacyTables,
_dropLegacyTable,
_getTableStats,
_isUserIdFk,
_listOrphans,
_scanOrphans,
)
from modules.system.databaseMigration import (
_exportDatabases,
_exportSingleDb,
_getAvailableDatabases,
_getInstanceLabel,
_importDatabases,
_importSingleDb,
_prepareImport,
_validateImportPayload,
streamExportGenerator,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that carries every endpoint declared in this file; all routes are
# served under the /api/admin/database-health prefix and share one tag.
router = APIRouter(
prefix="/api/admin/database-health",
tags=["Admin Database Health"],
)
class OrphanCleanRequest(BaseModel):
    """Body for deleting orphans for one FK relationship."""

    # Identifies the FK relationship to clean: source database, source table
    # (named after its Pydantic model class) and the FK column on that table.
    db: str = Field(..., description="Source database name (e.g. poweron_app)")
    table: str = Field(..., description="Source table (Pydantic model class name)")
    column: str = Field(..., description="FK column on the source table")
    # The description is a plain string (no %-formatting is applied to it), so
    # the percent sign must not be doubled or the API docs show "50%%".
    force: bool = Field(
        False,
        description="Override safety guards (empty target / >50% of source). Use with care.",
    )
class OrphanCleanAllRequest(BaseModel):
    """Request body for the clean-all endpoint (every detected orphan set)."""

    # When true, the safety guards are overridden for every relationship.
    force: bool = Field(
        default=False,
        description="Override safety guards on every relationship. Use with extreme care.",
    )
    # When true, FK relationships that point at UserInDB.id are left alone.
    excludeUserFks: bool = Field(
        default=False,
        description=(
            "Skip FK relationships pointing at UserInDB.id. "
            "Deleted-user remnants (audit / billing / membership rows) "
            "are handled by a dedicated purge workflow and should not be "
            "touched by generic FK cleanup."
        ),
    )
@router.get("/stats")
@limiter.limit("30/minute")
def getDatabaseTableStats(
    request: Request,
    db: Optional[str] = None,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Return table statistics, optionally restricted to one database.

    The ``db`` query parameter is forwarded unchanged as the filter to
    ``_getTableStats``; its result is wrapped under the ``stats`` key.
    """
    return {"stats": _getTableStats(dbFilter=db)}
@router.get("/orphans")
@limiter.limit("10/minute")
def getDatabaseOrphans(
    request: Request,
    db: Optional[str] = None,
    excludeUserFks: bool = False,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Scan for FK orphans, optionally restricted to one source database.

    With ``excludeUserFks=true`` every scan result whose target is recognised
    by ``_isUserIdFk`` is dropped from the response, so the SysAdmin UI can
    show deleted-user remnants separately from real FK drift.
    """
    scanned = _scanOrphans(dbFilter=db)
    if not excludeUserFks:
        return {"orphans": scanned}

    kept = []
    for entry in scanned:
        targetTable = entry.get("targetTable", "")
        targetColumn = entry.get("targetColumn", "")
        if _isUserIdFk(targetTable, targetColumn):
            continue
        kept.append(entry)
    return {"orphans": kept}
@router.get("/orphans/list")
@limiter.limit("30/minute")
def getDatabaseOrphansList(
    request: Request,
    db: str,
    table: str,
    column: str,
    limit: int = 1000,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """List the orphaned source rows of one FK relationship (at most ``limit``).

    Backs the per-row download button of the SysAdmin UI, so that a human can
    inspect the affected rows before starting the destructive clean.

    Raises:
        HTTPException: 400 when ``_listOrphans`` rejects the arguments with a
            ``ValueError``.
    """
    try:
        orphanRows = _listOrphans(db=db, table=table, column=column, limit=limit)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc

    response: Dict[str, Any] = {"db": db, "table": table, "column": column}
    response["count"] = len(orphanRows)
    response["limit"] = limit
    response["records"] = orphanRows
    return response
@router.post("/orphans/clean")
@limiter.limit("10/minute")
def postDatabaseOrphansClean(
    request: Request,
    body: OrphanCleanRequest,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Delete the orphaned rows of a single FK relationship.

    Raises:
        HTTPException: 409 when the cleanup is refused
            (``OrphanCleanupRefused``), 400 on any other ``ValueError``.
    """
    relationship = (body.db, body.table, body.column)
    try:
        removed = _cleanOrphans(*relationship, force=body.force)
    except OrphanCleanupRefused as refusal:
        # Refusals are logged so that blocked attempts remain traceable.
        logger.warning(
            "SysAdmin orphan clean REFUSED: user=%s db=%s table=%s column=%s reason=%s",
            currentUser.username,
            *relationship,
            refusal,
        )
        conflictDetail = {"refused": True, "reason": str(refusal)}
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=conflictDetail,
        ) from refusal
    except ValueError as invalid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(invalid),
        ) from invalid
    else:
        logger.info(
            "SysAdmin orphan clean: user=%s db=%s table=%s column=%s deleted=%s force=%s",
            currentUser.username,
            *relationship,
            removed,
            body.force,
        )
        return {"deleted": removed}
@router.post("/orphans/clean-all")
@limiter.limit("2/minute")
def postDatabaseOrphansCleanAll(
    request: Request,
    body: Optional[OrphanCleanAllRequest] = None,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Clean orphans for every relationship that currently has some.

    The response carries the per-relationship results plus three totals.
    An entry counts as skipped when it has a `skipped` key, as errored when
    it has an `error` key, and contributes its `deleted` value (0 when the
    key is absent) to the deleted total.
    """
    # A missing body means "all flags off".
    if body is None:
        force = False
        excludeUserFks = False
    else:
        force = bool(body.force)
        excludeUserFks = bool(body.excludeUserFks)

    results: List[dict] = _cleanAllOrphans(force=force, excludeUserFks=excludeUserFks)

    skipped = len([entry for entry in results if "skipped" in entry])
    errored = len([entry for entry in results if "error" in entry])
    deletedTotal = sum(int(entry.get("deleted", 0)) for entry in results)

    logger.info(
        "SysAdmin orphan clean-all: user=%s batches=%s deleted=%s skipped=%s errored=%s force=%s excludeUserFks=%s",
        currentUser.username,
        len(results),
        deletedTotal,
        skipped,
        errored,
        force,
        excludeUserFks,
    )
    summary: Dict[str, Any] = {"results": results}
    summary["skipped"] = skipped
    summary["errored"] = errored
    summary["deleted"] = deletedTotal
    return summary
# ---------------------------------------------------------------------------
# Legacy Tables (tables without Pydantic model)
# ---------------------------------------------------------------------------
class LegacyTableDropRequest(BaseModel):
    """Request body naming the legacy table that should be dropped."""

    # Both fields are required; together they address one table.
    db: str = Field(default=..., description="Database name")
    table: str = Field(default=..., description="Table name to drop")
@router.get("/legacy-tables")
@limiter.limit("10/minute")
def getLegacyTables(
    request: Request,
    db: Optional[str] = None,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Report the tables that exist in the database without a Pydantic model.

    ``db`` optionally narrows the discovery to one database. Besides the
    table list, the response holds the table count and the summed
    ``rowCount`` / ``sizeBytes`` of all listed tables.
    """
    legacy = _discoverLegacyTables(dbFilter=db)
    rowTotal = sum(entry["rowCount"] for entry in legacy)
    byteTotal = sum(entry["sizeBytes"] for entry in legacy)
    return dict(
        legacyTables=legacy,
        totalCount=len(legacy),
        totalRows=rowTotal,
        totalSizeBytes=byteTotal,
    )
@router.post("/legacy-tables/drop")
@limiter.limit("10/minute")
def postLegacyTableDrop(
    request: Request,
    body: LegacyTableDropRequest,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Drop a legacy table (CASCADE). Refuses if the table is model-backed.

    Every outcome of this destructive operation is logged: a refusal as a
    warning, an unexpected failure as an error, and a successful drop as info.

    Returns:
        The result dict produced by ``_dropLegacyTable``.

    Raises:
        HTTPException: 400 when ``_dropLegacyTable`` raises ``ValueError``,
            500 for any other failure.
    """
    try:
        result = _dropLegacyTable(body.db, body.table)
    except ValueError as e:
        # Rejected requests are logged so blocked drop attempts stay traceable,
        # in line with the orphan-clean endpoint's refusal logging.
        logger.warning(
            "SysAdmin legacy-table drop REFUSED: user=%s db=%s table=%s reason=%s",
            currentUser.username, body.db, body.table, e,
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    except Exception as e:
        # Without this the failure would only be visible to the HTTP client.
        logger.error(
            "Legacy-table drop failed: user=%s db=%s table=%s error=%s",
            currentUser.username, body.db, body.table, e,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Drop failed: {e}",
        ) from e
    logger.info(
        "SysAdmin legacy-table drop: user=%s db=%s table=%s rows=%s",
        currentUser.username, body.db, body.table, result.get("rowCount"),
    )
    return result
# ---------------------------------------------------------------------------
# Migration (Backup / Restore)
# ---------------------------------------------------------------------------
class MigrationImportRequest(BaseModel):
    """Request body model for an import: export payload plus import mode."""

    # The complete export document, as a dict.
    payload: dict = Field(default=..., description="The full export JSON payload")
    # Required import mode; the accepted values are spelled out in the description.
    mode: str = Field(
        default=...,
        description="'replace' (clear + insert) or 'merge' (insert missing only)",
    )
@router.get("/migration/databases")
@limiter.limit("30/minute")
def getMigrationDatabases(
    request: Request,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Return the database list for the migration UI plus the instance label."""
    response: Dict[str, Any] = {"databases": _getAvailableDatabases()}
    response["instanceLabel"] = _getInstanceLabel()
    return response
@router.get("/migration/export")
@limiter.limit("2/minute")
def getMigrationExport(
    request: Request,
    databases: str = "all",
    currentUser: User = Depends(requireSysAdmin),
) -> StreamingResponse:
    """Export the selected databases as one downloadable JSON document.

    ``databases`` is either the literal ``"all"`` or a comma-separated list of
    database names. The whole export is serialised in memory before it is
    handed to the response.

    Raises:
        HTTPException: 400 when the selection is empty, 500 when the export
            itself fails.
    """
    if databases != "all":
        # Split on commas and ignore blank entries.
        dbList = [name.strip() for name in databases.split(",") if name.strip()]
    else:
        dbList = [entry["name"] for entry in _getAvailableDatabases()]

    if not dbList:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No databases selected for export.",
        )

    logger.info(
        "SysAdmin migration export: user=%s databases=%s",
        currentUser.username,
        dbList,
    )
    try:
        exportData = _exportDatabases(dbList)
    except Exception as exc:
        logger.error("Migration export failed: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed: {exc}",
        ) from exc

    from datetime import datetime, timezone

    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M")
    filename = f"migration_backup_{stamp}.json"
    # Values json cannot encode natively are stringified via default=str.
    serialised = json.dumps(exportData, ensure_ascii=False, default=str)
    downloadHeaders = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(
        iter([serialised]),
        media_type="application/json",
        headers=downloadHeaders,
    )
@router.post("/migration/validate")
@limiter.limit("5/minute")
async def postMigrationValidate(
    request: Request,
    file: UploadFile = File(...),
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Check an uploaded migration JSON file; no import is triggered here.

    Returns:
        The report produced by ``_validateImportPayload``.

    Raises:
        HTTPException: 400 when the upload is not UTF-8 encoded JSON.
    """
    try:
        uploaded = await file.read()
        text = uploaded.decode("utf-8")
        payload = json.loads(text)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {exc}",
        ) from exc

    report = _validateImportPayload(payload)
    logger.info(
        "SysAdmin migration validate: user=%s valid=%s",
        currentUser.username,
        report.get("valid"),
    )
    return report
@router.post("/migration/import")
@limiter.limit("2/minute")
async def postMigrationImport(
    request: Request,
    file: UploadFile = File(...),
    mode: str = "merge",
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Import a migration JSON file.

    ``mode`` may be sent as a multipart form field next to ``file`` or as a
    query parameter; a non-empty form field takes precedence:
    - ``replace``: clear all tables (except system objects) and insert.
    - ``merge``: insert only records whose ID does not yet exist.

    Returns:
        The result dict produced by ``_importDatabases``.

    Raises:
        HTTPException: 400 for an invalid mode, an unreadable JSON upload or a
            payload that fails validation; 500 when the import itself fails.
    """
    # A bare ``str`` parameter is bound from the query string, so a ``mode``
    # sent inside the multipart body would be ignored and the import would
    # silently run as "merge". Read it from the parsed form as well, so the
    # documented form-field usage works; the query parameter keeps working.
    formData = await request.form()
    formMode = formData.get("mode")
    if isinstance(formMode, str) and formMode:
        mode = formMode
    if mode not in ("replace", "merge"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid mode: '{mode}'. Must be 'replace' or 'merge'.",
        )
    try:
        rawBytes = await file.read()
        payload = json.loads(rawBytes.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {e}",
        ) from e
    # Validate before importing, so an invalid payload is rejected up front.
    validation = _validateImportPayload(payload)
    if not validation.get("valid"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"message": "Payload validation failed", "warnings": validation.get("warnings", [])},
        )
    logger.info(
        "SysAdmin migration import: user=%s mode=%s",
        currentUser.username,
        mode,
    )
    try:
        result = _importDatabases(payload, mode)
    except Exception as e:
        logger.error("Migration import failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {e}",
        ) from e
    logger.info(
        "SysAdmin migration import complete: user=%s mode=%s totalRecords=%s warnings=%s",
        currentUser.username,
        mode,
        result.get("totalRecords"),
        len(result.get("warnings", [])),
    )
    return result
# ---------------------------------------------------------------------------
# Per-DB endpoints (progress-friendly)
# ---------------------------------------------------------------------------
@router.get("/migration/export-single")
@limiter.limit("60/minute")
def getMigrationExportSingle(
    request: Request,
    database: str,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Export one registered database and return its full payload.

    The payload is returned in the response itself so that the frontend can
    put the final JSON together on its side (no server-side state needed).

    Raises:
        HTTPException: 400 for an unregistered database, 500 when the export
            fails.
    """
    from modules.shared.dbRegistry import getRegisteredDatabases

    registered = getRegisteredDatabases()
    if database not in registered:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Database '{database}' is not registered.",
        )

    logger.info("SysAdmin migration export-single: user=%s db=%s", currentUser.username, database)
    try:
        exported = _exportSingleDb(database)
    except Exception as exc:
        logger.error("Export-single failed for %s: %s", database, exc)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed for '{database}': {exc}",
        ) from exc

    tableCount = exported.get("tableCount", 0)
    recordCount = exported.get("totalRecords", 0)
    logger.info(
        "SysAdmin migration export-single done: user=%s db=%s tables=%s records=%s",
        currentUser.username,
        database,
        tableCount,
        recordCount,
    )
    return {
        "database": database,
        "tableCount": tableCount,
        "totalRecords": recordCount,
        "payload": exported,
    }
@router.get("/migration/export-stream")
@limiter.limit("2/minute")
def getMigrationExportStream(
    request: Request,
    databases: str = "all",
    currentUser: User = Depends(requireSysAdmin),
):
    """Stream a database export as a single JSON file download.

    The response body comes from ``streamExportGenerator``, so this handler
    never holds the complete export itself. ``databases`` is ``"all"`` or a
    comma-separated list of registered database names.

    Raises:
        HTTPException: 400 when a requested database is unknown or the
            selection is empty.
    """
    from datetime import datetime, timezone
    from modules.shared.dbRegistry import getRegisteredDatabases

    registry = getRegisteredDatabases()
    if databases != "all":
        dbList = [part.strip() for part in databases.split(",") if part.strip()]
    else:
        dbList = sorted(registry.keys())

    unknown = [name for name in dbList if name not in registry]
    if unknown:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unknown databases: {', '.join(unknown)}",
        )
    if not dbList:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No databases selected for export.",
        )

    instanceLabel = _getInstanceLabel()
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%SZ")
    # The instance label is only part of the file name when one is set.
    if instanceLabel:
        filename = f"export_{instanceLabel}_{timestamp}.json"
    else:
        filename = f"export_{timestamp}.json"

    logger.info("SysAdmin stream export: user=%s databases=%s", currentUser.username, dbList)
    responseHeaders = {
        "Content-Disposition": f'attachment; filename="{filename}"',
        "X-Export-Databases": ",".join(dbList),
    }
    return StreamingResponse(
        streamExportGenerator(dbList, instanceLabel),
        media_type="application/json",
        headers=responseHeaders,
    )
def _processUploadedFile(filePath: str, tmpDir: str, token: str) -> dict:
    """Parse JSON, validate, remap, split into per-DB files.

    Runs in a thread pool to avoid blocking the asyncio event loop
    during the CPU-heavy json.load() of large (500+ MB) files.

    Args:
        filePath: Path of the uploaded JSON file; it is always removed here,
            whether or not parsing succeeds.
        tmpDir: Directory that receives the per-database files.
        token: Import token, embedded in every per-database file name.

    Returns:
        ``{"result": ..., "dbFiles": {}}`` when ``_prepareImport`` reports the
        payload as invalid, otherwise ``{"result", "dbFiles", "protectedIds"}``
        with ``dbFiles`` mapping each database name to its file path.

    Raises:
        json.JSONDecodeError, UnicodeDecodeError: the upload is not valid
            UTF-8 JSON.
        ValueError: a database name in the payload contains a path separator.
        OSError: a per-database file could not be written; the files written
            so far are removed before the error propagates.
    """
    import gc
    import os

    try:
        with open(filePath, "r", encoding="utf-8") as f:
            payload = json.load(f)
    finally:
        # The upload is not needed after parsing, and is useless if parsing
        # failed, so it is removed on both paths.
        try:
            os.remove(filePath)
        except OSError:
            pass
    result = _prepareImport(payload)
    if not result.get("valid"):
        del payload
        gc.collect()
        return {"result": result, "dbFiles": {}}
    protectedIds = result.get("protectedIds", [])
    dbFiles: Dict[str, str] = {}
    databases = payload.get("databases", {})
    try:
        for dbName, dbData in databases.items():
            # Database names come from the uploaded file; a name containing a
            # path separator could make the target path leave ``tmpDir``.
            if os.path.basename(dbName) != dbName:
                raise ValueError(f"Unsafe database name in import payload: {dbName!r}")
            dbPath = os.path.join(tmpDir, f"poweron_import_{token}_{dbName}.json")
            # Registered before writing so a partially written file is also
            # covered by the cleanup below.
            dbFiles[dbName] = dbPath
            with open(dbPath, "w", encoding="utf-8") as dbF:
                json.dump(dbData, dbF, ensure_ascii=False, default=str)
    except Exception:
        # Do not leave already written per-DB files behind: the caller never
        # learns their paths when this function raises.
        for writtenPath in dbFiles.values():
            try:
                os.remove(writtenPath)
            except OSError:
                pass
        raise
    # Drop the references to the parsed payload before returning.
    del payload
    del databases
    gc.collect()
    return {"result": result, "dbFiles": dbFiles, "protectedIds": protectedIds}
@router.post("/migration/upload-import")
@limiter.limit("5/minute")
async def postMigrationUploadImport(
    request: Request,
    file: UploadFile = File(...),
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Store an uploaded backup on disk and prepare it for per-DB import.

    The upload is written to a temp file in 1 MiB chunks and then handed to
    ``_processUploadedFile`` in a worker thread. On success the per-DB file
    paths are remembered under a fresh token, which is returned to the
    client together with the preparation report.

    Raises:
        HTTPException: 500 when writing or processing fails, 400 when the
            file is not valid JSON or the payload fails validation.
    """
    import asyncio
    import os
    import tempfile
    import uuid

    token = str(uuid.uuid4())
    tmpDir = tempfile.gettempdir()
    filePath = os.path.join(tmpDir, f"poweron_import_{token}.json")

    def _discardUpload() -> None:
        # Remove the raw upload if it is (still) on disk.
        if os.path.exists(filePath):
            os.remove(filePath)

    logger.info("SysAdmin migration upload-import: user=%s streaming to %s", currentUser.username, filePath)
    chunkSize = 1024 * 1024
    totalBytes = 0
    try:
        with open(filePath, "wb") as target:
            while chunk := await file.read(chunkSize):
                target.write(chunk)
                totalBytes += len(chunk)
    except Exception as exc:
        logger.error("Upload-import write failed: %s", exc)
        _discardUpload()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Upload failed: {exc}",
        ) from exc

    logger.info(
        "SysAdmin migration upload-import: %s bytes on disk (%.1f MB)",
        totalBytes,
        totalBytes / 1024 / 1024,
    )

    try:
        processed = await asyncio.to_thread(_processUploadedFile, filePath, tmpDir, token)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        _discardUpload()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {exc}",
        ) from exc
    except Exception as exc:
        _discardUpload()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Processing failed: {exc}",
        ) from exc

    report = processed["result"]
    dbFiles = processed.get("dbFiles", {})
    if not report.get("valid"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"message": "Payload validation failed", "warnings": report.get("warnings", [])},
        )

    logger.info(
        "SysAdmin migration upload-import: split into %d per-DB files, payload freed",
        len(dbFiles),
    )
    # Remember the prepared files so later requests can refer to them by token.
    _pendingImports[token] = {
        "dbFiles": dbFiles,
        "protectedIds": processed.get("protectedIds", []),
    }

    response: Dict[str, Any] = {"token": token}
    response["valid"] = report.get("valid", False)
    response["databases"] = report.get("databases", [])
    response["warnings"] = report.get("warnings", [])
    response["systemObjectsFound"] = report.get("systemObjectsFound", [])
    return response
# In-process registry of prepared imports, keyed by upload token. Each value
# holds the per-DB temp file paths ("dbFiles") and the "protectedIds" list.
# Entries are added by the upload-import endpoint and removed by import-done.
# NOTE(review): nothing in this file expires entries; presumably the registry
# is also not shared between worker processes — confirm the deployment model.
_pendingImports: Dict[str, dict] = {}
@router.post("/migration/import-single")
@limiter.limit("60/minute")
def postMigrationImportSingle(
    request: Request,
    body: dict,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Import one database out of a previously uploaded and prepared backup.

    Body: ``{token, database, mode}``; ``mode`` falls back to ``merge``.

    Raises:
        HTTPException: 400 for an invalid mode, an unknown token or a
            database without prepared data; 500 when reading the prepared
            file or the import itself fails.
    """
    import os

    mode = body.get("mode", "merge")
    token = body.get("token", "")
    database = body.get("database", "")

    if mode != "replace" and mode != "merge":
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid mode: '{mode}'.")

    pending = _pendingImports.get(token)
    if not pending:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired import token.")

    dbFilePath = pending.get("dbFiles", {}).get(database)
    if not (dbFilePath and os.path.exists(dbFilePath)):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"No data for database '{database}'.",
        )

    logger.info("SysAdmin migration import-single: user=%s db=%s mode=%s", currentUser.username, database, mode)
    try:
        with open(dbFilePath, "r", encoding="utf-8") as source:
            dbData = json.load(source)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to read import data for '{database}': {exc}",
        ) from exc

    # The stored per-DB data is wrapped under "databases" before it is passed on.
    singlePayload = {"databases": {database: dbData}}
    try:
        return _importSingleDb(singlePayload, database, mode, pending["protectedIds"])
    except Exception as exc:
        logger.error("Import-single failed for %s: %s", database, exc)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed for '{database}': {exc}",
        ) from exc
@router.post("/migration/import-done")
@limiter.limit("10/minute")
def postMigrationImportDone(
    request: Request,
    body: dict,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Forget a pending import and delete its per-DB temp files.

    Unknown tokens and files that cannot be removed are ignored; the
    response is always ``{"ok": True}``.
    """
    import os

    finished = _pendingImports.pop(body.get("token", ""), None)
    if not finished:
        return {"ok": True}

    for tempPath in finished.get("dbFiles", {}).values():
        try:
            os.remove(tempPath)
        except OSError:
            # Best effort: the file may already be gone.
            continue
    return {"ok": True}