platform-core/modules/routes/routeAdminSessions.py

203 lines
6.4 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
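Admin endpoints for session and trusted device management.
Allows platform admins and sysAdmins to view and revoke active sessions
and trusted devices for a given user (selected by userId).
NOTE: mandate-admin access scoped to users under their jurisdiction is not
implemented in this module; every endpoint is gated by _requireAdmin.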
"""
from fastapi import APIRouter, HTTPException, status, Depends, Request, Query
from typing import Dict, Any, List
import logging
from modules.auth import limiter, getCurrentUser
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelSecurity import Token, TokenPurpose, TokenStatus, TrustedDevice
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.i18nRegistry import apiRouteContext
# Helper used below to wrap every user-facing error detail string
# (presumably an i18n lookup scoped to this route module — confirm in i18nRegistry).
routeApiMsg = apiRouteContext("routeAdminSessions")
# Module-level logger; the revoke endpoints write an info line per admin action.
logger = logging.getLogger(__name__)
# Router for the session endpoints; all paths below are relative to this prefix.
router = APIRouter(
prefix="/api/admin/sessions",
tags=["Admin Sessions"],
responses={404: {"description": "Not found"}},
)
def _requireAdmin(currentUser: User) -> None:
    """Reject callers that are neither platform admin nor sysAdmin.

    Args:
        currentUser: The authenticated caller. A missing ``isPlatformAdmin``
            or ``isSysAdmin`` attribute is treated as False.

    Raises:
        HTTPException: 403 when neither flag is truthy.
    """
    # Checked in this order so the second flag is only read when the first is falsy.
    if getattr(currentUser, "isPlatformAdmin", False):
        return
    if getattr(currentUser, "isSysAdmin", False):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=routeApiMsg("Only platform admins can manage sessions"),
    )
@router.get("")
@limiter.limit("30/minute")
def listSessions(
    request: Request,
    userId: str = Query(..., description="User ID whose sessions to list"),
    currentUser: User = Depends(getCurrentUser),
) -> List[Dict[str, Any]]:
    """List active auth sessions for a user.

    Args:
        request: Incoming request; unused in the body (presumably required
            by the limiter decorator — confirm).
        userId: ID of the user whose sessions are listed (query parameter).
        currentUser: Authenticated caller, resolved via getCurrentUser.

    Returns:
        One dict per ACTIVE auth-session token that has not yet expired,
        with the keys sessionId, tokenId, authority, createdAt and expiresAt.

    Raises:
        HTTPException: 403 if the caller is neither platform admin nor sysAdmin.
    """
    _requireAdmin(currentUser)
    rootInterface = getRootInterface()
    tokens = rootInterface.db.getRecordset(
        Token,
        recordFilter={
            "userId": userId,
            "tokenPurpose": TokenPurpose.AUTH_SESSION.value,
            "status": TokenStatus.ACTIVE.value,
        },
    )
    now = getUtcTimestamp()
    result = []
    for t in tokens:
        # dict.get's default only covers a missing key; a record carrying
        # expiresAt=None would make `None < now` raise TypeError. Coalesce
        # to 0 so such a token is skipped exactly like a missing value.
        expiresAt = t.get("expiresAt") or 0
        if expiresAt < now:
            continue
        result.append({
            "sessionId": t.get("sessionId"),
            "tokenId": t.get("id"),
            "authority": t.get("authority"),
            "createdAt": t.get("sysCreatedAt"),
            "expiresAt": expiresAt,
        })
    return result
@router.delete("/{sessionId}")
@limiter.limit("30/minute")
def revokeSession(
    request: Request,
    sessionId: str,
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Revoke every active auth-session token that belongs to one sessionId.

    Tokens are revoked through ``revokeTokenById`` rather than removed here.

    Args:
        request: Incoming request; unused in the body.
        sessionId: Session identifier taken from the URL path.
        currentUser: Authenticated caller, resolved via getCurrentUser.

    Returns:
        ``{"revoked": <number of tokens revoked>, "sessionId": sessionId}``.

    Raises:
        HTTPException: 403 for non-admin callers, 404 when no ACTIVE
            auth-session token matches the sessionId.
    """
    _requireAdmin(currentUser)
    appInterface = getRootInterface()
    revokedBy = str(currentUser.id)
    sessionFilter = {
        "sessionId": sessionId,
        "tokenPurpose": TokenPurpose.AUTH_SESSION.value,
        "status": TokenStatus.ACTIVE.value,
    }
    activeTokens = appInterface.db.getRecordset(Token, recordFilter=sessionFilter)

    # enumerate(start=1) leaves the running total in revokedCount; it stays 0
    # when the recordset is empty.
    revokedCount = 0
    for revokedCount, tokenRow in enumerate(activeTokens, start=1):
        appInterface.revokeTokenById(
            tokenRow["id"], revokedBy=revokedBy, reason="admin session revoke"
        )

    if revokedCount == 0:
        raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))

    logger.info(
        "Admin %s revoked session %s (%d token(s))",
        currentUser.username,
        sessionId,
        revokedCount,
    )
    return {"revoked": revokedCount, "sessionId": sessionId}
@router.delete("")
@limiter.limit("10/minute")
def revokeAllSessions(
    request: Request,
    userId: str = Query(..., description="User ID whose sessions to revoke"),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Force-logout a user by revoking their tokens in one call.

    Delegates to ``revokeTokensByUser`` on the root interface; which tokens
    that covers is decided there, not in this route.

    Args:
        request: Incoming request; unused in the body.
        userId: ID of the user whose sessions are revoked (query parameter).
        currentUser: Authenticated caller, recorded as ``revokedBy``.

    Returns:
        ``{"revoked": <value returned by revokeTokensByUser>, "userId": userId}``.

    Raises:
        HTTPException: 403 if the caller is neither platform admin nor sysAdmin.
    """
    _requireAdmin(currentUser)
    appInterface = getRootInterface()
    revokedBy = str(currentUser.id)
    revokedCount = appInterface.revokeTokensByUser(
        userId,
        revokedBy=revokedBy,
        reason="admin revoke all sessions",
    )
    logger.info(
        "Admin %s revoked all sessions for userId=%s (%d token(s))",
        currentUser.username,
        userId,
        revokedCount,
    )
    return {"revoked": revokedCount, "userId": userId}
# --- Trusted Devices ---
# Second router for the trusted-device endpoints; it has its own prefix but
# shares the "Admin Sessions" tag with the session router.
trustedDeviceRouter = APIRouter(
prefix="/api/admin/trusted-devices",
tags=["Admin Sessions"],
responses={404: {"description": "Not found"}},
)
@trustedDeviceRouter.get("")
@limiter.limit("30/minute")
def listTrustedDevices(
    request: Request,
    userId: str = Query(..., description="User ID whose trusted devices to list"),
    currentUser: User = Depends(getCurrentUser),
) -> List[Dict[str, Any]]:
    """List trusted devices for a user.

    Expired devices are included and flagged via ``isExpired`` rather than
    filtered out.

    Args:
        request: Incoming request; unused in the body (presumably required
            by the limiter decorator — confirm).
        userId: ID of the user whose devices are listed (query parameter).
        currentUser: Authenticated caller, resolved via getCurrentUser.

    Returns:
        One dict per TrustedDevice record with the keys id, trustedUntil,
        isExpired, userAgent, ipAddress and createdAt.

    Raises:
        HTTPException: 403 if the caller is neither platform admin nor sysAdmin.
    """
    _requireAdmin(currentUser)
    rootInterface = getRootInterface()
    devices = rootInterface.db.getRecordset(
        TrustedDevice, recordFilter={"userId": userId}
    )
    now = getUtcTimestamp()
    result = []
    for d in devices:
        trustedUntil = d.get("trustedUntil")
        result.append({
            "id": d.get("id", ""),
            "trustedUntil": trustedUntil,
            # dict.get's default only covers a missing key; a record carrying
            # trustedUntil=None would make `None < now` raise TypeError.
            # Coalesce to 0 so such a device is reported as expired.
            "isExpired": (trustedUntil or 0) < now,
            "userAgent": d.get("userAgent"),
            "ipAddress": d.get("ipAddress"),
            "createdAt": d.get("createdAt"),
        })
    return result
@trustedDeviceRouter.delete("/{deviceId}")
@limiter.limit("30/minute")
def revokeTrustedDevice(
    request: Request,
    deviceId: str,
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Delete one TrustedDevice record identified by its ID.

    Args:
        request: Incoming request; unused in the body.
        deviceId: Trusted device ID taken from the URL path.
        currentUser: Authenticated caller, resolved via getCurrentUser.

    Returns:
        ``{"revoked": 1, "deviceId": deviceId}`` after the record is deleted.

    Raises:
        HTTPException: 403 for non-admin callers, 404 when no record with
            that ID is found.
    """
    _requireAdmin(currentUser)
    appInterface = getRootInterface()

    # Look the record up first so an unknown ID yields a 404 instead of a
    # silent no-op delete.
    deviceRecord = appInterface.db.getRecord(TrustedDevice, deviceId)
    if not deviceRecord:
        raise HTTPException(
            status_code=404,
            detail=routeApiMsg("Trusted device not found"),
        )

    appInterface.db.recordDelete(TrustedDevice, deviceId)
    logger.info(
        "Admin %s revoked trusted device %s", currentUser.username, deviceId
    )
    return {"revoked": 1, "deviceId": deviceId}
@trustedDeviceRouter.delete("")
@limiter.limit("10/minute")
def revokeAllTrustedDevices(
    request: Request,
    userId: str = Query(..., description="User ID whose trusted devices to revoke"),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Remove every trusted device of a user so MFA is required on next login.

    The actual removal is delegated to
    ``modules.auth.trustedDeviceService.revokeTrustedDevices``.

    Args:
        request: Incoming request; unused in the body.
        userId: ID of the user whose devices are revoked (query parameter).
        currentUser: Authenticated caller, resolved via getCurrentUser.

    Returns:
        ``{"revoked": <value returned by revokeTrustedDevices>, "userId": userId}``.

    Raises:
        HTTPException: 403 if the caller is neither platform admin nor sysAdmin.
    """
    _requireAdmin(currentUser)
    appInterface = getRootInterface()
    # Imported at call time, as in the original (presumably to avoid an
    # import cycle with modules.auth — confirm before hoisting).
    from modules.auth.trustedDeviceService import revokeTrustedDevices

    revokedCount = revokeTrustedDevices(userId, appInterface.db)
    logger.info(
        "Admin %s revoked all trusted devices for userId=%s (%d)",
        currentUser.username,
        userId,
        revokedCount,
    )
    return {"revoked": revokedCount, "userId": userId}