185 lines
5.7 KiB
Python
185 lines
5.7 KiB
Python
# Copyright (c) 2026 PowerOn AG
|
|
# All rights reserved.
|
|
"""
|
|
Admin endpoints for session and trusted device management.
|
|
|
|
Allows mandate-admins and platform-admins to view and revoke active sessions
|
|
and trusted devices for users under their jurisdiction.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, status, Depends, Request, Query
|
|
from typing import Dict, Any, List
|
|
import logging
|
|
|
|
from modules.auth import limiter, getCurrentUser
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelSecurity import Token, TokenPurpose, TokenStatus, TrustedDevice
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
|
|
routeApiMsg = apiRouteContext("routeAdminSessions")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(
|
|
prefix="/api/admin/sessions",
|
|
tags=["Admin Sessions"],
|
|
responses={404: {"description": "Not found"}},
|
|
)
|
|
|
|
|
|
def _requireAdmin(currentUser: User) -> None:
|
|
"""Ensure the caller is a platform admin or sysAdmin."""
|
|
if not (getattr(currentUser, "isPlatformAdmin", False) or getattr(currentUser, "isSysAdmin", False)):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=routeApiMsg("Only platform admins can manage sessions"),
|
|
)
|
|
|
|
|
|
@router.get("")
|
|
@limiter.limit("30/minute")
|
|
def listSessions(
|
|
request: Request,
|
|
userId: str = Query(..., description="User ID whose sessions to list"),
|
|
currentUser: User = Depends(getCurrentUser),
|
|
) -> List[Dict[str, Any]]:
|
|
"""List active auth sessions for a user."""
|
|
_requireAdmin(currentUser)
|
|
rootInterface = getRootInterface()
|
|
|
|
tokens = rootInterface.db.getRecordset(
|
|
Token,
|
|
recordFilter={
|
|
"userId": userId,
|
|
"tokenPurpose": TokenPurpose.AUTH_SESSION.value,
|
|
"status": TokenStatus.ACTIVE.value,
|
|
},
|
|
)
|
|
|
|
now = getUtcTimestamp()
|
|
result = []
|
|
for t in tokens:
|
|
expiresAt = t.get("expiresAt", 0)
|
|
if expiresAt < now:
|
|
continue
|
|
result.append({
|
|
"sessionId": t.get("sessionId"),
|
|
"tokenId": t.get("id"),
|
|
"authority": t.get("authority"),
|
|
"createdAt": t.get("sysCreatedAt"),
|
|
"expiresAt": expiresAt,
|
|
})
|
|
|
|
return result
|
|
|
|
|
|
@router.delete("/{sessionId}")
|
|
@limiter.limit("30/minute")
|
|
def revokeSession(
|
|
request: Request,
|
|
sessionId: str,
|
|
currentUser: User = Depends(getCurrentUser),
|
|
) -> Dict[str, Any]:
|
|
"""Revoke a single session by sessionId."""
|
|
_requireAdmin(currentUser)
|
|
rootInterface = getRootInterface()
|
|
|
|
tokens = rootInterface.db.getRecordset(
|
|
Token,
|
|
recordFilter={"sessionId": sessionId, "tokenPurpose": TokenPurpose.AUTH_SESSION.value},
|
|
)
|
|
count = 0
|
|
for t in tokens:
|
|
rootInterface.db.recordDelete(Token, t["id"])
|
|
count += 1
|
|
|
|
if count == 0:
|
|
raise HTTPException(status_code=404, detail=routeApiMsg("Session not found"))
|
|
|
|
logger.info(f"Admin {currentUser.username} revoked session {sessionId} ({count} token(s))")
|
|
return {"revoked": count, "sessionId": sessionId}
|
|
|
|
|
|
@router.delete("")
|
|
@limiter.limit("10/minute")
|
|
def revokeAllSessions(
|
|
request: Request,
|
|
userId: str = Query(..., description="User ID whose sessions to revoke"),
|
|
currentUser: User = Depends(getCurrentUser),
|
|
) -> Dict[str, Any]:
|
|
"""Revoke ALL active sessions for a user (force logout everywhere)."""
|
|
_requireAdmin(currentUser)
|
|
rootInterface = getRootInterface()
|
|
|
|
tokens = rootInterface.db.getRecordset(
|
|
Token,
|
|
recordFilter={
|
|
"userId": userId,
|
|
"tokenPurpose": TokenPurpose.AUTH_SESSION.value,
|
|
},
|
|
)
|
|
count = 0
|
|
for t in tokens:
|
|
rootInterface.db.recordDelete(Token, t["id"])
|
|
count += 1
|
|
|
|
logger.info(f"Admin {currentUser.username} revoked all sessions for userId={userId} ({count} token(s))")
|
|
return {"revoked": count, "userId": userId}
|
|
|
|
|
|
# --- Trusted Devices ---
|
|
|
|
trustedDeviceRouter = APIRouter(
|
|
prefix="/api/admin/trusted-devices",
|
|
tags=["Admin Sessions"],
|
|
responses={404: {"description": "Not found"}},
|
|
)
|
|
|
|
|
|
@trustedDeviceRouter.get("")
|
|
@limiter.limit("30/minute")
|
|
def listTrustedDevices(
|
|
request: Request,
|
|
userId: str = Query(..., description="User ID whose trusted devices to list"),
|
|
currentUser: User = Depends(getCurrentUser),
|
|
) -> List[Dict[str, Any]]:
|
|
"""List trusted devices for a user."""
|
|
_requireAdmin(currentUser)
|
|
rootInterface = getRootInterface()
|
|
|
|
devices = rootInterface.db.getRecordset(
|
|
TrustedDevice, recordFilter={"userId": userId}
|
|
)
|
|
|
|
now = getUtcTimestamp()
|
|
result = []
|
|
for d in devices:
|
|
result.append({
|
|
"id": d.get("id", "")[:8] + "...",
|
|
"trustedUntil": d.get("trustedUntil"),
|
|
"isExpired": d.get("trustedUntil", 0) < now,
|
|
"userAgent": d.get("userAgent"),
|
|
"ipAddress": d.get("ipAddress"),
|
|
"createdAt": d.get("createdAt"),
|
|
})
|
|
|
|
return result
|
|
|
|
|
|
@trustedDeviceRouter.delete("")
|
|
@limiter.limit("10/minute")
|
|
def revokeAllTrustedDevices(
|
|
request: Request,
|
|
userId: str = Query(..., description="User ID whose trusted devices to revoke"),
|
|
currentUser: User = Depends(getCurrentUser),
|
|
) -> Dict[str, Any]:
|
|
"""Revoke ALL trusted devices for a user (force MFA on next login)."""
|
|
_requireAdmin(currentUser)
|
|
rootInterface = getRootInterface()
|
|
|
|
from modules.auth.trustedDeviceService import revokeTrustedDevices
|
|
count = revokeTrustedDevices(userId, rootInterface.db)
|
|
|
|
logger.info(f"Admin {currentUser.username} revoked all trusted devices for userId={userId} ({count})")
|
|
return {"revoked": count, "userId": userId}
|