gateway/modules/routes/routeSecurityAdmin.py
2025-12-15 21:55:26 +01:00

460 lines
17 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from fastapi import APIRouter, HTTPException, Depends, status, Request, Body
from fastapi.responses import FileResponse, JSONResponse
from typing import Optional, Dict, Any, List
import os
import logging
from modules.auth import getCurrentUser, limiter
from modules.interfaces.interfaceDbAppObjects import getInterface, getRootInterface
from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority
from modules.datamodels.datamodelSecurity import Token
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/admin",
tags=["Security Administration"],
responses={
404: {"description": "Not found"},
400: {"description": "Bad request"},
401: {"description": "Unauthorized"},
403: {"description": "Forbidden"},
500: {"description": "Internal server error"}
}
)
def _ensure_admin_scope(current_user: User, target_mandate_id: Optional[str] = None) -> None:
roleLabels = current_user.roleLabels or []
if "admin" not in roleLabels and "sysadmin" not in roleLabels:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin privileges required")
if "admin" in roleLabels and "sysadmin" not in roleLabels:
if target_mandate_id and str(target_mandate_id) != str(current_user.mandateId):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden for target mandate")
# ----------------------
# Token listing and revocation
# ----------------------
@router.get("/tokens")
@limiter.limit("30/minute")
async def list_tokens(
request: Request,
currentUser: User = Depends(getCurrentUser),
userId: Optional[str] = None,
authority: Optional[str] = None,
sessionId: Optional[str] = None,
statusFilter: Optional[str] = None,
connectionId: Optional[str] = None,
) -> List[Dict[str, Any]]:
try:
appInterface = getRootInterface()
target_mandate = currentUser.mandateId
_ensure_admin_scope(currentUser, target_mandate)
recordFilter: Dict[str, Any] = {}
if userId:
recordFilter["userId"] = userId
if authority:
recordFilter["authority"] = authority
if sessionId:
recordFilter["sessionId"] = sessionId
if connectionId:
recordFilter["connectionId"] = connectionId
if statusFilter:
recordFilter["status"] = statusFilter
roleLabels = currentUser.roleLabels or []
if "admin" in roleLabels and "sysadmin" not in roleLabels:
recordFilter["mandateId"] = str(currentUser.mandateId)
tokens = appInterface.db.getRecordset(Token, recordFilter=recordFilter)
return tokens
except HTTPException:
raise
except Exception as e:
logger.error(f"Error listing tokens: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to list tokens")
@router.post("/tokens/revoke/user")
@limiter.limit("30/minute")
async def revoke_tokens_by_user(
request: Request,
currentUser: User = Depends(getCurrentUser),
payload: Dict[str, Any] = Body(...)
) -> Dict[str, Any]:
try:
userId = payload.get("userId")
authority = payload.get("authority")
reason = payload.get("reason", "admin revoke")
if not userId:
raise HTTPException(status_code=400, detail="userId is required")
appInterface = getRootInterface()
# Tenant scope check
target_user = appInterface.db.getRecordset(User, recordFilter={"id": userId})
target_mandate = target_user[0].get("mandateId") if target_user else None
_ensure_admin_scope(currentUser, target_mandate)
roleLabels = currentUser.roleLabels or []
count = appInterface.revokeTokensByUser(
userId=userId,
authority=AuthAuthority(authority) if authority else None,
mandateId=None if "sysadmin" in roleLabels else str(currentUser.mandateId),
revokedBy=currentUser.id,
reason=reason
)
return {"revoked": count}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error revoking tokens by user: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to revoke tokens")
@router.post("/tokens/revoke/session")
@limiter.limit("30/minute")
async def revoke_tokens_by_session(
request: Request,
currentUser: User = Depends(getCurrentUser),
payload: Dict[str, Any] = Body(...)
) -> Dict[str, Any]:
try:
userId = payload.get("userId")
sessionId = payload.get("sessionId")
authority = payload.get("authority", "local")
reason = payload.get("reason", "admin session revoke")
if not userId or not sessionId:
raise HTTPException(status_code=400, detail="userId and sessionId are required")
appInterface = getRootInterface()
target_user = appInterface.db.getRecordset(User, recordFilter={"id": userId})
target_mandate = target_user[0].get("mandateId") if target_user else None
_ensure_admin_scope(currentUser, target_mandate)
count = appInterface.revokeTokensBySessionId(
sessionId=sessionId,
userId=userId,
authority=AuthAuthority(authority),
revokedBy=currentUser.id,
reason=reason
)
return {"revoked": count}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error revoking tokens by session: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to revoke session tokens")
@router.post("/tokens/revoke/id")
@limiter.limit("30/minute")
async def revoke_token_by_id(
request: Request,
currentUser: User = Depends(getCurrentUser),
payload: Dict[str, Any] = Body(...)
) -> Dict[str, Any]:
try:
tokenId = payload.get("tokenId")
reason = payload.get("reason", "admin revoke")
if not tokenId:
raise HTTPException(status_code=400, detail="tokenId is required")
appInterface = getRootInterface()
# Load token to check tenant scope for admins
tokens = appInterface.db.getRecordset(Token, recordFilter={"id": tokenId})
if not tokens:
return {"revoked": 0}
target_mandate = tokens[0].get("mandateId")
_ensure_admin_scope(currentUser, target_mandate)
ok = appInterface.revokeTokenById(tokenId, revokedBy=currentUser.id, reason=reason)
return {"revoked": 1 if ok else 0}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error revoking token by id: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to revoke token")
@router.post("/tokens/revoke/mandate")
@limiter.limit("10/minute")
async def revoke_tokens_by_mandate(
request: Request,
currentUser: User = Depends(getCurrentUser),
payload: Dict[str, Any] = Body(...)
) -> Dict[str, Any]:
try:
mandateId = payload.get("mandateId")
authority = payload.get("authority", "local")
reason = payload.get("reason", "admin mandate revoke")
if not mandateId:
raise HTTPException(status_code=400, detail="mandateId is required")
_ensure_admin_scope(currentUser, mandateId)
# Revoke for all users in mandate
appInterface = getRootInterface()
# IMPORTANT: user rows are stored as UserInDB in the database
users = appInterface.db.getRecordset(UserInDB, recordFilter={"mandateId": mandateId})
total = 0
for u in users:
# Revoke regardless of token.mandateId to also catch legacy tokens without mandateId
total += appInterface.revokeTokensByUser(
userId=u["id"],
authority=AuthAuthority(authority),
mandateId=None,
revokedBy=currentUser.id,
reason=reason
)
return {"revoked": total}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error revoking tokens by mandate: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to revoke mandate tokens")
# ----------------------
# Logs download
# ----------------------
@router.get("/logs/{log_name}")
@limiter.limit("60/minute")
async def download_log(
request: Request,
currentUser: User = Depends(getCurrentUser),
log_name: str = "poweron"
):
_ensure_admin_scope(currentUser)
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# base_dir -> gateway
if log_name == "poweron":
file_path = os.path.join(base_dir, "poweron.log")
elif log_name == "audit":
file_path = os.path.join(base_dir, "audit.log")
else:
raise HTTPException(status_code=400, detail="Unsupported log name")
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail=f"{log_name}.log not found")
return FileResponse(path=file_path, filename=f"{log_name}.log")
# ----------------------
# Database admin
# ----------------------
@router.get("/databases")
@limiter.limit("10/minute")
async def list_databases(
request: Request,
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
_ensure_admin_scope(currentUser)
# Get database names from configuration for each interface
databases = []
# App database (interfaceDbAppObjects.py)
app_db = APP_CONFIG.get("DB_APP_DATABASE")
if app_db:
databases.append(app_db)
# Chat database (interfaceDbChatObjects.py)
chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
if chat_db:
databases.append(chat_db)
# Management database (interfaceDbComponentObjects.py)
management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
if management_db:
databases.append(management_db)
# Fallback to default if no databases configured
if not databases:
databases = ["poweron"]
return {"databases": databases}
@router.get("/databases/{database_name}/tables")
@limiter.limit("30/minute")
async def get_database_tables(
request: Request,
database_name: str,
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
_ensure_admin_scope(currentUser)
# Get all configured database names
configured_dbs = []
app_db = APP_CONFIG.get("DB_APP_DATABASE")
if app_db:
configured_dbs.append(app_db)
chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
if chat_db:
configured_dbs.append(chat_db)
management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
if management_db:
configured_dbs.append(management_db)
if not configured_dbs:
configured_dbs = ["poweron"]
if database_name not in configured_dbs:
raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}")
try:
# Use the appropriate interface based on database name
if database_name == app_db:
appInterface = getRootInterface()
tables = appInterface.db.getTables()
elif database_name == chat_db:
from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
chatInterface = getChatInterface(currentUser)
tables = chatInterface.db.getTables()
elif database_name == management_db:
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
componentInterface = getComponentInterface(currentUser)
tables = componentInterface.db.getTables()
else:
raise HTTPException(status_code=400, detail="Database not found")
return {"tables": tables}
except Exception as e:
logger.error(f"Error getting database tables: {str(e)}")
raise HTTPException(status_code=500, detail="Failed to get database tables")
@router.post("/databases/{database_name}/tables/{table_name}/drop")
@limiter.limit("10/minute")
async def drop_table(
request: Request,
database_name: str,
table_name: str,
currentUser: User = Depends(getCurrentUser),
payload: Dict[str, Any] = Body(...)
) -> Dict[str, Any]:
_ensure_admin_scope(currentUser)
# Get all configured database names
configured_dbs = []
app_db = APP_CONFIG.get("DB_APP_DATABASE")
if app_db:
configured_dbs.append(app_db)
chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
if chat_db:
configured_dbs.append(chat_db)
management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
if management_db:
configured_dbs.append(management_db)
if not configured_dbs:
configured_dbs = ["poweron"]
if database_name not in configured_dbs:
raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}")
try:
# Use the appropriate interface based on database name
if database_name == app_db:
interface = getRootInterface()
elif database_name == chat_db:
from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
interface = getChatInterface(currentUser)
elif database_name == management_db:
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
interface = getComponentInterface(currentUser)
else:
raise HTTPException(status_code=400, detail="Database not found")
conn = interface.db.connection
with conn.cursor() as cursor:
# Check if table exists
cursor.execute("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = %s
""", (table_name,))
if not cursor.fetchone():
raise HTTPException(status_code=404, detail="Table not found")
# Drop the table
cursor.execute(f'DROP TABLE IF EXISTS "{table_name}" CASCADE')
conn.commit()
logger.warning(f"Admin drop_table executed by {currentUser.id}: dropped table '{table_name}' from database '{database_name}'")
return {"message": f"Table '{table_name}' dropped successfully from database '{database_name}'"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error dropping table: {str(e)}")
if 'interface' in locals() and interface and interface.db and interface.db.connection:
interface.db.connection.rollback()
raise HTTPException(status_code=500, detail="Failed to drop table")
@router.post("/databases/drop")
@limiter.limit("5/minute")
async def drop_database(
request: Request,
currentUser: User = Depends(getCurrentUser),
payload: Dict[str, Any] = Body(...)
) -> Dict[str, Any]:
_ensure_admin_scope(currentUser)
db_name = payload.get("database")
# Get all configured database names
configured_dbs = []
app_db = APP_CONFIG.get("DB_APP_DATABASE")
if app_db:
configured_dbs.append(app_db)
chat_db = APP_CONFIG.get("DB_CHAT_DATABASE")
if chat_db:
configured_dbs.append(chat_db)
management_db = APP_CONFIG.get("DB_MANAGEMENT_DATABASE")
if management_db:
configured_dbs.append(management_db)
if not configured_dbs:
configured_dbs = ["poweron"]
if not db_name or db_name not in configured_dbs:
raise HTTPException(status_code=400, detail=f"Invalid database name. Available databases: {configured_dbs}")
try:
# Use the appropriate interface based on database name
if db_name == app_db:
interface = getRootInterface()
elif db_name == chat_db:
from modules.interfaces.interfaceDbChatObjects import getInterface as getChatInterface
interface = getChatInterface(currentUser)
elif db_name == management_db:
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
interface = getComponentInterface(currentUser)
else:
raise HTTPException(status_code=400, detail="Database not found")
conn = interface.db.connection
with conn.cursor() as cursor:
# Drop all user tables (public schema) except system table
cursor.execute("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
""")
tables = [row['table_name'] for row in cursor.fetchall()]
dropped = []
for tbl in tables:
cursor.execute(f'DROP TABLE IF EXISTS "{tbl}" CASCADE')
dropped.append(tbl)
conn.commit()
logger.warning(f"Admin drop_database executed by {currentUser.id}: dropped tables from '{db_name}': {dropped}")
return {"droppedTables": dropped}
except Exception as e:
logger.error(f"Error dropping database tables: {str(e)}")
if 'interface' in locals() and interface and interface.db and interface.db.connection:
interface.db.connection.rollback()
raise HTTPException(status_code=500, detail="Failed to drop database tables")