platform-core/modules/routes/routeAdminDatabaseHealth.py
ValueOn AG a46e12638e
All checks were successful
Deploy Plattform-Core / test (push) Successful in 50s
Deploy Plattform-Core / deploy (push) Successful in 6s
db restore async
2026-05-25 07:46:40 +02:00

678 lines
22 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
SysAdmin API for database table statistics, FK orphan detection/cleanup,
and database migration (backup / restore).
"""
import json
import logging
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from modules.auth import limiter
from modules.auth.authentication import requireSysAdmin
from modules.datamodels.datamodelUam import User
from modules.system.databaseHealth import (
OrphanCleanupRefused,
_cleanAllOrphans,
_cleanOrphans,
_getTableStats,
_isUserIdFk,
_listOrphans,
_scanOrphans,
)
from modules.system.databaseMigration import (
_exportDatabases,
_exportSingleDb,
_getAvailableDatabases,
_getInstanceLabel,
_importDatabases,
_importSingleDb,
_prepareImport,
_validateImportPayload,
)
logger = logging.getLogger(__name__)

# Every endpoint in this module hangs off this prefix and is listed under a
# single OpenAPI tag so the SysAdmin database tools are grouped in the docs.
router = APIRouter(prefix="/api/admin/database-health", tags=["Admin Database Health"])
class OrphanCleanRequest(BaseModel):
    """Request body for deleting the orphans of exactly one FK relationship.

    ``db`` / ``table`` / ``column`` identify the relationship; ``force``
    defaults to ``False`` so the safety guards described on that field apply
    unless the caller opts out explicitly.
    """

    db: str = Field(..., description="Source database name (e.g. poweron_app)")
    table: str = Field(..., description="Source table (Pydantic model class name)")
    column: str = Field(..., description="FK column on the source table")
    # The description is a plain string rather than a %-format template, so a
    # single "%" is correct; "%%" would be rendered verbatim in the OpenAPI docs.
    force: bool = Field(
        False,
        description="Override safety guards (empty target / >50% of source). Use with care.",
    )
class OrphanCleanAllRequest(BaseModel):
    """Request body for the bulk cleanup of every detected orphan set.

    Both flags default to ``False``; the whole body is optional on the
    endpoint, in which case these defaults are used as well.
    """

    # Applies to every relationship processed in the batch.
    force: bool = Field(
        False,
        description="Override safety guards on every relationship. Use with extreme care.",
    )
    # Lets the caller leave FKs that target UserInDB.id untouched.
    excludeUserFks: bool = Field(
        False,
        description=(
            "Skip FK relationships pointing at UserInDB.id. "
            "Deleted-user remnants (audit / billing / membership rows) "
            "are handled by a dedicated purge workflow and should not be "
            "touched by generic FK cleanup."
        ),
    )
@router.get("/stats")
@limiter.limit("30/minute")
def getDatabaseTableStats(
    request: Request,
    db: Optional[str] = None,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Return table statistics (pg_stat_user_tables) wrapped as ``{"stats": ...}``.

    ``db`` is forwarded as the database-name filter; ``None`` means no filter.
    """
    return {"stats": _getTableStats(dbFilter=db)}
@router.get("/orphans")
@limiter.limit("10/minute")
def getDatabaseOrphans(
    request: Request,
    db: Optional[str] = None,
    excludeUserFks: bool = False,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Scan for FK orphans, optionally limited to one source database.

    With ``excludeUserFks=true`` every scan entry whose target is
    ``UserInDB.id`` (as decided by ``_isUserIdFk``) is dropped, so the
    SysAdmin UI can show deleted-user remnants separately from real FK drift.
    """
    scanned = _scanOrphans(dbFilter=db)
    if not excludeUserFks:
        return {"orphans": scanned}

    kept = []
    for entry in scanned:
        targetTable = entry.get("targetTable", "")
        targetColumn = entry.get("targetColumn", "")
        if _isUserIdFk(targetTable, targetColumn):
            continue
        kept.append(entry)
    return {"orphans": kept}
@router.get("/orphans/list")
@limiter.limit("30/minute")
def getDatabaseOrphansList(
    request: Request,
    db: str,
    table: str,
    column: str,
    limit: int = 1000,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """List up to ``limit`` orphan source rows for a single FK relationship.

    Backs the per-row download button in the SysAdmin UI, so a human can
    inspect the orphans (full source row plus the unresolved FK value)
    before running the destructive clean operation. A ``ValueError`` from
    ``_listOrphans`` is reported as HTTP 400.
    """
    try:
        orphanRows = _listOrphans(db=db, table=table, column=column, limit=limit)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

    return dict(
        db=db,
        table=table,
        column=column,
        count=len(orphanRows),
        limit=limit,
        records=orphanRows,
    )
@router.post("/orphans/clean")
@limiter.limit("10/minute")
def postDatabaseOrphansClean(
    request: Request,
    body: OrphanCleanRequest,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Delete the orphaned rows of one FK relationship.

    Outcomes:
        * success -> ``{"deleted": <count>}`` and an info log entry;
        * ``OrphanCleanupRefused`` (safety guard) -> warning log + HTTP 409
          with ``{"refused": True, "reason": ...}``;
        * ``ValueError`` -> HTTP 400.
    """
    db, table, column = body.db, body.table, body.column
    try:
        deletedCount = _cleanOrphans(db, table, column, force=body.force)
    except OrphanCleanupRefused as refusal:
        logger.warning(
            "SysAdmin orphan clean REFUSED: user=%s db=%s table=%s column=%s reason=%s",
            currentUser.username, db, table, column, refusal,
        )
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail={"refused": True, "reason": str(refusal)},
        ) from refusal
    except ValueError as invalid:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(invalid)) from invalid

    logger.info(
        "SysAdmin orphan clean: user=%s db=%s table=%s column=%s deleted=%s force=%s",
        currentUser.username, db, table, column, deletedCount, body.force,
    )
    return {"deleted": deletedCount}
@router.post("/orphans/clean-all")
@limiter.limit("2/minute")
def postDatabaseOrphansCleanAll(
    request: Request,
    body: Optional[OrphanCleanAllRequest] = None,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Clean every relationship that currently has orphans.

    The body is optional; without one both ``force`` and ``excludeUserFks``
    are ``False``. Each per-relationship entry returned by
    ``_cleanAllOrphans`` carries ``deleted`` (success), ``skipped`` (safety
    guard hit, no force) or ``error`` (other failure); the response adds the
    skipped / errored counts and the summed ``deleted`` total.
    """
    force = False
    excludeUserFks = False
    if body is not None:
        force = bool(body.force)
        excludeUserFks = bool(body.excludeUserFks)

    results: List[dict] = _cleanAllOrphans(force=force, excludeUserFks=excludeUserFks)

    skipped = errored = deletedTotal = 0
    for entry in results:
        # Independent checks: one entry may in principle carry several keys.
        if "skipped" in entry:
            skipped += 1
        if "error" in entry:
            errored += 1
        deletedTotal += int(entry.get("deleted", 0))

    logger.info(
        "SysAdmin orphan clean-all: user=%s batches=%s deleted=%s skipped=%s errored=%s force=%s excludeUserFks=%s",
        currentUser.username, len(results), deletedTotal, skipped, errored, force, excludeUserFks,
    )
    return {"results": results, "skipped": skipped, "errored": errored, "deleted": deletedTotal}
# ---------------------------------------------------------------------------
# Migration (Backup / Restore)
# ---------------------------------------------------------------------------
class MigrationImportRequest(BaseModel):
    """JSON request body for an import: the export document plus the mode."""

    # Required; the complete export JSON document as a dict.
    payload: dict = Field(..., description="The full export JSON payload")
    # Required; a plain ``str`` — this model itself does not restrict the value.
    mode: str = Field(..., description="'replace' (clear + insert) or 'merge' (insert missing only)")
@router.get("/migration/databases")
@limiter.limit("30/minute")
def getMigrationDatabases(
    request: Request,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Return the registered databases (with table/record counts) plus the instance label."""
    return {
        "databases": _getAvailableDatabases(),
        "instanceLabel": _getInstanceLabel(),
    }
@router.get("/migration/export")
@limiter.limit("2/minute")
def getMigrationExport(
    request: Request,
    databases: str = "all",
    currentUser: User = Depends(requireSysAdmin),
) -> StreamingResponse:
    """Export selected databases as a downloadable JSON file.

    ``databases`` is a comma-separated list of database names, or ``"all"``
    for every database reported by ``_getAvailableDatabases()``. Blank list
    entries are ignored. The file is named ``migration_backup_<UTC ts>.json``.

    Raises:
        HTTPException 400: no database names were selected.
        HTTPException 500: ``_exportDatabases`` failed.
    """
    from datetime import datetime, timezone

    if databases == "all":
        dbList = [db["name"] for db in _getAvailableDatabases()]
    else:
        dbList = [d.strip() for d in databases.split(",") if d.strip()]
    if not dbList:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No databases selected for export.",
        )

    logger.info(
        "SysAdmin migration export: user=%s databases=%s",
        currentUser.username,
        dbList,
    )
    try:
        exportData = _exportDatabases(dbList)
    except Exception as e:
        # logger.exception keeps the traceback in the server log; the client
        # only receives str(e).
        logger.exception("Migration export failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed: {e}",
        ) from e

    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M")
    filename = f"migration_backup_{ts}.json"
    content = json.dumps(exportData, ensure_ascii=False, default=str)
    return StreamingResponse(
        iter([content]),
        media_type="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@router.post("/migration/validate")
@limiter.limit("5/minute")
async def postMigrationValidate(
    request: Request,
    file: UploadFile = File(...),
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Dry-run check of an uploaded migration JSON file; nothing is written.

    The upload must be UTF-8 JSON (HTTP 400 otherwise). The result of
    ``_validateImportPayload`` is returned unchanged.
    """
    try:
        payload = json.loads((await file.read()).decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as parseError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {parseError}",
        ) from parseError

    validation = _validateImportPayload(payload)
    logger.info("SysAdmin migration validate: user=%s valid=%s", currentUser.username, validation.get("valid"))
    return validation
@router.post("/migration/import")
@limiter.limit("2/minute")
async def postMigrationImport(
    request: Request,
    file: UploadFile = File(...),
    mode: str = "merge",
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Import a migration JSON file in a single request.

    ``mode`` is read from the query string (e.g. ``?mode=replace``): FastAPI
    treats a plain ``str`` parameter without ``Form(...)`` as a query
    parameter, even when the request body is multipart form data.

    - ``replace``: clear all tables (except system objects) and insert.
    - ``merge``: insert only records whose ID does not yet exist.

    Raises:
        HTTPException 400: unknown ``mode``, a file that is not UTF-8 JSON, or
            a payload rejected by ``_validateImportPayload``.
        HTTPException 500: ``_importDatabases`` failed.
    """
    if mode not in ("replace", "merge"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid mode: '{mode}'. Must be 'replace' or 'merge'.",
        )
    try:
        rawBytes = await file.read()
        payload = json.loads(rawBytes.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {e}",
        ) from e

    validation = _validateImportPayload(payload)
    if not validation.get("valid"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"message": "Payload validation failed", "warnings": validation.get("warnings", [])},
        )

    logger.info(
        "SysAdmin migration import: user=%s mode=%s",
        currentUser.username,
        mode,
    )
    try:
        result = _importDatabases(payload, mode)
    except Exception as e:
        # Keep the traceback in the log; the client only gets str(e).
        logger.exception("Migration import failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {e}",
        ) from e

    logger.info(
        "SysAdmin migration import complete: user=%s mode=%s totalRecords=%s warnings=%s",
        currentUser.username,
        mode,
        result.get("totalRecords"),
        len(result.get("warnings", [])),
    )
    return result
# ---------------------------------------------------------------------------
# Per-DB endpoints (progress-friendly)
# ---------------------------------------------------------------------------
# In-process export sessions: token -> {"databases": {dbName: exported payload}}.
# Created by export-start, filled by export-single, and popped by export-download.
# NOTE(review): in the code shown here, entries are only removed by
# export-download, so abandoned sessions keep their exported data in memory.
# The dict is also local to one worker process — confirm the deployment runs
# a single worker, or sessions may not be found across requests.
_pendingExports: Dict[str, dict] = {}
@router.post("/migration/export-start")
@limiter.limit("10/minute")
def postMigrationExportStart(
    request: Request,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Open a new export session and hand back its token.

    The token keys an empty ``{"databases": {}}`` entry in ``_pendingExports``
    that subsequent export-single calls fill in.
    """
    import uuid

    token = str(uuid.uuid4())
    session: Dict[str, Any] = {"databases": {}}
    _pendingExports[token] = session
    logger.info("SysAdmin migration export-start: user=%s token=%s", currentUser.username, token)
    return {"token": token}
@router.get("/migration/export-single")
@limiter.limit("60/minute")
def getMigrationExportSingle(
    request: Request,
    token: str,
    database: str,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Export a single database and store it server-side. Returns only metadata.

    The exported payload is kept under the session ``token`` in
    ``_pendingExports`` until export-download assembles the final file.

    Raises:
        HTTPException 400: unknown token, or ``database`` is not registered.
        HTTPException 500: ``_exportSingleDb`` failed.
    """
    from modules.shared.dbRegistry import getRegisteredDatabases

    pending = _pendingExports.get(token)
    if not pending:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid export token.")
    if database not in getRegisteredDatabases():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Database '{database}' is not registered.",
        )

    logger.info("SysAdmin migration export-single: user=%s db=%s", currentUser.username, database)
    try:
        dbPayload = _exportSingleDb(database)
    except Exception as e:
        # Keep the traceback in the log; the client only gets str(e).
        logger.exception("Export-single failed for %s: %s", database, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed for '{database}': {e}",
        ) from e

    pending["databases"][database] = dbPayload
    tableCount = dbPayload.get("tableCount", 0)
    totalRecords = dbPayload.get("totalRecords", 0)
    logger.info(
        "SysAdmin migration export-single done: user=%s db=%s tables=%s records=%s",
        currentUser.username, database, tableCount, totalRecords,
    )
    return {
        "database": database,
        "tableCount": tableCount,
        "totalRecords": totalRecords,
    }
def _safeAttachmentFilename(filename: str, fallback: str = "backup.json") -> str:
    """Reduce a client-supplied download name to a header-safe ASCII basename.

    Any directory part is dropped and every character outside
    ``[A-Za-z0-9._-]`` becomes ``_``. The result therefore cannot close the
    quoted ``filename="..."`` parameter, inject CR/LF header lines, or fail
    latin-1 header encoding. Leading/trailing dots are stripped; ``fallback``
    is returned when nothing usable remains.
    """
    import re

    baseName = filename.replace("\\", "/").rsplit("/", 1)[-1]
    cleaned = re.sub(r"[^A-Za-z0-9._-]", "_", baseName).strip(".")
    return cleaned or fallback


@router.get("/migration/export-download")
@limiter.limit("5/minute")
def getMigrationExportDownload(
    request: Request,
    token: str,
    filename: str = "backup.json",
    currentUser: User = Depends(requireSysAdmin),
) -> StreamingResponse:
    """Assemble and stream the final export file from server-side data.

    The session is consumed: ``token`` is popped from ``_pendingExports``
    before the emptiness check, so a session without exported databases is
    discarded as well. ``filename`` comes from the client and is passed
    through ``_safeAttachmentFilename`` before it is put into the
    ``Content-Disposition`` header.

    Raises:
        HTTPException 400: unknown token or a session with no exported databases.
    """
    from datetime import datetime, timezone

    pending = _pendingExports.pop(token, None)
    if not pending or not pending.get("databases"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired export token.")

    databases = pending["databases"]
    totalTables = sum(d.get("tableCount", 0) for d in databases.values())
    totalRecords = sum(d.get("totalRecords", 0) for d in databases.values())
    exportData = {
        "meta": {
            "exportedAt": datetime.now(timezone.utc).isoformat(),
            "version": "1.0",
            "databaseCount": len(databases),
            "totalTables": totalTables,
            "totalRecords": totalRecords,
        },
        "databases": databases,
    }
    logger.info(
        "SysAdmin migration export-download: user=%s dbs=%s tables=%s records=%s",
        currentUser.username, len(databases), totalTables, totalRecords,
    )

    content = json.dumps(exportData, ensure_ascii=False, default=str)
    safeFilename = _safeAttachmentFilename(filename)
    return StreamingResponse(
        iter([content]),
        media_type="application/json",
        headers={"Content-Disposition": f'attachment; filename="{safeFilename}"'},
    )
def _processUploadedFile(filePath: str, tmpDir: str, token: str) -> dict:
    """Parse JSON, validate, remap, split into per-DB files.

    Meant to run in a worker thread (the upload endpoint uses
    ``asyncio.to_thread``) so the CPU-heavy ``json.load()`` of large
    (500+ MB) files does not block the asyncio event loop.

    Args:
        filePath: The uploaded JSON file. It is deleted right after parsing.
        tmpDir: Directory that receives the per-DB split files.
        token: Import-session token embedded in the split-file names.

    Returns:
        ``{"result": <_prepareImport result>, "dbFiles": {}}`` when the
        payload is invalid. Otherwise ``{"result": ..., "dbFiles":
        {dbName: path}, "protectedIds": [...]}``.

    Raises:
        json.JSONDecodeError / UnicodeDecodeError: The upload is not UTF-8
            JSON. The uploaded file is left in place for the caller to remove.
        OSError: A split file could not be written. Split files already
            created for this upload are removed before re-raising.
    """
    import gc
    import os
    import re

    with open(filePath, "r", encoding="utf-8") as f:
        payload = json.load(f)
    try:
        os.remove(filePath)
    except OSError:
        # Best-effort: the import can proceed from memory, but make the
        # leftover file visible instead of silently ignoring it.
        logger.warning("Could not remove uploaded import file %s", filePath)

    result = _prepareImport(payload)
    if not result.get("valid"):
        del payload
        gc.collect()
        return {"result": result, "dbFiles": {}}

    protectedIds = result.get("protectedIds", [])
    dbFiles: Dict[str, str] = {}
    databases = payload.get("databases", {})
    try:
        for index, (dbName, dbData) in enumerate(databases.items()):
            # dbName comes from the uploaded document: sanitize it so it cannot
            # steer the write outside tmpDir, and prefix the index so two names
            # that sanitize identically never share a file.
            safeName = re.sub(r"[^A-Za-z0-9_.-]", "_", str(dbName))
            dbPath = os.path.join(tmpDir, f"poweron_import_{token}_{index}_{safeName}.json")
            # Register before writing so a half-written file is cleaned up too.
            dbFiles[dbName] = dbPath
            with open(dbPath, "w", encoding="utf-8") as dbF:
                json.dump(dbData, dbF, ensure_ascii=False, default=str)
    except Exception:
        # The caller only knows about filePath; remove the split files here
        # so a failed split leaves nothing behind in tmpDir.
        for writtenPath in dbFiles.values():
            try:
                os.remove(writtenPath)
            except OSError:
                pass
        raise

    del payload
    del databases
    gc.collect()
    return {"result": result, "dbFiles": dbFiles, "protectedIds": protectedIds}
@router.post("/migration/upload-import")
@limiter.limit("5/minute")
async def postMigrationUploadImport(
    request: Request,
    file: UploadFile = File(...),
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Upload a backup file to disk (chunked), validate, remap IDs,
    split into per-DB temp files so the full payload doesn't stay in RAM.

    The upload is streamed in 1 MiB chunks to
    ``<tempdir>/poweron_import_<token>.json``. ``_processUploadedFile`` then
    parses and splits it in a worker thread. On success the per-DB file
    paths are registered in ``_pendingImports`` under the returned token.

    Raises:
        HTTPException 400: The file is not UTF-8 JSON or fails validation.
        HTTPException 500: Writing the upload or processing it failed.
    """
    import asyncio
    import os
    import tempfile
    import uuid

    token = str(uuid.uuid4())
    tmpDir = tempfile.gettempdir()
    filePath = os.path.join(tmpDir, f"poweron_import_{token}.json")

    def _discardUpload() -> None:
        # Best-effort removal of the raw upload; a failure here must not
        # replace the error that is actually being reported.
        try:
            os.remove(filePath)
        except OSError:
            pass

    logger.info("SysAdmin migration upload-import: user=%s streaming to %s", currentUser.username, filePath)
    totalBytes = 0
    chunkSize = 1024 * 1024
    try:
        with open(filePath, "wb") as f:
            while chunk := await file.read(chunkSize):
                f.write(chunk)
                totalBytes += len(chunk)
    except Exception as e:
        logger.exception("Upload-import write failed: %s", e)
        _discardUpload()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Upload failed: {e}") from e
    logger.info("SysAdmin migration upload-import: %s bytes on disk (%.1f MB)",
                totalBytes, totalBytes / 1024 / 1024)

    try:
        processed = await asyncio.to_thread(_processUploadedFile, filePath, tmpDir, token)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.warning("Upload-import rejected invalid JSON: user=%s error=%s", currentUser.username, e)
        _discardUpload()
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid JSON file: {e}") from e
    except Exception as e:
        # Previously unlogged: keep the traceback for the 500 response.
        logger.exception("Upload-import processing failed: %s", e)
        _discardUpload()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Processing failed: {e}") from e

    result = processed["result"]
    dbFiles = processed.get("dbFiles", {})
    if not result.get("valid"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"message": "Payload validation failed", "warnings": result.get("warnings", [])},
        )
    logger.info("SysAdmin migration upload-import: split into %d per-DB files, payload freed",
                len(dbFiles))

    # _pendingImports is a module-level dict; it is resolved at call time.
    _pendingImports[token] = {
        "dbFiles": dbFiles,
        "protectedIds": processed.get("protectedIds", []),
    }
    return {
        "token": token,
        "valid": result.get("valid", False),
        "databases": result.get("databases", []),
        "warnings": result.get("warnings", []),
        "systemObjectsFound": result.get("systemObjectsFound", []),
    }
# In-process import sessions: token -> {"dbFiles": {dbName: temp file path}, "protectedIds": [...]}.
# Registered by upload-import (which references this name only at call time, so
# defining it after that function is fine), read by import-single, and popped —
# with removal of the temp files — by import-done.
# NOTE(review): sessions that never reach import-done keep their temp files on
# disk, and the dict is per worker process; confirm this matches the deployment.
_pendingImports: Dict[str, dict] = {}
@router.post("/migration/import-single")
@limiter.limit("60/minute")
def postMigrationImportSingle(
    request: Request,
    body: dict,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Import a single database from a previously uploaded + prepared payload.

    Body: ``{token, database, mode}``. ``token`` comes from
    ``/migration/upload-import``. ``mode`` is ``"replace"`` or ``"merge"``
    (default ``"merge"``).

    Raises:
        HTTPException 400: Invalid mode, unknown/expired token, or no split
            file for ``database``.
        HTTPException 500: The split file could not be read, or the import
            failed.
    """
    import os

    token = body.get("token", "")
    database = body.get("database", "")
    mode = body.get("mode", "merge")
    if mode not in ("replace", "merge"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid mode: '{mode}'.")

    # The body is a raw dict, so these values may be any JSON type. Unhashable
    # ones (lists/objects) would make dict.get raise TypeError (HTTP 500), so
    # non-strings are treated as unknown instead.
    pending = _pendingImports.get(token) if isinstance(token, str) else None
    if not pending:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired import token.")
    dbFiles = pending.get("dbFiles", {})
    dbFilePath = dbFiles.get(database) if isinstance(database, str) else None
    if not dbFilePath or not os.path.exists(dbFilePath):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"No data for database '{database}'.",
        )

    logger.info("SysAdmin migration import-single: user=%s db=%s mode=%s", currentUser.username, database, mode)
    try:
        with open(dbFilePath, "r", encoding="utf-8") as f:
            dbData = json.load(f)
    except Exception as e:
        logger.exception("Import-single could not read %s for %s: %s", dbFilePath, database, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to read import data for '{database}': {e}",
        ) from e

    payload = {"databases": {database: dbData}}
    try:
        result = _importSingleDb(payload, database, mode, pending["protectedIds"])
    except Exception as e:
        # Keep the traceback in the log; the client only gets str(e).
        logger.exception("Import-single failed for %s: %s", database, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed for '{database}': {e}",
        ) from e
    return result
@router.post("/migration/import-done")
@limiter.limit("10/minute")
def postMigrationImportDone(
    request: Request,
    body: dict,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Close an import session and delete its per-DB temp files.

    Always answers ``{"ok": True}``: an unknown token is a no-op, and a file
    that cannot be removed (``OSError``) is skipped rather than reported.
    """
    import os

    session = _pendingImports.pop(body.get("token", ""), None)
    if not session:
        return {"ok": True}

    for tempPath in session.get("dbFiles", {}).values():
        try:
            os.remove(tempPath)
        except OSError:
            pass
    return {"ok": True}