Merge pull request 'security and mfa' (#6) from int into main
Reviewed-on: #6
This commit is contained in:
commit
2510493891
16 changed files with 809 additions and 10 deletions
3
app.py
3
app.py
|
|
@ -659,6 +659,9 @@ app.include_router(tableViewsRouter)
|
|||
from modules.routes.routeSecurityLocal import router as localRouter
|
||||
app.include_router(localRouter)
|
||||
|
||||
from modules.routes.routeMfa import router as mfaRouter
|
||||
app.include_router(mfaRouter)
|
||||
|
||||
from modules.routes.routeSecurityMsft import router as msftRouter
|
||||
app.include_router(msftRouter)
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,9 @@ DB_PORT=5432
|
|||
APP_JWT_KEY_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpERjlrSktmZHVuQnJ1VVJDdndLaUcxZGJsT2ZlUFRlcFdOZ001RnlzM2FhLWhRV2tjWWFhaWQwQ3hkcUFvbThMcndxSjFpYTdfRV9OZGhTcksxbXFTZWg5MDZvOHpCVXBHcDJYaHlJM0tyNWRZckZsVHpQcmxTZHJoZUs1M3lfU2ljRnJaTmNSQ0w0X085OXI0QW80M2xfQnJqZmZ6VEh3TUltX0xzeE42SGtZPQ==
|
||||
APP_TOKEN_EXPIRY=300
|
||||
|
||||
# MFA Configuration
|
||||
MFA_REQUIRE_ADMINS = False
|
||||
|
||||
# CORS Configuration
|
||||
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net
|
||||
|
||||
|
|
|
|||
|
|
@ -20,6 +20,9 @@ DB_PORT=5432
|
|||
APP_JWT_KEY_SECRET = INT_ENC:Z0FBQUFBQm8xSVRjNUctb2RwU25iR3ZnanBOdHZhWUtIajZ1RnZzTEp4aDR0MktWRjNoeVBrY1Npd1R0VE9YVHp3M2w1cXRzbUxNaU82QUJvaDNFeVQyN05KblRWblBvbWtoT0VXbkNBbDQ5OHhwSUFnaDZGRG10Vmgtdm1YUkRsYUhFMzRVZURmSFlDTFIzVWg4MXNueDZyMGc5aVpFdWRxY3dkTExGM093ZTVUZVl5LUhGWnlRPQ==
|
||||
APP_TOKEN_EXPIRY=300
|
||||
|
||||
# MFA Configuration
|
||||
MFA_REQUIRE_ADMINS = True
|
||||
|
||||
# CORS Configuration
|
||||
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,9 @@ DB_PORT=5432
|
|||
APP_JWT_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z3elhfV0Rnd2pQRjlMdkVwX1FnSmRhSzNZUlV5SVpaWXBNX1hpa2xPZGdMSWpnN2ZINHQxeGZnNHJweU5pZjlyYlY5Qm9zOUZEbl9wUEgtZHZXd1NhR19JSG9kbFU4MnFGQnllbFhRQVphRGQyNHlFVWR5VHQyUUpqN0stUmRuY2QyTi1oalczRHpLTEJqWURjZWs4YjZvT2U5YnFqcXEwdEpxV05fX05QMmtrPQ==
|
||||
APP_TOKEN_EXPIRY=300
|
||||
|
||||
# MFA Configuration
|
||||
MFA_REQUIRE_ADMINS = True
|
||||
|
||||
# CORS Configuration
|
||||
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net
|
||||
|
||||
|
|
|
|||
132
modules/auth/mfaService.py
Normal file
132
modules/auth/mfaService.py
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""
|
||||
MFA (Multi-Factor Authentication) Service.
|
||||
|
||||
TOTP-based MFA using pyotp. Secrets are encrypted at rest via
|
||||
encryptValue/decryptValue from the configuration module.
|
||||
|
||||
MFA obligation is resolved by three OR-linked rules:
|
||||
1. Any mandate the user belongs to has ``mfaRequired=True``.
|
||||
2. User is sysAdmin OR platformAdmin AND config key ``MFA_REQUIRE_ADMINS``
|
||||
is truthy.
|
||||
3. User has opted in (``mfaEnabled=True`` without any mandate/admin rule).
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import pyotp
|
||||
|
||||
from modules.shared.configuration import APP_CONFIG, encryptValue, decryptValue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_MFA_DIGITS = 6
|
||||
_MFA_INTERVAL = 30
|
||||
_MFA_VALID_WINDOW = 1
|
||||
|
||||
|
||||
def _getMfaIssuer() -> str:
|
||||
"""Build the TOTP issuer name, e.g. 'PowerOn' or 'PowerOn (Dev)'."""
|
||||
envType = (APP_CONFIG.get("APP_ENV_TYPE") or "").strip().lower()
|
||||
if envType in ("prod", ""):
|
||||
return "PowerOn"
|
||||
return f"PowerOn ({envType.upper()})"
|
||||
|
||||
|
||||
def _generateSecret() -> str:
|
||||
"""Generate a fresh base32-encoded TOTP secret."""
|
||||
return pyotp.random_base32()
|
||||
|
||||
|
||||
def _encryptSecret(plainSecret: str, userId: str = "system") -> str:
|
||||
return encryptValue(plainSecret, userId=userId, keyName="mfa_secret")
|
||||
|
||||
|
||||
def _decryptSecret(encryptedSecret: str, userId: str = "system") -> str:
|
||||
return decryptValue(encryptedSecret, userId=userId, keyName="mfa_secret")
|
||||
|
||||
|
||||
def _buildTotp(plainSecret: str) -> pyotp.TOTP:
|
||||
return pyotp.TOTP(plainSecret, digits=_MFA_DIGITS, interval=_MFA_INTERVAL)
|
||||
|
||||
|
||||
def generateSetup(userId: str, username: str) -> dict:
|
||||
"""Start MFA enrolment: return secret + provisioning URI (for QR code).
|
||||
|
||||
Returns dict with keys ``secret`` (encrypted for DB storage) and
|
||||
``provisioningUri`` (otpauth:// URI the frontend renders as QR).
|
||||
The plaintext secret is NOT returned -- the URI already contains it.
|
||||
"""
|
||||
plain = _generateSecret()
|
||||
encrypted = _encryptSecret(plain, userId=userId)
|
||||
totp = _buildTotp(plain)
|
||||
uri = totp.provisioning_uri(name=username, issuer_name=_getMfaIssuer())
|
||||
return {
|
||||
"encryptedSecret": encrypted,
|
||||
"provisioningUri": uri,
|
||||
}
|
||||
|
||||
|
||||
def confirmSetup(encryptedSecret: str, code: str, userId: str = "system") -> bool:
|
||||
"""Verify a TOTP code against an encrypted secret (enrolment confirmation)."""
|
||||
try:
|
||||
plain = _decryptSecret(encryptedSecret, userId=userId)
|
||||
totp = _buildTotp(plain)
|
||||
return totp.verify(code, valid_window=_MFA_VALID_WINDOW)
|
||||
except Exception:
|
||||
logger.exception("MFA confirmSetup failed for userId=%s", userId)
|
||||
return False
|
||||
|
||||
|
||||
def verifyCode(encryptedSecret: str, code: str, userId: str = "system") -> bool:
|
||||
"""Verify a TOTP code during login."""
|
||||
try:
|
||||
plain = _decryptSecret(encryptedSecret, userId=userId)
|
||||
totp = _buildTotp(plain)
|
||||
return totp.verify(code, valid_window=_MFA_VALID_WINDOW)
|
||||
except Exception:
|
||||
logger.exception("MFA verifyCode failed for userId=%s", userId)
|
||||
return False
|
||||
|
||||
|
||||
def _isMfaRequireAdminsEnabled() -> bool:
|
||||
"""Read ``MFA_REQUIRE_ADMINS`` from config / env."""
|
||||
raw = (APP_CONFIG.get("MFA_REQUIRE_ADMINS") or "").strip().lower()
|
||||
return raw in ("1", "true", "yes")
|
||||
|
||||
|
||||
def isMfaRequired(user, userMandates=None, mandates=None) -> bool:
|
||||
"""Resolve whether MFA is mandatory for *user*.
|
||||
|
||||
Rules (OR):
|
||||
1. At least one of the user's mandates has ``mfaRequired=True``.
|
||||
2. User is sysAdmin or platformAdmin AND ``MFA_REQUIRE_ADMINS`` config
|
||||
key is truthy.
|
||||
3. User already opted in (``mfaEnabled=True``).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
user : User | UserInDB
|
||||
The user object.
|
||||
userMandates : list | None
|
||||
List of UserMandate records for the user (each has ``mandateId``).
|
||||
mandates : list | None
|
||||
List of Mandate objects the user has access to. If provided directly
|
||||
this avoids a second lookup.
|
||||
"""
|
||||
if getattr(user, "mfaEnabled", False):
|
||||
return True
|
||||
|
||||
isSys = getattr(user, "isSysAdmin", False)
|
||||
isPlat = getattr(user, "isPlatformAdmin", False)
|
||||
if (isSys or isPlat) and _isMfaRequireAdminsEnabled():
|
||||
return True
|
||||
|
||||
if mandates:
|
||||
for m in mandates:
|
||||
if getattr(m, "mfaRequired", False):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
|
@ -197,6 +197,26 @@ class Mandate(PowerOnModel):
|
|||
# `customer.email`, `customer.tax_id_data` mappen kann
|
||||
# (Stripe verlangt die Adresse strukturiert, nicht als Freitext).
|
||||
# ``order`` 200-209 gruppiert die Felder visuell am Ende des Formulars.
|
||||
mfaRequired: bool = Field(
|
||||
default=False,
|
||||
description="When true, all users with access to this mandate must have MFA enabled.",
|
||||
json_schema_extra={
|
||||
"frontend_type": "checkbox",
|
||||
"frontend_readonly": False,
|
||||
"frontend_required": False,
|
||||
"label": "MFA-Pflicht",
|
||||
"frontend_format_labels": ["Ja", "-", "Nein"],
|
||||
"order": 190,
|
||||
},
|
||||
)
|
||||
|
||||
@field_validator("mfaRequired", mode="before")
|
||||
@classmethod
|
||||
def _coerceMfaRequired(cls, v):
|
||||
if v is None:
|
||||
return False
|
||||
return v
|
||||
|
||||
invoiceCompanyName: Optional[str] = Field(
|
||||
default=None,
|
||||
description="Firmenname / Empfaenger der Rechnung (falls abweichend vom Voller Name).",
|
||||
|
|
@ -623,6 +643,25 @@ class User(PowerOnModel):
|
|||
return v
|
||||
|
||||
|
||||
mfaEnabled: bool = Field(
|
||||
default=False,
|
||||
description="Whether the user has completed MFA setup and has TOTP active.",
|
||||
json_schema_extra={
|
||||
"frontend_type": "checkbox",
|
||||
"frontend_readonly": True,
|
||||
"frontend_required": False,
|
||||
"label": "MFA aktiv",
|
||||
"frontend_format_labels": ["Ja", "-", "Nein"],
|
||||
},
|
||||
)
|
||||
|
||||
@field_validator("mfaEnabled", mode="before")
|
||||
@classmethod
|
||||
def _coerceMfaEnabled(cls, v):
|
||||
if v is None:
|
||||
return False
|
||||
return v
|
||||
|
||||
authenticationAuthority: AuthAuthority = Field(
|
||||
default=AuthAuthority.LOCAL,
|
||||
description="Primary authentication authority",
|
||||
|
|
@ -655,6 +694,11 @@ class UserInDB(User):
|
|||
description="Hash of the user password",
|
||||
json_schema_extra={"label": "Passwort-Hash"},
|
||||
)
|
||||
mfaSecret: Optional[str] = Field(
|
||||
None,
|
||||
description="Encrypted TOTP secret for MFA. Stored via encryptValue/decryptValue.",
|
||||
json_schema_extra={"label": "MFA-Secret", "frontend_visible": False},
|
||||
)
|
||||
resetToken: Optional[str] = Field(
|
||||
None,
|
||||
description="Password reset token (UUID)",
|
||||
|
|
|
|||
|
|
@ -813,11 +813,13 @@ class AppObjects:
|
|||
updateDict.pop(field, None)
|
||||
|
||||
# Update user data using model
|
||||
updatedData = user.model_dump()
|
||||
# Use UserInDB so that DB-only fields (mfaSecret, hashedPassword, etc.)
|
||||
# are preserved through the round-trip instead of being silently dropped.
|
||||
existingRecord = self._getUserForAuthentication(user.username)
|
||||
updatedData = dict(existingRecord) if existingRecord else user.model_dump()
|
||||
updatedData.update(updateDict)
|
||||
# Ensure ID matches userId parameter
|
||||
updatedData["id"] = userId
|
||||
updatedUser = User(**updatedData)
|
||||
updatedUser = UserInDB(**updatedData)
|
||||
|
||||
# Update user record
|
||||
self.db.recordModify(UserInDB, userId, updatedUser)
|
||||
|
|
|
|||
278
modules/routes/routeMfa.py
Normal file
278
modules/routes/routeMfa.py
Normal file
|
|
@ -0,0 +1,278 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""
|
||||
Routes for TOTP-based Multi-Factor Authentication.
|
||||
|
||||
Endpoints:
|
||||
GET /api/mfa/status - MFA status for the current user
|
||||
POST /api/mfa/setup - Start MFA enrolment (returns provisioningUri)
|
||||
POST /api/mfa/confirm - Confirm enrolment with first TOTP code
|
||||
POST /api/mfa/verify - Verify TOTP during login (uses mfa_pending temp token)
|
||||
POST /api/mfa/disable - Disable MFA for current user
|
||||
"""
|
||||
|
||||
from datetime import timedelta
|
||||
from typing import Dict, Any
|
||||
|
||||
from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body
|
||||
from jose import jwt, JWTError
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
|
||||
from modules.auth import (
|
||||
createAccessToken,
|
||||
createRefreshToken,
|
||||
setAccessTokenCookie,
|
||||
setRefreshTokenCookie,
|
||||
)
|
||||
from modules.auth.mfaService import (
|
||||
generateSetup,
|
||||
confirmSetup,
|
||||
verifyCode,
|
||||
isMfaRequired,
|
||||
)
|
||||
from modules.interfaces.interfaceDbApp import getRootInterface
|
||||
from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, Mandate
|
||||
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
|
||||
from modules.shared.configuration import APP_CONFIG
|
||||
from modules.shared.i18nRegistry import apiRouteContext
|
||||
from datetime import datetime
|
||||
|
||||
routeApiMsg = apiRouteContext("routeMfa")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/mfa", tags=["MFA"])
|
||||
|
||||
_MFA_PENDING_EXPIRE_MINUTES = 5
|
||||
|
||||
|
||||
def _createMfaPendingToken(userId: str, username: str, authority: str, sessionId: str) -> str:
|
||||
"""Short-lived JWT that authorises only the /mfa/verify endpoint."""
|
||||
payload = {
|
||||
"sub": username,
|
||||
"userId": userId,
|
||||
"authenticationAuthority": authority,
|
||||
"sid": sessionId,
|
||||
"type": "mfa_pending",
|
||||
"jti": str(uuid.uuid4()),
|
||||
}
|
||||
token, _ = createAccessToken(payload, expiresDelta=timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES))
|
||||
return token
|
||||
|
||||
|
||||
def _decodeMfaPendingToken(token: str) -> dict:
|
||||
"""Decode and validate an mfa_pending token. Raises HTTPException on failure."""
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
except JWTError:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired MFA token")
|
||||
if payload.get("type") != "mfa_pending":
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
|
||||
return payload
|
||||
|
||||
|
||||
@router.get("/status")
|
||||
def mfaStatus(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
|
||||
rootInterface = getRootInterface()
|
||||
userMandates = rootInterface.getUserMandates(str(currentUser.id))
|
||||
mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
|
||||
mandates = []
|
||||
for mid in mandateIds:
|
||||
try:
|
||||
recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": mid})
|
||||
if recs:
|
||||
mandates.append(Mandate.model_validate(dict(recs[0])))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
required = isMfaRequired(currentUser, userMandates=userMandates, mandates=mandates)
|
||||
return {
|
||||
"mfaEnabled": getattr(currentUser, "mfaEnabled", False),
|
||||
"mfaRequired": required,
|
||||
}
|
||||
|
||||
|
||||
@router.post("/setup")
|
||||
def mfaSetup(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
|
||||
"""Start MFA enrolment. Returns ``provisioningUri`` (otpauth:// for QR)."""
|
||||
if getattr(currentUser, "mfaEnabled", False):
|
||||
raise HTTPException(status_code=400, detail="MFA is already enabled")
|
||||
|
||||
result = generateSetup(userId=str(currentUser.id), username=currentUser.username)
|
||||
|
||||
rootInterface = getRootInterface()
|
||||
rootInterface.updateUser(str(currentUser.id), {"mfaSecret": result["encryptedSecret"]})
|
||||
|
||||
return {"provisioningUri": result["provisioningUri"]}
|
||||
|
||||
|
||||
@router.post("/confirm")
|
||||
@limiter.limit("10/minute")
|
||||
def mfaConfirm(
|
||||
request: Request,
|
||||
code: str = Body(...),
|
||||
token: str = Body(None),
|
||||
) -> Dict[str, Any]:
|
||||
"""Confirm enrolment by verifying the first TOTP code.
|
||||
|
||||
Accepts either a valid session (cookie/header JWT) OR an ``mfa_pending``
|
||||
token so that the forced-setup flow during login works without a session.
|
||||
"""
|
||||
rootInterface = getRootInterface()
|
||||
|
||||
if token:
|
||||
payload = _decodeMfaPendingToken(token)
|
||||
username = payload["sub"]
|
||||
userId = payload["userId"]
|
||||
else:
|
||||
raise HTTPException(status_code=401, detail="MFA token required")
|
||||
|
||||
userRecord = rootInterface._getUserForAuthentication(username)
|
||||
if not userRecord:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
encryptedSecret = userRecord.get("mfaSecret")
|
||||
if not encryptedSecret:
|
||||
logger.warning("MFA confirm: no mfaSecret on user %s", username)
|
||||
raise HTTPException(status_code=400, detail="MFA setup not started")
|
||||
|
||||
logger.info("MFA confirm: verifying code for user %s (secret present=%s, code=%s)", username, bool(encryptedSecret), code)
|
||||
if not confirmSetup(encryptedSecret, code, userId=userId):
|
||||
logger.warning("MFA confirm: TOTP code rejected for user %s", username)
|
||||
raise HTTPException(status_code=400, detail="Invalid TOTP code")
|
||||
|
||||
rootInterface.updateUser(userId, {"mfaEnabled": True})
|
||||
logger.info("MFA confirmed for user %s", username)
|
||||
|
||||
return {"mfaEnabled": True}
|
||||
|
||||
|
||||
@router.post("/confirm-authenticated")
|
||||
def mfaConfirmAuthenticated(
|
||||
request: Request,
|
||||
code: str = Body(..., embed=True),
|
||||
currentUser: User = Depends(getCurrentUser),
|
||||
) -> Dict[str, Any]:
|
||||
"""Confirm MFA enrolment for an already authenticated user (Settings page)."""
|
||||
rootInterface = getRootInterface()
|
||||
userRecord = rootInterface._getUserForAuthentication(currentUser.username)
|
||||
if not userRecord:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
encryptedSecret = userRecord.get("mfaSecret")
|
||||
if not encryptedSecret:
|
||||
raise HTTPException(status_code=400, detail="MFA setup not started")
|
||||
|
||||
if not confirmSetup(encryptedSecret, code, userId=str(currentUser.id)):
|
||||
raise HTTPException(status_code=400, detail="Invalid TOTP code")
|
||||
|
||||
rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": True})
|
||||
logger.info("MFA confirmed for user %s", currentUser.username)
|
||||
|
||||
return {"mfaEnabled": True}
|
||||
|
||||
|
||||
@router.post("/verify")
|
||||
@limiter.limit("10/minute")
|
||||
def mfaVerify(
|
||||
request: Request,
|
||||
response: Response,
|
||||
token: str = Body(...),
|
||||
code: str = Body(...),
|
||||
) -> Dict[str, Any]:
|
||||
"""Verify TOTP code during login. Accepts the ``mfa_pending`` temp token
|
||||
and the 6-digit code. On success, issues real session JWTs (cookies)."""
|
||||
payload = _decodeMfaPendingToken(token)
|
||||
userId = payload["userId"]
|
||||
username = payload["sub"]
|
||||
authority = payload.get("authenticationAuthority", AuthAuthority.LOCAL)
|
||||
sessionId = payload.get("sid", str(uuid.uuid4()))
|
||||
|
||||
rootInterface = getRootInterface()
|
||||
userRecord = rootInterface._getUserForAuthentication(username)
|
||||
if not userRecord:
|
||||
raise HTTPException(status_code=401, detail="User not found")
|
||||
|
||||
encryptedSecret = userRecord.get("mfaSecret")
|
||||
if not encryptedSecret:
|
||||
raise HTTPException(status_code=400, detail="MFA not configured for this user")
|
||||
|
||||
if not verifyCode(encryptedSecret, code, userId=userId):
|
||||
raise HTTPException(status_code=401, detail="Invalid MFA code")
|
||||
|
||||
tokenData = {
|
||||
"sub": username,
|
||||
"userId": userId,
|
||||
"authenticationAuthority": authority,
|
||||
"sid": sessionId,
|
||||
}
|
||||
accessToken, accessExpires = createAccessToken(tokenData)
|
||||
setAccessTokenCookie(response, accessToken)
|
||||
|
||||
refreshToken, _ = createRefreshToken(tokenData)
|
||||
setRefreshTokenCookie(response, refreshToken)
|
||||
|
||||
jti = jwt.decode(accessToken, SECRET_KEY, algorithms=[ALGORITHM]).get("jti")
|
||||
|
||||
from modules.interfaces.interfaceDbApp import getInterface
|
||||
user = User.model_validate(userRecord)
|
||||
userInterface = getInterface(user)
|
||||
dbToken = Token(
|
||||
id=jti,
|
||||
userId=userId,
|
||||
authority=authority,
|
||||
tokenPurpose=TokenPurpose.AUTH_SESSION,
|
||||
tokenAccess=accessToken,
|
||||
tokenType="bearer",
|
||||
expiresAt=accessExpires.timestamp(),
|
||||
sessionId=sessionId,
|
||||
)
|
||||
userInterface.saveAccessToken(dbToken)
|
||||
|
||||
logger.info("MFA verify successful for user %s", username)
|
||||
|
||||
try:
|
||||
from modules.shared.auditLogger import audit_logger
|
||||
audit_logger.logUserAccess(
|
||||
userId=userId,
|
||||
mandateId="system",
|
||||
action="mfa_verify_success",
|
||||
successInfo="totp_verified",
|
||||
ipAddress=request.client.host if request.client else None,
|
||||
userAgent=request.headers.get("user-agent"),
|
||||
success=True,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"type": "local_auth_success",
|
||||
"message": "MFA verification successful",
|
||||
"authenticationAuthority": authority if isinstance(authority, str) else authority.value if hasattr(authority, "value") else str(authority),
|
||||
"expires_at": accessExpires.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/disable")
|
||||
def mfaDisable(
|
||||
currentUser: User = Depends(getCurrentUser),
|
||||
code: str = Body(..., embed=True),
|
||||
) -> Dict[str, Any]:
|
||||
"""Disable MFA for the current user. Requires a valid TOTP code as confirmation."""
|
||||
rootInterface = getRootInterface()
|
||||
userRecord = rootInterface._getUserForAuthentication(currentUser.username)
|
||||
if not userRecord:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
encryptedSecret = userRecord.get("mfaSecret")
|
||||
if not encryptedSecret or not getattr(currentUser, "mfaEnabled", False):
|
||||
raise HTTPException(status_code=400, detail="MFA is not enabled")
|
||||
|
||||
if not verifyCode(encryptedSecret, code, userId=str(currentUser.id)):
|
||||
raise HTTPException(status_code=400, detail="Invalid TOTP code")
|
||||
|
||||
rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": False, "mfaSecret": None})
|
||||
logger.info("MFA disabled for user %s", currentUser.username)
|
||||
|
||||
return {"mfaEnabled": False}
|
||||
|
|
@ -19,7 +19,7 @@ from jose import JWTError
|
|||
|
||||
from modules.shared.configuration import APP_CONFIG
|
||||
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
|
||||
from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
|
||||
from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection
|
||||
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
|
||||
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
|
||||
from modules.auth.oauthConnectTicket import resolve_connect_context
|
||||
|
|
@ -219,6 +219,65 @@ async def auth_login_callback(
|
|||
)
|
||||
isNewUser = True
|
||||
|
||||
# --- MFA gate --------------------------------------------------------
|
||||
from modules.auth.mfaService import isMfaRequired as _isMfaRequired
|
||||
from modules.routes.routeMfa import _createMfaPendingToken
|
||||
|
||||
userRecord = rootInterface._getUserForAuthentication(user.username)
|
||||
userMandates = rootInterface.getUserMandates(str(user.id))
|
||||
_mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
|
||||
_mandateObjs = []
|
||||
for _mid in _mandateIds:
|
||||
try:
|
||||
_recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
|
||||
if _recs:
|
||||
_mandateObjs.append(Mandate.model_validate(dict(_recs[0])))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs)
|
||||
hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
|
||||
|
||||
if mfaRequired or hasMfaSetup:
|
||||
import uuid as _uuid
|
||||
_sid = str(_uuid.uuid4())
|
||||
pendingToken = _createMfaPendingToken(
|
||||
userId=str(user.id),
|
||||
username=user.username,
|
||||
authority=AuthAuthority.GOOGLE.value,
|
||||
sessionId=_sid,
|
||||
)
|
||||
if hasMfaSetup:
|
||||
mfaType = "mfa_required"
|
||||
extraFields = ""
|
||||
else:
|
||||
mfaType = "mfa_setup_required"
|
||||
from modules.auth.mfaService import generateSetup as _generateSetup
|
||||
existingSecret = userRecord.get("mfaSecret") if userRecord else None
|
||||
if existingSecret:
|
||||
from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer
|
||||
_plain = _decryptSecret(existingSecret, userId=str(user.id))
|
||||
_uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
|
||||
setupResult = {"provisioningUri": _uri}
|
||||
else:
|
||||
setupResult = _generateSetup(userId=str(user.id), username=user.username)
|
||||
rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
|
||||
extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}"
|
||||
return HTMLResponse(
|
||||
content=f"""
|
||||
<html><head><title>MFA Required</title></head><body><script>
|
||||
if (window.opener) {{
|
||||
window.opener.postMessage({{
|
||||
type: '{mfaType}',
|
||||
mfaToken: {json.dumps(pendingToken)}{extraFields}
|
||||
}}, '*');
|
||||
}}
|
||||
setTimeout(() => window.close(), 1000);
|
||||
</script></body></html>
|
||||
"""
|
||||
)
|
||||
# --- end MFA gate -----------------------------------------------------
|
||||
|
||||
jwt_token_data = {
|
||||
"sub": user.username,
|
||||
"userId": str(user.id),
|
||||
|
|
|
|||
|
|
@ -253,7 +253,56 @@ def login(
|
|||
detail=routeApiMsg("Invalid username or password"),
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
|
||||
# --- MFA gate --------------------------------------------------------
|
||||
from modules.auth.mfaService import isMfaRequired as _isMfaRequired
|
||||
from modules.routes.routeMfa import _createMfaPendingToken
|
||||
|
||||
userRecord = rootInterface._getUserForAuthentication(user.username)
|
||||
userMandates = rootInterface.getUserMandates(str(user.id))
|
||||
mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
|
||||
mandateObjs = []
|
||||
for _mid in mandateIds:
|
||||
try:
|
||||
recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
|
||||
if recs:
|
||||
mandateObjs.append(Mandate.model_validate(dict(recs[0])))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=mandateObjs)
|
||||
hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
|
||||
|
||||
if mfaRequired or hasMfaSetup:
|
||||
_sid = str(uuid.uuid4())
|
||||
pendingToken = _createMfaPendingToken(
|
||||
userId=str(user.id),
|
||||
username=user.username,
|
||||
authority=AuthAuthority.LOCAL.value,
|
||||
sessionId=_sid,
|
||||
)
|
||||
if hasMfaSetup:
|
||||
return {"type": "mfa_required", "mfaToken": pendingToken}
|
||||
else:
|
||||
from modules.auth.mfaService import generateSetup as _generateSetup
|
||||
existingSecret = userRecord.get("mfaSecret") if userRecord else None
|
||||
if existingSecret:
|
||||
from modules.auth.mfaService import _decryptSecret, _buildTotp
|
||||
_plain = _decryptSecret(existingSecret, userId=str(user.id))
|
||||
_totp = _buildTotp(_plain)
|
||||
from modules.auth.mfaService import _getMfaIssuer
|
||||
_uri = _totp.provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
|
||||
setupResult = {"provisioningUri": _uri}
|
||||
else:
|
||||
setupResult = _generateSetup(userId=str(user.id), username=user.username)
|
||||
rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
|
||||
return {
|
||||
"type": "mfa_setup_required",
|
||||
"mfaToken": pendingToken,
|
||||
"provisioningUri": setupResult["provisioningUri"],
|
||||
}
|
||||
# --- end MFA gate -----------------------------------------------------
|
||||
|
||||
# Create token data
|
||||
# MULTI-TENANT: Token does NOT contain mandateId anymore
|
||||
# Mandate context is determined per request via X-Mandate-Id header
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ from jose import JWTError
|
|||
|
||||
from modules.shared.configuration import APP_CONFIG
|
||||
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
|
||||
from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
|
||||
from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection
|
||||
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
|
||||
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
|
||||
from modules.auth.oauthConnectTicket import resolve_connect_context
|
||||
|
|
@ -192,6 +192,65 @@ async def auth_login_callback(
|
|||
addExternalIdentityConnection=False,
|
||||
)
|
||||
|
||||
# --- MFA gate --------------------------------------------------------
|
||||
from modules.auth.mfaService import isMfaRequired as _isMfaRequired
|
||||
from modules.routes.routeMfa import _createMfaPendingToken
|
||||
|
||||
userRecord = rootInterface._getUserForAuthentication(user.username)
|
||||
userMandates = rootInterface.getUserMandates(str(user.id))
|
||||
_mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
|
||||
_mandateObjs = []
|
||||
for _mid in _mandateIds:
|
||||
try:
|
||||
_recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
|
||||
if _recs:
|
||||
_mandateObjs.append(Mandate.model_validate(dict(_recs[0])))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs)
|
||||
hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
|
||||
|
||||
if mfaRequired or hasMfaSetup:
|
||||
import uuid as _uuid
|
||||
_sid = str(_uuid.uuid4())
|
||||
pendingToken = _createMfaPendingToken(
|
||||
userId=str(user.id),
|
||||
username=user.username,
|
||||
authority=AuthAuthority.MSFT.value,
|
||||
sessionId=_sid,
|
||||
)
|
||||
if hasMfaSetup:
|
||||
mfaType = "mfa_required"
|
||||
extraFields = ""
|
||||
else:
|
||||
mfaType = "mfa_setup_required"
|
||||
from modules.auth.mfaService import generateSetup as _generateSetup
|
||||
existingSecret = userRecord.get("mfaSecret") if userRecord else None
|
||||
if existingSecret:
|
||||
from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer
|
||||
_plain = _decryptSecret(existingSecret, userId=str(user.id))
|
||||
_uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
|
||||
setupResult = {"provisioningUri": _uri}
|
||||
else:
|
||||
setupResult = _generateSetup(userId=str(user.id), username=user.username)
|
||||
rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
|
||||
extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}"
|
||||
return HTMLResponse(
|
||||
content=f"""
|
||||
<html><head><title>MFA Required</title></head><body><script>
|
||||
if (window.opener) {{
|
||||
window.opener.postMessage({{
|
||||
type: '{mfaType}',
|
||||
mfaToken: {json.dumps(pendingToken)}{extraFields}
|
||||
}}, '*');
|
||||
}}
|
||||
setTimeout(() => window.close(), 1000);
|
||||
</script></body></html>
|
||||
"""
|
||||
)
|
||||
# --- end MFA gate -----------------------------------------------------
|
||||
|
||||
jwt_token_data = {
|
||||
"sub": user.username,
|
||||
"userId": str(user.id),
|
||||
|
|
|
|||
|
|
@ -142,6 +142,15 @@ class BaseRenderer(ABC):
|
|||
"""
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _normalizeMargins(raw) -> Dict[str, int]:
|
||||
"""Accept marginsPt as dict or [top, right, bottom, left] list."""
|
||||
if isinstance(raw, dict):
|
||||
return raw
|
||||
if isinstance(raw, (list, tuple)) and len(raw) >= 4:
|
||||
return {"top": raw[0], "right": raw[1], "bottom": raw[2], "left": raw[3]}
|
||||
return {"top": 60, "right": 60, "bottom": 60, "left": 60}
|
||||
|
||||
def _convertUnifiedStyleToInternal(self, style: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Convert the unified resolvedStyle dict (from styleDefaults) into
|
||||
the renderer-internal style-set format that all rendering methods already
|
||||
|
|
@ -263,7 +272,7 @@ class BaseRenderer(ABC):
|
|||
},
|
||||
"page": {
|
||||
"format": page.get("format", "A4"),
|
||||
"margins": page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60}),
|
||||
"margins": self._normalizeMargins(page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60})),
|
||||
"show_page_numbers": page.get("showPageNumbers", True),
|
||||
"header_height": page.get("headerHeight", 30),
|
||||
"footer_height": page.get("footerHeight", 30),
|
||||
|
|
|
|||
|
|
@ -306,7 +306,7 @@ class RendererHtml(BaseRenderer):
|
|||
css_parts.append(f" color: {paraColor};")
|
||||
css_parts.append(f" font-size: {paraSizePt}pt;")
|
||||
css_parts.append(f" line-height: {lineSpacing};")
|
||||
margins = page.get("marginsPt", {})
|
||||
margins = self._normalizeMargins(page.get("marginsPt", {}))
|
||||
if margins:
|
||||
css_parts.append(f" margin: {margins.get('top', 60)}pt {margins.get('right', 60)}pt {margins.get('bottom', 60)}pt {margins.get('left', 60)}pt;")
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -259,7 +259,7 @@ class RendererPdf(BaseRenderer):
|
|||
# Create PDF document with unified page margins or defaults
|
||||
pageCfg = unifiedStyle["page"] if unifiedStyle else None
|
||||
if pageCfg:
|
||||
m = pageCfg["marginsPt"]
|
||||
m = self._normalizeMargins(pageCfg.get("marginsPt", {}))
|
||||
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=m["right"], leftMargin=m["left"], topMargin=m["top"], bottomMargin=m["bottom"])
|
||||
else:
|
||||
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18)
|
||||
|
|
@ -1186,7 +1186,7 @@ class RendererPdf(BaseRenderer):
|
|||
# Use page dimensions minus margins with generous safety buffer
|
||||
# A4 = 595.27 x 841.89 pt; frame = page - margins - internal padding
|
||||
_us = getattr(self, '_unifiedStyle', None) or {}
|
||||
_pageMgn = (_us.get('page') or {}).get('marginsPt') or {}
|
||||
_pageMgn = self._normalizeMargins((_us.get('page') or {}).get('marginsPt', {}))
|
||||
marginTop = _pageMgn.get('top', 60)
|
||||
marginBottom = _pageMgn.get('bottom', 60)
|
||||
marginLeft = _pageMgn.get('left', 60)
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ python-jose[cryptography]==3.3.0 # For JWT tokens
|
|||
cryptography>=41.0.0 # For encryption/decryption of configuration values
|
||||
passlib==1.7.4
|
||||
argon2-cffi>=21.3.0 # Für Passwort-Hashing in gateway_interface.py
|
||||
pyotp>=2.9.0 # TOTP-basierte MFA (Multi-Factor Authentication)
|
||||
google-auth-oauthlib==1.2.0 # Für Google OAuth
|
||||
google-auth==2.27.0 # Für Google Authentication
|
||||
google-api-python-client==2.170.0 # For Google API integration
|
||||
|
|
|
|||
154
tests/unit/auth/test_mfaService.py
Normal file
154
tests/unit/auth/test_mfaService.py
Normal file
|
|
@ -0,0 +1,154 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""
|
||||
Unit tests for modules.auth.mfaService.
|
||||
|
||||
Tests TOTP generation, verification, encryption round-trip, and the
|
||||
three-rule MFA obligation resolver.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import pyotp
|
||||
|
||||
from modules.auth.mfaService import (
|
||||
_generateSecret,
|
||||
_buildTotp,
|
||||
generateSetup,
|
||||
confirmSetup,
|
||||
verifyCode,
|
||||
isMfaRequired,
|
||||
_isMfaRequireAdminsEnabled,
|
||||
)
|
||||
|
||||
|
||||
class TestTotpBasics:
|
||||
def test_generateSecret_returns_base32(self):
|
||||
secret = _generateSecret()
|
||||
assert isinstance(secret, str)
|
||||
assert len(secret) >= 16
|
||||
|
||||
def test_buildTotp_generates_valid_code(self):
|
||||
secret = _generateSecret()
|
||||
totp = _buildTotp(secret)
|
||||
code = totp.now()
|
||||
assert len(code) == 6
|
||||
assert code.isdigit()
|
||||
|
||||
def test_verifyCode_accepts_current_code(self):
|
||||
secret = _generateSecret()
|
||||
totp = _buildTotp(secret)
|
||||
code = totp.now()
|
||||
encrypted = f"FAKE_ENC:{secret}"
|
||||
|
||||
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
|
||||
assert verifyCode(encrypted, code) is True
|
||||
|
||||
def test_verifyCode_rejects_wrong_code(self):
|
||||
secret = _generateSecret()
|
||||
encrypted = f"FAKE_ENC:{secret}"
|
||||
|
||||
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
|
||||
assert verifyCode(encrypted, "000000") is False
|
||||
|
||||
|
||||
class TestGenerateSetup:
|
||||
@patch("modules.auth.mfaService._encryptSecret", return_value="ENC_SECRET")
|
||||
def test_returns_uri_and_encrypted_secret(self, _mock_enc):
|
||||
result = generateSetup(userId="u1", username="testuser")
|
||||
assert "encryptedSecret" in result
|
||||
assert "provisioningUri" in result
|
||||
assert result["encryptedSecret"] == "ENC_SECRET"
|
||||
assert "otpauth://totp/" in result["provisioningUri"]
|
||||
assert "PowerOn" in result["provisioningUri"]
|
||||
|
||||
|
||||
class TestConfirmSetup:
|
||||
def test_confirmSetup_with_valid_code(self):
|
||||
secret = _generateSecret()
|
||||
totp = _buildTotp(secret)
|
||||
code = totp.now()
|
||||
|
||||
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
|
||||
assert confirmSetup("ENC", code) is True
|
||||
|
||||
def test_confirmSetup_with_invalid_code(self):
|
||||
secret = _generateSecret()
|
||||
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
|
||||
assert confirmSetup("ENC", "999999") is False
|
||||
|
||||
def test_confirmSetup_handles_decryption_error(self):
|
||||
with patch("modules.auth.mfaService._decryptSecret", side_effect=Exception("decrypt error")):
|
||||
assert confirmSetup("BAD_ENC", "123456") is False
|
||||
|
||||
|
||||
class TestIsMfaRequired:
|
||||
def _makeUser(self, mfaEnabled=False, isSysAdmin=False, isPlatformAdmin=False):
|
||||
u = MagicMock()
|
||||
u.mfaEnabled = mfaEnabled
|
||||
u.isSysAdmin = isSysAdmin
|
||||
u.isPlatformAdmin = isPlatformAdmin
|
||||
return u
|
||||
|
||||
def _makeMandate(self, mfaRequired=False):
|
||||
m = MagicMock()
|
||||
m.mfaRequired = mfaRequired
|
||||
return m
|
||||
|
||||
def test_mfaEnabled_user_always_required(self):
|
||||
user = self._makeUser(mfaEnabled=True)
|
||||
assert isMfaRequired(user) is True
|
||||
|
||||
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True)
|
||||
def test_sysadmin_with_config_key(self, _mock):
|
||||
user = self._makeUser(isSysAdmin=True)
|
||||
assert isMfaRequired(user) is True
|
||||
|
||||
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True)
|
||||
def test_platformadmin_with_config_key(self, _mock):
|
||||
user = self._makeUser(isPlatformAdmin=True)
|
||||
assert isMfaRequired(user) is True
|
||||
|
||||
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
|
||||
def test_admin_without_config_key_not_required(self, _mock):
|
||||
user = self._makeUser(isSysAdmin=True)
|
||||
assert isMfaRequired(user) is False
|
||||
|
||||
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
|
||||
def test_mandate_with_mfaRequired(self, _mock):
|
||||
user = self._makeUser()
|
||||
mandate = self._makeMandate(mfaRequired=True)
|
||||
assert isMfaRequired(user, mandates=[mandate]) is True
|
||||
|
||||
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
|
||||
def test_mandate_without_mfaRequired(self, _mock):
|
||||
user = self._makeUser()
|
||||
mandate = self._makeMandate(mfaRequired=False)
|
||||
assert isMfaRequired(user, mandates=[mandate]) is False
|
||||
|
||||
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
|
||||
def test_regular_user_no_mandate_not_required(self, _mock):
|
||||
user = self._makeUser()
|
||||
assert isMfaRequired(user) is False
|
||||
|
||||
|
||||
class TestConfigKey:
|
||||
@patch("modules.auth.mfaService.APP_CONFIG")
|
||||
def test_config_true(self, mock_cfg):
|
||||
mock_cfg.get.return_value = "true"
|
||||
assert _isMfaRequireAdminsEnabled() is True
|
||||
|
||||
@patch("modules.auth.mfaService.APP_CONFIG")
|
||||
def test_config_false(self, mock_cfg):
|
||||
mock_cfg.get.return_value = "false"
|
||||
assert _isMfaRequireAdminsEnabled() is False
|
||||
|
||||
@patch("modules.auth.mfaService.APP_CONFIG")
|
||||
def test_config_empty(self, mock_cfg):
|
||||
mock_cfg.get.return_value = ""
|
||||
assert _isMfaRequireAdminsEnabled() is False
|
||||
|
||||
@patch("modules.auth.mfaService.APP_CONFIG")
|
||||
def test_config_one(self, mock_cfg):
|
||||
mock_cfg.get.return_value = "1"
|
||||
assert _isMfaRequireAdminsEnabled() is True
|
||||
Loading…
Reference in a new issue