# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Routes for TOTP-based Multi-Factor Authentication.

Endpoints:
    GET  /api/mfa/status                - MFA status for the current user
    POST /api/mfa/setup                 - Start MFA enrolment (returns provisioningUri)
    POST /api/mfa/confirm               - Confirm enrolment with first TOTP code (mfa_pending token)
    POST /api/mfa/confirm-authenticated - Confirm enrolment for an authenticated user
    POST /api/mfa/verify                - Verify TOTP during login (uses mfa_pending temp token)
    POST /api/mfa/disable               - Disable MFA for current user
"""

from datetime import timedelta
from typing import Dict, Any
from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body
from jose import jwt, JWTError
import logging
import uuid

from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth import (
    createAccessToken,
    createRefreshToken,
    setAccessTokenCookie,
    setRefreshTokenCookie,
)
from modules.auth.mfaService import (
    generateSetup,
    confirmSetup,
    verifyCode,
    isMfaRequired,
)
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, Mandate
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.shared.configuration import APP_CONFIG
from modules.shared.i18nRegistry import apiRouteContext
from datetime import datetime

routeApiMsg = apiRouteContext("routeMfa")
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/mfa", tags=["MFA"])

# Lifetime of the temporary token handed out between password check and TOTP check.
_MFA_PENDING_EXPIRE_MINUTES = 5


def _createMfaPendingToken(userId: str, username: str, authority: str, sessionId: str) -> str:
    """Short-lived JWT that authorises only the /mfa/verify endpoint.

    The token carries ``type == "mfa_pending"`` and a fresh ``jti``; it is
    created via ``createAccessToken`` with an expiry of
    ``_MFA_PENDING_EXPIRE_MINUTES`` minutes.
    """
    payload = {
        "sub": username,
        "userId": userId,
        "authenticationAuthority": authority,
        "sid": sessionId,
        "type": "mfa_pending",
        "jti": str(uuid.uuid4()),
    }
    token, _ = createAccessToken(
        payload,
        expiresDelta=timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES),
    )
    return token


def _decodeMfaPendingToken(token: str) -> dict:
    """Decode and validate an mfa_pending token.

    Returns the decoded claims. The claims are guaranteed to contain
    non-empty ``sub`` and ``userId`` entries.

    Raises HTTPException (401) when the token cannot be decoded, is not of
    type ``mfa_pending``, or lacks the ``sub`` / ``userId`` claims.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired MFA token",
        ) from err

    if payload.get("type") != "mfa_pending":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type",
        )

    # Callers index these claims directly; reject a token without them here
    # instead of letting a KeyError surface as an HTTP 500.
    if not payload.get("sub") or not payload.get("userId"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired MFA token",
        )
    return payload


def _authorityToStr(authority: Any) -> Any:
    """Return ``authority`` in the form used in the /verify response.

    Strings are passed through unchanged, objects exposing ``.value`` yield
    that value, anything else is converted with ``str()``.
    """
    if isinstance(authority, str):
        return authority
    if hasattr(authority, "value"):
        return authority.value
    return str(authority)


@router.get("/status")
def mfaStatus(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
    """Report whether MFA is enabled for, and required of, the current user.

    Loads the mandates of the user's enabled mandate assignments and passes
    them to ``isMfaRequired``. Returns ``{"mfaEnabled": ..., "mfaRequired": ...}``.
    """
    rootInterface = getRootInterface()
    userMandates = rootInterface.getUserMandates(str(currentUser.id))
    mandateIds = [
        um.mandateId
        for um in (userMandates or [])
        if getattr(um, "enabled", True)
    ]

    mandates = []
    for mid in mandateIds:
        try:
            recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": mid})
            if recs:
                mandates.append(Mandate.model_validate(dict(recs[0])))
        except Exception:
            # Best effort: one unreadable mandate must not break the status
            # call, but the failure is logged because a skipped mandate can
            # change the computed "required" flag.
            logger.warning("MFA status: could not load mandate %s", mid, exc_info=True)

    required = isMfaRequired(currentUser, userMandates=userMandates, mandates=mandates)
    return {
        "mfaEnabled": getattr(currentUser, "mfaEnabled", False),
        "mfaRequired": required,
    }


@router.post("/setup")
def mfaSetup(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
    """Start MFA enrolment. Returns ``provisioningUri`` (otpauth:// for QR).

    Stores the encrypted secret produced by ``generateSetup`` on the user
    record. Raises HTTPException (400) when MFA is already enabled.
    """
    if getattr(currentUser, "mfaEnabled", False):
        raise HTTPException(status_code=400, detail="MFA is already enabled")

    result = generateSetup(userId=str(currentUser.id), username=currentUser.username)
    rootInterface = getRootInterface()
    rootInterface.updateUser(str(currentUser.id), {"mfaSecret": result["encryptedSecret"]})
    return {"provisioningUri": result["provisioningUri"]}


@router.post("/confirm")
@limiter.limit("10/minute")
def mfaConfirm(
    request: Request,
    code: str = Body(...),
    token: str = Body(None),
) -> Dict[str, Any]:
    """Confirm enrolment by verifying the first TOTP code.

    Requires an ``mfa_pending`` token in the request body so that the
    forced-setup flow during login works without a session. Authenticated
    users confirm via ``/confirm-authenticated`` instead.

    Raises HTTPException:
        401 - token missing or invalid
        404 - user from the token not found
        400 - setup not started, or TOTP code rejected
    """
    if not token:
        raise HTTPException(status_code=401, detail="MFA token required")

    payload = _decodeMfaPendingToken(token)
    username = payload["sub"]
    userId = payload["userId"]

    rootInterface = getRootInterface()
    userRecord = rootInterface._getUserForAuthentication(username)
    if not userRecord:
        raise HTTPException(status_code=404, detail="User not found")

    encryptedSecret = userRecord.get("mfaSecret")
    if not encryptedSecret:
        logger.warning("MFA confirm: no mfaSecret on user %s", username)
        raise HTTPException(status_code=400, detail="MFA setup not started")

    # The submitted TOTP code is a credential and must never be logged.
    logger.info("MFA confirm: verifying code for user %s", username)
    if not confirmSetup(encryptedSecret, code, userId=userId):
        logger.warning("MFA confirm: TOTP code rejected for user %s", username)
        raise HTTPException(status_code=400, detail="Invalid TOTP code")

    rootInterface.updateUser(userId, {"mfaEnabled": True})
    logger.info("MFA confirmed for user %s", username)
    return {"mfaEnabled": True}


@router.post("/confirm-authenticated")
def mfaConfirmAuthenticated(
    request: Request,
    code: str = Body(..., embed=True),
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Confirm MFA enrolment for an already authenticated user (Settings page).

    Raises HTTPException:
        404 - user record not found
        400 - setup not started, or TOTP code rejected
    """
    rootInterface = getRootInterface()
    userRecord = rootInterface._getUserForAuthentication(currentUser.username)
    if not userRecord:
        raise HTTPException(status_code=404, detail="User not found")

    encryptedSecret = userRecord.get("mfaSecret")
    if not encryptedSecret:
        raise HTTPException(status_code=400, detail="MFA setup not started")

    if not confirmSetup(encryptedSecret, code, userId=str(currentUser.id)):
        raise HTTPException(status_code=400, detail="Invalid TOTP code")

    rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": True})
    logger.info("MFA confirmed for user %s", currentUser.username)
    return {"mfaEnabled": True}


@router.post("/verify")
@limiter.limit("10/minute")
def mfaVerify(
    request: Request,
    response: Response,
    token: str = Body(...),
    code: str = Body(...),
) -> Dict[str, Any]:
    """Verify TOTP code during login.

    Accepts the ``mfa_pending`` temp token and the 6-digit code.
    On success, issues real session JWTs (cookies), persists the access
    token record and writes a best-effort audit entry.

    Raises HTTPException:
        401 - token invalid, user not found, or MFA code rejected
        400 - no MFA secret stored for the user
    """
    payload = _decodeMfaPendingToken(token)
    userId = payload["userId"]
    username = payload["sub"]
    authority = payload.get("authenticationAuthority", AuthAuthority.LOCAL)
    sessionId = payload.get("sid", str(uuid.uuid4()))

    rootInterface = getRootInterface()
    userRecord = rootInterface._getUserForAuthentication(username)
    if not userRecord:
        raise HTTPException(status_code=401, detail="User not found")

    encryptedSecret = userRecord.get("mfaSecret")
    if not encryptedSecret:
        raise HTTPException(status_code=400, detail="MFA not configured for this user")

    if not verifyCode(encryptedSecret, code, userId=userId):
        raise HTTPException(status_code=401, detail="Invalid MFA code")

    tokenData = {
        "sub": username,
        "userId": userId,
        "authenticationAuthority": authority,
        "sid": sessionId,
    }
    accessToken, accessExpires = createAccessToken(tokenData)
    setAccessTokenCookie(response, accessToken)
    refreshToken, _ = createRefreshToken(tokenData)
    setRefreshTokenCookie(response, refreshToken)

    # The jti is read back from the freshly issued access token and used as
    # the id of the persisted token record.
    jti = jwt.decode(accessToken, SECRET_KEY, algorithms=[ALGORITHM]).get("jti")

    from modules.interfaces.interfaceDbApp import getInterface

    user = User.model_validate(userRecord)
    userInterface = getInterface(user)
    dbToken = Token(
        id=jti,
        userId=userId,
        authority=authority,
        tokenPurpose=TokenPurpose.AUTH_SESSION,
        tokenAccess=accessToken,
        tokenType="bearer",
        expiresAt=accessExpires.timestamp(),
        sessionId=sessionId,
    )
    userInterface.saveAccessToken(dbToken)
    logger.info("MFA verify successful for user %s", username)

    try:
        from modules.shared.auditLogger import audit_logger

        audit_logger.logUserAccess(
            userId=userId,
            mandateId="system",
            action="mfa_verify_success",
            successInfo="totp_verified",
            ipAddress=request.client.host if request.client else None,
            userAgent=request.headers.get("user-agent"),
            success=True,
        )
    except Exception:
        # Auditing is best effort and must not fail the login, but a broken
        # audit trail should be visible in the application log.
        logger.warning("MFA verify: audit logging failed for user %s", username, exc_info=True)

    return {
        "type": "local_auth_success",
        "message": "MFA verification successful",
        "authenticationAuthority": _authorityToStr(authority),
        "expires_at": accessExpires.isoformat(),
    }


@router.post("/disable")
def mfaDisable(
    currentUser: User = Depends(getCurrentUser),
    code: str = Body(..., embed=True),
) -> Dict[str, Any]:
    """Disable MFA for the current user. Requires a valid TOTP code as confirmation.

    On success clears both ``mfaEnabled`` and ``mfaSecret`` on the user.

    Raises HTTPException:
        404 - user record not found
        400 - MFA not enabled, or TOTP code rejected
    """
    # NOTE(review): unlike /confirm and /verify this endpoint has no
    # limiter decorator, so TOTP guesses are not rate limited here - confirm
    # whether that is intended.
    rootInterface = getRootInterface()
    userRecord = rootInterface._getUserForAuthentication(currentUser.username)
    if not userRecord:
        raise HTTPException(status_code=404, detail="User not found")

    encryptedSecret = userRecord.get("mfaSecret")
    if not encryptedSecret or not getattr(currentUser, "mfaEnabled", False):
        raise HTTPException(status_code=400, detail="MFA is not enabled")

    if not verifyCode(encryptedSecret, code, userId=str(currentUser.id)):
        raise HTTPException(status_code=400, detail="Invalid TOTP code")

    rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": False, "mfaSecret": None})
    logger.info("MFA disabled for user %s", currentUser.username)
    return {"mfaEnabled": False}