# Copyright (c) 2026 PowerOn AG # All rights reserved. """ Admin endpoints for session and trusted device management. Allows mandate-admins and platform-admins to view and revoke active sessions and trusted devices for users under their jurisdiction. """ from fastapi import APIRouter, HTTPException, status, Depends, Request, Query from typing import Dict, Any, List import logging from modules.auth import limiter, getCurrentUser from modules.datamodels.datamodelUam import User from modules.datamodels.datamodelSecurity import Token, TokenPurpose, TokenStatus, TrustedDevice from modules.interfaces.interfaceDbApp import getRootInterface from modules.shared.timeUtils import getUtcTimestamp from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeAdminSessions") logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/admin/sessions", tags=["Admin Sessions"], responses={404: {"description": "Not found"}}, ) def _requireAdmin(currentUser: User) -> None: """Ensure the caller is a platform admin or sysAdmin.""" if not (getattr(currentUser, "isPlatformAdmin", False) or getattr(currentUser, "isSysAdmin", False)): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=routeApiMsg("Only platform admins can manage sessions"), ) @router.get("") @limiter.limit("30/minute") def listSessions( request: Request, userId: str = Query(..., description="User ID whose sessions to list"), currentUser: User = Depends(getCurrentUser), ) -> List[Dict[str, Any]]: """List active auth sessions for a user.""" _requireAdmin(currentUser) rootInterface = getRootInterface() tokens = rootInterface.db.getRecordset( Token, recordFilter={ "userId": userId, "tokenPurpose": TokenPurpose.AUTH_SESSION.value, "status": TokenStatus.ACTIVE.value, }, ) now = getUtcTimestamp() result = [] for t in tokens: expiresAt = t.get("expiresAt", 0) if expiresAt < now: continue result.append({ "sessionId": t.get("sessionId"), "tokenId": t.get("id"), "authority": t.get("authority"), "createdAt": t.get("sysCreatedAt"), "expiresAt": expiresAt, }) return result @router.delete("/{sessionId}") @limiter.limit("30/minute") def revokeSession( request: Request, sessionId: str, currentUser: User = Depends(getCurrentUser), ) -> Dict[str, Any]: """Revoke a single session by sessionId (sets status=REVOKED, not delete).""" _requireAdmin(currentUser) rootInterface = getRootInterface() adminId = str(currentUser.id) tokens = rootInterface.db.getRecordset( Token, recordFilter={ "sessionId": sessionId, "tokenPurpose": TokenPurpose.AUTH_SESSION.value, "status": TokenStatus.ACTIVE.value, }, ) count = 0 for t in tokens: rootInterface.revokeTokenById(t["id"], revokedBy=adminId, reason="admin session revoke") count += 1 if count == 0: raise HTTPException(status_code=404, detail=routeApiMsg("Session not found")) logger.info("Admin %s revoked session %s (%d token(s))", currentUser.username, sessionId, count) return {"revoked": count, "sessionId": sessionId} @router.delete("") @limiter.limit("10/minute") def revokeAllSessions( request: Request, userId: str = Query(..., description="User ID whose sessions to revoke"), currentUser: User = Depends(getCurrentUser), ) -> Dict[str, Any]: """Revoke ALL active sessions for a user (force logout everywhere).""" _requireAdmin(currentUser) rootInterface = getRootInterface() adminId = str(currentUser.id) count = rootInterface.revokeTokensByUser( userId, revokedBy=adminId, reason="admin revoke all sessions", ) logger.info("Admin %s revoked all sessions for userId=%s (%d token(s))", currentUser.username, userId, count) return {"revoked": count, "userId": userId} # --- Trusted Devices --- trustedDeviceRouter = APIRouter( prefix="/api/admin/trusted-devices", tags=["Admin Sessions"], responses={404: {"description": "Not found"}}, ) @trustedDeviceRouter.get("") @limiter.limit("30/minute") def listTrustedDevices( request: Request, userId: str = Query(..., description="User ID whose trusted devices to list"), currentUser: User = Depends(getCurrentUser), ) -> List[Dict[str, Any]]: """List trusted devices for a user.""" _requireAdmin(currentUser) rootInterface = getRootInterface() devices = rootInterface.db.getRecordset( TrustedDevice, recordFilter={"userId": userId} ) now = getUtcTimestamp() result = [] for d in devices: result.append({ "id": d.get("id", ""), "trustedUntil": d.get("trustedUntil"), "isExpired": d.get("trustedUntil", 0) < now, "userAgent": d.get("userAgent"), "ipAddress": d.get("ipAddress"), "createdAt": d.get("createdAt"), }) return result @trustedDeviceRouter.delete("/{deviceId}") @limiter.limit("30/minute") def revokeTrustedDevice( request: Request, deviceId: str, currentUser: User = Depends(getCurrentUser), ) -> Dict[str, Any]: """Revoke a single trusted device by ID.""" _requireAdmin(currentUser) rootInterface = getRootInterface() existing = rootInterface.db.getRecord(TrustedDevice, deviceId) if not existing: raise HTTPException(status_code=404, detail=routeApiMsg("Trusted device not found")) rootInterface.db.recordDelete(TrustedDevice, deviceId) logger.info("Admin %s revoked trusted device %s", currentUser.username, deviceId) return {"revoked": 1, "deviceId": deviceId} @trustedDeviceRouter.delete("") @limiter.limit("10/minute") def revokeAllTrustedDevices( request: Request, userId: str = Query(..., description="User ID whose trusted devices to revoke"), currentUser: User = Depends(getCurrentUser), ) -> Dict[str, Any]: """Revoke ALL trusted devices for a user (force MFA on next login).""" _requireAdmin(currentUser) rootInterface = getRootInterface() from modules.auth.trustedDeviceService import revokeTrustedDevices count = revokeTrustedDevices(userId, rootInterface.db) logger.info("Admin %s revoked all trusted devices for userId=%s (%d)", currentUser.username, userId, count) return {"revoked": count, "userId": userId}