platform-core/modules/routes/routeMfa.py
ValueOn AG 4a60086c80
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 15s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
cp adapted to 2026 poweron
2026-06-09 09:53:31 +02:00

278 lines
10 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Routes for TOTP-based Multi-Factor Authentication.
Endpoints:
GET /api/mfa/status - MFA status for the current user
POST /api/mfa/setup - Start MFA enrolment (returns provisioningUri)
POST /api/mfa/confirm - Confirm enrolment with first TOTP code
POST /api/mfa/verify - Verify TOTP during login (uses mfa_pending temp token)
POST /api/mfa/disable - Disable MFA for current user
"""
from datetime import timedelta
from typing import Dict, Any
from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body
from jose import jwt, JWTError
import logging
import uuid
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth import (
createAccessToken,
createRefreshToken,
setAccessTokenCookie,
setRefreshTokenCookie,
)
from modules.auth.mfaService import (
generateSetup,
confirmSetup,
verifyCode,
isMfaRequired,
)
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, Mandate
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.shared.configuration import APP_CONFIG
from modules.shared.i18nRegistry import apiRouteContext
from datetime import datetime
routeApiMsg = apiRouteContext("routeMfa")
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/mfa", tags=["MFA"])
_MFA_PENDING_EXPIRE_MINUTES = 5
def createMfaPendingToken(userId: str, username: str, authority: str, sessionId: str) -> str:
"""Short-lived JWT that authorises only the /mfa/verify endpoint."""
payload = {
"sub": username,
"userId": userId,
"authenticationAuthority": authority,
"sid": sessionId,
"type": "mfa_pending",
"jti": str(uuid.uuid4()),
}
token, _ = createAccessToken(payload, expiresDelta=timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES))
return token
def _decodeMfaPendingToken(token: str) -> dict:
"""Decode and validate an mfa_pending token. Raises HTTPException on failure."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired MFA token")
if payload.get("type") != "mfa_pending":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
return payload
@router.get("/status")
def mfaStatus(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
rootInterface = getRootInterface()
userMandates = rootInterface.getUserMandates(str(currentUser.id))
mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
mandates = []
for mid in mandateIds:
try:
recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": mid})
if recs:
mandates.append(Mandate.model_validate(dict(recs[0])))
except Exception:
pass
required = isMfaRequired(currentUser, userMandates=userMandates, mandates=mandates)
return {
"mfaEnabled": getattr(currentUser, "mfaEnabled", False),
"mfaRequired": required,
}
@router.post("/setup")
def mfaSetup(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
"""Start MFA enrolment. Returns ``provisioningUri`` (otpauth:// for QR)."""
if getattr(currentUser, "mfaEnabled", False):
raise HTTPException(status_code=400, detail="MFA is already enabled")
result = generateSetup(userId=str(currentUser.id), username=currentUser.username)
rootInterface = getRootInterface()
rootInterface.updateUser(str(currentUser.id), {"mfaSecret": result["encryptedSecret"]})
return {"provisioningUri": result["provisioningUri"]}
@router.post("/confirm")
@limiter.limit("10/minute")
def mfaConfirm(
request: Request,
code: str = Body(...),
token: str = Body(None),
) -> Dict[str, Any]:
"""Confirm enrolment by verifying the first TOTP code.
Accepts either a valid session (cookie/header JWT) OR an ``mfa_pending``
token so that the forced-setup flow during login works without a session.
"""
rootInterface = getRootInterface()
if token:
payload = _decodeMfaPendingToken(token)
username = payload["sub"]
userId = payload["userId"]
else:
raise HTTPException(status_code=401, detail="MFA token required")
userRecord = rootInterface._getUserForAuthentication(username)
if not userRecord:
raise HTTPException(status_code=404, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret:
logger.warning("MFA confirm: no mfaSecret on user %s", username)
raise HTTPException(status_code=400, detail="MFA setup not started")
logger.info("MFA confirm: verifying code for user %s (secret present=%s, code=%s)", username, bool(encryptedSecret), code)
if not confirmSetup(encryptedSecret, code, userId=userId):
logger.warning("MFA confirm: TOTP code rejected for user %s", username)
raise HTTPException(status_code=400, detail="Invalid TOTP code")
rootInterface.updateUser(userId, {"mfaEnabled": True})
logger.info("MFA confirmed for user %s", username)
return {"mfaEnabled": True}
@router.post("/confirm-authenticated")
def mfaConfirmAuthenticated(
request: Request,
code: str = Body(..., embed=True),
currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
"""Confirm MFA enrolment for an already authenticated user (Settings page)."""
rootInterface = getRootInterface()
userRecord = rootInterface._getUserForAuthentication(currentUser.username)
if not userRecord:
raise HTTPException(status_code=404, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret:
raise HTTPException(status_code=400, detail="MFA setup not started")
if not confirmSetup(encryptedSecret, code, userId=str(currentUser.id)):
raise HTTPException(status_code=400, detail="Invalid TOTP code")
rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": True})
logger.info("MFA confirmed for user %s", currentUser.username)
return {"mfaEnabled": True}
@router.post("/verify")
@limiter.limit("10/minute")
def mfaVerify(
request: Request,
response: Response,
token: str = Body(...),
code: str = Body(...),
) -> Dict[str, Any]:
"""Verify TOTP code during login. Accepts the ``mfa_pending`` temp token
and the 6-digit code. On success, issues real session JWTs (cookies)."""
payload = _decodeMfaPendingToken(token)
userId = payload["userId"]
username = payload["sub"]
authority = payload.get("authenticationAuthority", AuthAuthority.LOCAL)
sessionId = payload.get("sid", str(uuid.uuid4()))
rootInterface = getRootInterface()
userRecord = rootInterface._getUserForAuthentication(username)
if not userRecord:
raise HTTPException(status_code=401, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret:
raise HTTPException(status_code=400, detail="MFA not configured for this user")
if not verifyCode(encryptedSecret, code, userId=userId):
raise HTTPException(status_code=401, detail="Invalid MFA code")
tokenData = {
"sub": username,
"userId": userId,
"authenticationAuthority": authority,
"sid": sessionId,
}
accessToken, accessExpires = createAccessToken(tokenData)
setAccessTokenCookie(response, accessToken)
refreshToken, _ = createRefreshToken(tokenData)
setRefreshTokenCookie(response, refreshToken)
jti = jwt.decode(accessToken, SECRET_KEY, algorithms=[ALGORITHM]).get("jti")
from modules.interfaces.interfaceDbApp import getInterface
user = User.model_validate(userRecord)
userInterface = getInterface(user)
dbToken = Token(
id=jti,
userId=userId,
authority=authority,
tokenPurpose=TokenPurpose.AUTH_SESSION,
tokenAccess=accessToken,
tokenType="bearer",
expiresAt=accessExpires.timestamp(),
sessionId=sessionId,
)
userInterface.saveAccessToken(dbToken)
logger.info("MFA verify successful for user %s", username)
try:
from modules.dbHelpers.auditLogger import audit_logger
audit_logger.logUserAccess(
userId=userId,
mandateId="system",
action="mfa_verify_success",
successInfo="totp_verified",
ipAddress=request.client.host if request.client else None,
userAgent=request.headers.get("user-agent"),
success=True,
)
except Exception:
pass
return {
"type": "local_auth_success",
"message": "MFA verification successful",
"authenticationAuthority": authority if isinstance(authority, str) else authority.value if hasattr(authority, "value") else str(authority),
"expires_at": accessExpires.isoformat(),
}
@router.post("/disable")
def mfaDisable(
currentUser: User = Depends(getCurrentUser),
code: str = Body(..., embed=True),
) -> Dict[str, Any]:
"""Disable MFA for the current user. Requires a valid TOTP code as confirmation."""
rootInterface = getRootInterface()
userRecord = rootInterface._getUserForAuthentication(currentUser.username)
if not userRecord:
raise HTTPException(status_code=404, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret or not getattr(currentUser, "mfaEnabled", False):
raise HTTPException(status_code=400, detail="MFA is not enabled")
if not verifyCode(encryptedSecret, code, userId=str(currentUser.id)):
raise HTTPException(status_code=400, detail="Invalid TOTP code")
rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": False, "mfaSecret": None})
logger.info("MFA disabled for user %s", currentUser.username)
return {"mfaEnabled": False}