# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
SysAdmin API for database table statistics, FK orphan detection/cleanup,
and database migration (backup / restore).
"""

import json
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile, status
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

from modules.auth import limiter
from modules.auth.authentication import requireSysAdmin
from modules.datamodels.datamodelUam import User
from modules.system.databaseHealth import (
    OrphanCleanupRefused,
    _cleanAllOrphans,
    _cleanOrphans,
    _getTableStats,
    _isUserIdFk,
    _listOrphans,
    _scanOrphans,
)
from modules.system.databaseMigration import (
    _exportDatabases,
    _exportSingleDb,
    _getAvailableDatabases,
    _getInstanceLabel,
    _importDatabases,
    _importSingleDb,
    _prepareImport,
    _validateImportPayload,
)

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/admin/database-health",
    tags=["Admin Database Health"],
)

# Prepared-but-unfinished imports are dropped after this much idle time so an
# abandoned upload (browser closed before "import-done") cannot pin a full
# database payload in memory forever.
_PENDING_IMPORT_TTL_SECONDS = 60 * 60

# token -> {"payload": ..., "protectedIds": ..., "lastUsed": monotonic seconds}
# NOTE(review): this cache is per-process; with several worker processes a
# token is only known to the worker that handled prepare-import.
_pendingImports: Dict[str, dict] = {}


def _purgeStalePendingImports() -> None:
    """Drop cached import payloads that have been idle longer than the TTL."""
    cutoff = time.monotonic() - _PENDING_IMPORT_TTL_SECONDS
    # Snapshot the items so concurrent requests mutating the dict cannot
    # invalidate the iteration.
    for token, entry in list(_pendingImports.items()):
        lastUsed = entry.get("lastUsed")
        if lastUsed is not None and lastUsed < cutoff:
            _pendingImports.pop(token, None)


async def _readJsonUpload(file: UploadFile) -> Any:
    """Read an uploaded file and parse it as UTF-8 encoded JSON.

    Raises:
        HTTPException: 400 when the bytes are not valid UTF-8 or not valid JSON.
    """
    try:
        rawBytes = await file.read()
        return json.loads(rawBytes.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid JSON file: {e}",
        ) from e


class OrphanCleanRequest(BaseModel):
    """Body for deleting orphans for one FK relationship."""

    db: str = Field(..., description="Source database name (e.g. poweron_app)")
    table: str = Field(..., description="Source table (Pydantic model class name)")
    column: str = Field(..., description="FK column on the source table")
    force: bool = Field(
        False,
        # Plain string (no %-formatting is applied), so a single '%' is correct.
        description="Override safety guards (empty target / >50% of source). Use with care.",
    )


class OrphanCleanAllRequest(BaseModel):
    """Body for cleaning all detected orphans."""

    force: bool = Field(
        False,
        description="Override safety guards on every relationship. Use with extreme care.",
    )
    excludeUserFks: bool = Field(
        False,
        description=(
            "Skip FK relationships pointing at UserInDB.id. Deleted-user remnants "
            "(audit / billing / membership rows) are handled by a dedicated purge "
            "workflow and should not be touched by generic FK cleanup."
        ),
    )


@router.get("/stats")
@limiter.limit("30/minute")
def getDatabaseTableStats(
    request: Request,
    db: Optional[str] = None,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Table statistics from pg_stat_user_tables (optional filter by database name)."""
    rows = _getTableStats(dbFilter=db)
    return {"stats": rows}


@router.get("/orphans")
@limiter.limit("10/minute")
def getDatabaseOrphans(
    request: Request,
    db: Optional[str] = None,
    excludeUserFks: bool = False,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """FK orphan scan (optional filter by source database name).

    When ``excludeUserFks=true``, results targeting ``UserInDB.id`` are omitted
    from the response so the SysAdmin UI can keep deleted-user remnants visually
    separate from real FK drift.
    """
    rows = _scanOrphans(dbFilter=db)
    if excludeUserFks:
        rows = [
            r
            for r in rows
            if not _isUserIdFk(r.get("targetTable", ""), r.get("targetColumn", ""))
        ]
    return {"orphans": rows}


@router.get("/orphans/list")
@limiter.limit("30/minute")
def getDatabaseOrphansList(
    request: Request,
    db: str,
    table: str,
    column: str,
    limit: int = 1000,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Return up to ``limit`` actual orphan source-rows for one FK relationship.

    Used by the SysAdmin UI's per-row download button: a human can review the
    orphan list (full source row + the unresolved FK value) before triggering
    the destructive clean operation.

    Raises:
        HTTPException: 400 when ``_listOrphans`` rejects the arguments with a
            ``ValueError``.
    """
    try:
        records = _listOrphans(db=db, table=table, column=column, limit=limit)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    return {
        "db": db,
        "table": table,
        "column": column,
        "count": len(records),
        "limit": limit,
        "records": records,
    }


@router.post("/orphans/clean")
@limiter.limit("10/minute")
def postDatabaseOrphansClean(
    request: Request,
    body: OrphanCleanRequest,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Delete orphaned rows for a single FK relationship.

    Raises:
        HTTPException: 409 when the cleanup is refused (``OrphanCleanupRefused``),
            400 when the arguments are rejected with a ``ValueError``.
    """
    try:
        deleted = _cleanOrphans(body.db, body.table, body.column, force=body.force)
    except OrphanCleanupRefused as e:
        logger.warning(
            "SysAdmin orphan clean REFUSED: user=%s db=%s table=%s column=%s reason=%s",
            currentUser.username,
            body.db,
            body.table,
            body.column,
            e,
        )
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail={"refused": True, "reason": str(e)},
        ) from e
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e
    logger.info(
        "SysAdmin orphan clean: user=%s db=%s table=%s column=%s deleted=%s force=%s",
        currentUser.username,
        body.db,
        body.table,
        body.column,
        deleted,
        body.force,
    )
    return {"deleted": deleted}


@router.post("/orphans/clean-all")
@limiter.limit("2/minute")
def postDatabaseOrphansCleanAll(
    request: Request,
    body: Optional[OrphanCleanAllRequest] = None,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Run orphan cleanup for every relationship that currently has orphans.

    Returns per-relationship results. Each entry contains either `deleted`
    (success), `skipped` (safety guard triggered, no force), or `error`
    (other failure).
    """
    # The body is optional; a missing body means "no force, include user FKs".
    force = bool(body.force) if body is not None else False
    excludeUserFks = bool(body.excludeUserFks) if body is not None else False
    results: List[dict] = _cleanAllOrphans(force=force, excludeUserFks=excludeUserFks)
    skipped = sum(1 for r in results if "skipped" in r)
    errored = sum(1 for r in results if "error" in r)
    # `or 0` tolerates entries that carry an explicit ``deleted: None``.
    deletedTotal = sum(int(r.get("deleted") or 0) for r in results)
    logger.info(
        "SysAdmin orphan clean-all: user=%s batches=%s deleted=%s skipped=%s errored=%s force=%s excludeUserFks=%s",
        currentUser.username,
        len(results),
        deletedTotal,
        skipped,
        errored,
        force,
        excludeUserFks,
    )
    return {"results": results, "skipped": skipped, "errored": errored, "deleted": deletedTotal}


# ---------------------------------------------------------------------------
# Migration (Backup / Restore)
# ---------------------------------------------------------------------------


class MigrationImportRequest(BaseModel):
    """Body for the import endpoint."""

    payload: dict = Field(..., description="The full export JSON payload")
    mode: str = Field(
        ...,
        description="'replace' (clear + insert) or 'merge' (insert missing only)",
    )


@router.get("/migration/databases")
@limiter.limit("30/minute")
def getMigrationDatabases(
    request: Request,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """List registered databases with table/record counts for the migration UI."""
    databases = _getAvailableDatabases()
    return {"databases": databases, "instanceLabel": _getInstanceLabel()}


@router.get("/migration/export")
@limiter.limit("2/minute")
def getMigrationExport(
    request: Request,
    databases: str = "all",
    currentUser: User = Depends(requireSysAdmin),
) -> StreamingResponse:
    """Export selected databases as a downloadable JSON file.

    ``databases`` is a comma-separated list of database names, or ``"all"``.

    Raises:
        HTTPException: 400 when the selection is empty, 500 when the export fails.
    """
    if databases == "all":
        available = _getAvailableDatabases()
        dbList = [db["name"] for db in available]
    else:
        dbList = [d.strip() for d in databases.split(",") if d.strip()]

    if not dbList:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No databases selected for export.",
        )

    logger.info(
        "SysAdmin migration export: user=%s databases=%s",
        currentUser.username,
        dbList,
    )

    try:
        exportData = _exportDatabases(dbList)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Migration export failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed: {e}",
        ) from e

    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M")
    filename = f"migration_backup_{ts}.json"

    # default=str stringifies any value json cannot serialize natively.
    content = json.dumps(exportData, ensure_ascii=False, default=str)
    return StreamingResponse(
        iter([content]),
        media_type="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )


@router.post("/migration/validate")
@limiter.limit("5/minute")
async def postMigrationValidate(
    request: Request,
    file: UploadFile = File(...),
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Validate an uploaded migration JSON file without writing anything."""
    payload = await _readJsonUpload(file)

    result = _validateImportPayload(payload)
    logger.info(
        "SysAdmin migration validate: user=%s valid=%s",
        currentUser.username,
        result.get("valid"),
    )
    return result


@router.post("/migration/import")
@limiter.limit("2/minute")
async def postMigrationImport(
    request: Request,
    file: UploadFile = File(...),
    mode: str = "merge",
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Import a migration JSON file.

    ``mode`` is declared as a plain ``str`` parameter, so FastAPI reads it from
    the query string (``?mode=replace``), not from the multipart form body:
    - ``replace``: clear all tables (except system objects) and insert.
    - ``merge``: insert only records whose ID does not yet exist.

    NOTE(review): an earlier docstring described ``mode`` as a form field. A
    client that sends it in the form body would silently get the default
    ``merge`` -- confirm what the frontend sends.

    Raises:
        HTTPException: 400 for an invalid mode, unparsable file or failed
            payload validation; 500 when the import itself fails.
    """
    if mode not in ("replace", "merge"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid mode: '{mode}'. Must be 'replace' or 'merge'.",
        )

    payload = await _readJsonUpload(file)

    validation = _validateImportPayload(payload)
    if not validation.get("valid"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"message": "Payload validation failed", "warnings": validation.get("warnings", [])},
        )

    logger.info(
        "SysAdmin migration import: user=%s mode=%s",
        currentUser.username,
        mode,
    )

    try:
        result = _importDatabases(payload, mode)
    except Exception as e:
        logger.exception("Migration import failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {e}",
        ) from e

    logger.info(
        "SysAdmin migration import complete: user=%s mode=%s totalRecords=%s warnings=%s",
        currentUser.username,
        mode,
        result.get("totalRecords"),
        len(result.get("warnings", [])),
    )
    return result


# ---------------------------------------------------------------------------
# Per-DB endpoints (progress-friendly)
# ---------------------------------------------------------------------------


@router.get("/migration/export-single")
@limiter.limit("60/minute")
def getMigrationExportSingle(
    request: Request,
    database: str,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Export a single database as JSON (used by the frontend for per-DB progress).

    Raises:
        HTTPException: 400 when ``database`` is not registered, 500 when the
            export fails.
    """
    # Kept as a function-scope import, as in the original module.
    from modules.shared.dbRegistry import getRegisteredDatabases

    if database not in getRegisteredDatabases():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Database '{database}' is not registered.",
        )

    logger.info("SysAdmin migration export-single: user=%s db=%s", currentUser.username, database)

    try:
        dbPayload = _exportSingleDb(database)
    except Exception as e:
        logger.exception("Export-single failed for %s: %s", database, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed for '{database}': {e}",
        ) from e

    return {"database": database, "data": dbPayload}


@router.post("/migration/prepare-import")
@limiter.limit("5/minute")
async def postMigrationPrepareImport(
    request: Request,
    file: UploadFile = File(...),
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Validate + remap system-object IDs and return metadata for per-DB import.

    The payload is stored server-side in memory (returned as opaque token)
    so the frontend can drive per-DB import calls without re-uploading.
    Cached payloads idle for longer than ``_PENDING_IMPORT_TTL_SECONDS`` are
    dropped whenever a new import is prepared.

    NOTE(review): the object cached is the ``payload`` passed to
    ``_prepareImport``; presumably that helper remaps IDs in place -- confirm.

    Raises:
        HTTPException: 400 when the file is not valid JSON or validation fails.
    """
    payload = await _readJsonUpload(file)

    logger.info("SysAdmin migration prepare-import: user=%s", currentUser.username)

    result = _prepareImport(payload)
    if not result.get("valid"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"message": "Payload validation failed", "warnings": result.get("warnings", [])},
        )

    # Evict abandoned uploads before adding another payload to the cache.
    _purgeStalePendingImports()

    token = str(uuid.uuid4())
    _pendingImports[token] = {
        "payload": payload,
        "protectedIds": result["protectedIds"],
        "lastUsed": time.monotonic(),
    }

    return {
        "valid": True,
        "token": token,
        "databases": result["databases"],
        "warnings": result["warnings"],
        "systemObjectsFound": result["systemObjectsFound"],
        "protectedIds": result["protectedIds"],
    }


@router.post("/migration/import-single")
@limiter.limit("60/minute")
def postMigrationImportSingle(
    request: Request,
    body: dict,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Import a single database from a previously prepared payload.

    Body: ``{token, database, mode}``

    Raises:
        HTTPException: 400 for an invalid mode or unknown/expired token,
            500 when the import fails.
    """
    token = body.get("token", "")
    database = body.get("database", "")
    mode = body.get("mode", "merge")

    if mode not in ("replace", "merge"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid mode: '{mode}'.",
        )

    # A non-string token (e.g. a JSON list) is unhashable and would raise
    # TypeError on the dict lookup; treat it as an unknown token instead.
    pending = _pendingImports.get(token) if isinstance(token, str) else None
    if not pending:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired import token. Please re-upload the file.",
        )

    # Mark the entry as active so an import in progress is not evicted.
    pending["lastUsed"] = time.monotonic()

    logger.info("SysAdmin migration import-single: user=%s db=%s mode=%s", currentUser.username, database, mode)

    try:
        result = _importSingleDb(pending["payload"], database, mode, pending["protectedIds"])
    except Exception as e:
        logger.exception("Import-single failed for %s: %s", database, e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed for '{database}': {e}",
        ) from e

    return result


@router.post("/migration/import-done")
@limiter.limit("10/minute")
def postMigrationImportDone(
    request: Request,
    body: dict,
    currentUser: User = Depends(requireSysAdmin),
) -> Dict[str, Any]:
    """Clean up the server-side payload cache after import is complete.

    Unknown or malformed tokens are ignored; the response is always
    ``{"ok": True}``.
    """
    token = body.get("token", "")
    if isinstance(token, str):
        _pendingImports.pop(token, None)
    return {"ok": True}