From 08fa70e4e06c1c04d96bf343c63463bcebbcb80c Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Wed, 3 Jun 2026 23:21:29 +0200 Subject: [PATCH] security and mfa --- app.py | 3 + env-dev.env | 3 + env-int.env | 3 + env-prod.env | 3 + modules/auth/mfaService.py | 132 +++++++++ modules/datamodels/datamodelUam.py | 44 +++ modules/interfaces/interfaceDbApp.py | 8 +- modules/routes/routeMfa.py | 278 ++++++++++++++++++ modules/routes/routeSecurityGoogle.py | 61 +++- modules/routes/routeSecurityLocal.py | 51 +++- modules/routes/routeSecurityMsft.py | 61 +++- .../renderers/documentRendererBaseTemplate.py | 11 +- .../renderers/rendererHtml.py | 2 +- .../renderers/rendererPdf.py | 4 +- requirements.txt | 1 + tests/unit/auth/test_mfaService.py | 154 ++++++++++ 16 files changed, 809 insertions(+), 10 deletions(-) create mode 100644 modules/auth/mfaService.py create mode 100644 modules/routes/routeMfa.py create mode 100644 tests/unit/auth/test_mfaService.py diff --git a/app.py b/app.py index ec35a2cf..07bdf98f 100644 --- a/app.py +++ b/app.py @@ -659,6 +659,9 @@ app.include_router(tableViewsRouter) from modules.routes.routeSecurityLocal import router as localRouter app.include_router(localRouter) +from modules.routes.routeMfa import router as mfaRouter +app.include_router(mfaRouter) + from modules.routes.routeSecurityMsft import router as msftRouter app.include_router(msftRouter) diff --git a/env-dev.env b/env-dev.env index 748d8c8d..dff07ad7 100644 --- a/env-dev.env +++ b/env-dev.env @@ -18,6 +18,9 @@ DB_PORT=5432 APP_JWT_KEY_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpERjlrSktmZHVuQnJ1VVJDdndLaUcxZGJsT2ZlUFRlcFdOZ001RnlzM2FhLWhRV2tjWWFhaWQwQ3hkcUFvbThMcndxSjFpYTdfRV9OZGhTcksxbXFTZWg5MDZvOHpCVXBHcDJYaHlJM0tyNWRZckZsVHpQcmxTZHJoZUs1M3lfU2ljRnJaTmNSQ0w0X085OXI0QW80M2xfQnJqZmZ6VEh3TUltX0xzeE42SGtZPQ== APP_TOKEN_EXPIRY=300 +# MFA Configuration +MFA_REQUIRE_ADMINS = False + # CORS Configuration APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net diff --git a/env-int.env b/env-int.env index 5e2a4e77..d7b62f79 100644 --- a/env-int.env +++ b/env-int.env @@ -20,6 +20,9 @@ DB_PORT=5432 APP_JWT_KEY_SECRET = INT_ENC:Z0FBQUFBQm8xSVRjNUctb2RwU25iR3ZnanBOdHZhWUtIajZ1RnZzTEp4aDR0MktWRjNoeVBrY1Npd1R0VE9YVHp3M2w1cXRzbUxNaU82QUJvaDNFeVQyN05KblRWblBvbWtoT0VXbkNBbDQ5OHhwSUFnaDZGRG10Vmgtdm1YUkRsYUhFMzRVZURmSFlDTFIzVWg4MXNueDZyMGc5aVpFdWRxY3dkTExGM093ZTVUZVl5LUhGWnlRPQ== APP_TOKEN_EXPIRY=300 +# MFA Configuration +MFA_REQUIRE_ADMINS = True + # CORS Configuration APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net diff --git a/env-prod.env b/env-prod.env index 4395fd5d..7401129d 100644 --- a/env-prod.env +++ b/env-prod.env @@ -18,6 +18,9 @@ DB_PORT=5432 APP_JWT_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z3elhfV0Rnd2pQRjlMdkVwX1FnSmRhSzNZUlV5SVpaWXBNX1hpa2xPZGdMSWpnN2ZINHQxeGZnNHJweU5pZjlyYlY5Qm9zOUZEbl9wUEgtZHZXd1NhR19JSG9kbFU4MnFGQnllbFhRQVphRGQyNHlFVWR5VHQyUUpqN0stUmRuY2QyTi1oalczRHpLTEJqWURjZWs4YjZvT2U5YnFqcXEwdEpxV05fX05QMmtrPQ== APP_TOKEN_EXPIRY=300 +# MFA Configuration +MFA_REQUIRE_ADMINS = True + # CORS Configuration APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net diff --git a/modules/auth/mfaService.py b/modules/auth/mfaService.py new file mode 100644 index 00000000..7ecd7889 --- /dev/null +++ b/modules/auth/mfaService.py @@ -0,0 +1,132 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +MFA (Multi-Factor Authentication) Service. + +TOTP-based MFA using pyotp. Secrets are encrypted at rest via +encryptValue/decryptValue from the configuration module. + +MFA obligation is resolved by three OR-linked rules: + 1. Any mandate the user belongs to has ``mfaRequired=True``. + 2. User is sysAdmin OR platformAdmin AND config key ``MFA_REQUIRE_ADMINS`` + is truthy. + 3. User has opted in (``mfaEnabled=True`` without any mandate/admin rule). +""" + +import logging +from typing import Optional + +import pyotp + +from modules.shared.configuration import APP_CONFIG, encryptValue, decryptValue + +logger = logging.getLogger(__name__) + +_MFA_DIGITS = 6 +_MFA_INTERVAL = 30 +_MFA_VALID_WINDOW = 1 + + +def _getMfaIssuer() -> str: + """Build the TOTP issuer name, e.g. 'PowerOn' or 'PowerOn (Dev)'.""" + envType = (APP_CONFIG.get("APP_ENV_TYPE") or "").strip().lower() + if envType in ("prod", ""): + return "PowerOn" + return f"PowerOn ({envType.upper()})" + + +def _generateSecret() -> str: + """Generate a fresh base32-encoded TOTP secret.""" + return pyotp.random_base32() + + +def _encryptSecret(plainSecret: str, userId: str = "system") -> str: + return encryptValue(plainSecret, userId=userId, keyName="mfa_secret") + + +def _decryptSecret(encryptedSecret: str, userId: str = "system") -> str: + return decryptValue(encryptedSecret, userId=userId, keyName="mfa_secret") + + +def _buildTotp(plainSecret: str) -> pyotp.TOTP: + return pyotp.TOTP(plainSecret, digits=_MFA_DIGITS, interval=_MFA_INTERVAL) + + +def generateSetup(userId: str, username: str) -> dict: + """Start MFA enrolment: return secret + provisioning URI (for QR code). + + Returns dict with keys ``secret`` (encrypted for DB storage) and + ``provisioningUri`` (otpauth:// URI the frontend renders as QR). + The plaintext secret is NOT returned -- the URI already contains it. + """ + plain = _generateSecret() + encrypted = _encryptSecret(plain, userId=userId) + totp = _buildTotp(plain) + uri = totp.provisioning_uri(name=username, issuer_name=_getMfaIssuer()) + return { + "encryptedSecret": encrypted, + "provisioningUri": uri, + } + + +def confirmSetup(encryptedSecret: str, code: str, userId: str = "system") -> bool: + """Verify a TOTP code against an encrypted secret (enrolment confirmation).""" + try: + plain = _decryptSecret(encryptedSecret, userId=userId) + totp = _buildTotp(plain) + return totp.verify(code, valid_window=_MFA_VALID_WINDOW) + except Exception: + logger.exception("MFA confirmSetup failed for userId=%s", userId) + return False + + +def verifyCode(encryptedSecret: str, code: str, userId: str = "system") -> bool: + """Verify a TOTP code during login.""" + try: + plain = _decryptSecret(encryptedSecret, userId=userId) + totp = _buildTotp(plain) + return totp.verify(code, valid_window=_MFA_VALID_WINDOW) + except Exception: + logger.exception("MFA verifyCode failed for userId=%s", userId) + return False + + +def _isMfaRequireAdminsEnabled() -> bool: + """Read ``MFA_REQUIRE_ADMINS`` from config / env.""" + raw = (APP_CONFIG.get("MFA_REQUIRE_ADMINS") or "").strip().lower() + return raw in ("1", "true", "yes") + + +def isMfaRequired(user, userMandates=None, mandates=None) -> bool: + """Resolve whether MFA is mandatory for *user*. + + Rules (OR): + 1. At least one of the user's mandates has ``mfaRequired=True``. + 2. User is sysAdmin or platformAdmin AND ``MFA_REQUIRE_ADMINS`` config + key is truthy. + 3. User already opted in (``mfaEnabled=True``). + + Parameters + ---------- + user : User | UserInDB + The user object. + userMandates : list | None + List of UserMandate records for the user (each has ``mandateId``). + mandates : list | None + List of Mandate objects the user has access to. If provided directly + this avoids a second lookup. + """ + if getattr(user, "mfaEnabled", False): + return True + + isSys = getattr(user, "isSysAdmin", False) + isPlat = getattr(user, "isPlatformAdmin", False) + if (isSys or isPlat) and _isMfaRequireAdminsEnabled(): + return True + + if mandates: + for m in mandates: + if getattr(m, "mfaRequired", False): + return True + + return False diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py index da074f57..1d95598c 100644 --- a/modules/datamodels/datamodelUam.py +++ b/modules/datamodels/datamodelUam.py @@ -197,6 +197,26 @@ class Mandate(PowerOnModel): # `customer.email`, `customer.tax_id_data` mappen kann # (Stripe verlangt die Adresse strukturiert, nicht als Freitext). # ``order`` 200-209 gruppiert die Felder visuell am Ende des Formulars. + mfaRequired: bool = Field( + default=False, + description="When true, all users with access to this mandate must have MFA enabled.", + json_schema_extra={ + "frontend_type": "checkbox", + "frontend_readonly": False, + "frontend_required": False, + "label": "MFA-Pflicht", + "frontend_format_labels": ["Ja", "-", "Nein"], + "order": 190, + }, + ) + + @field_validator("mfaRequired", mode="before") + @classmethod + def _coerceMfaRequired(cls, v): + if v is None: + return False + return v + invoiceCompanyName: Optional[str] = Field( default=None, description="Firmenname / Empfaenger der Rechnung (falls abweichend vom Voller Name).", @@ -623,6 +643,25 @@ class User(PowerOnModel): return v + mfaEnabled: bool = Field( + default=False, + description="Whether the user has completed MFA setup and has TOTP active.", + json_schema_extra={ + "frontend_type": "checkbox", + "frontend_readonly": True, + "frontend_required": False, + "label": "MFA aktiv", + "frontend_format_labels": ["Ja", "-", "Nein"], + }, + ) + + @field_validator("mfaEnabled", mode="before") + @classmethod + def _coerceMfaEnabled(cls, v): + if v is None: + return False + return v + authenticationAuthority: AuthAuthority = Field( default=AuthAuthority.LOCAL, description="Primary authentication authority", @@ -655,6 +694,11 @@ class UserInDB(User): description="Hash of the user password", json_schema_extra={"label": "Passwort-Hash"}, ) + mfaSecret: Optional[str] = Field( + None, + description="Encrypted TOTP secret for MFA. Stored via encryptValue/decryptValue.", + json_schema_extra={"label": "MFA-Secret", "frontend_visible": False}, + ) resetToken: Optional[str] = Field( None, description="Password reset token (UUID)", diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py index ad445e7f..76bdf14d 100644 --- a/modules/interfaces/interfaceDbApp.py +++ b/modules/interfaces/interfaceDbApp.py @@ -813,11 +813,13 @@ class AppObjects: updateDict.pop(field, None) # Update user data using model - updatedData = user.model_dump() + # Use UserInDB so that DB-only fields (mfaSecret, hashedPassword, etc.) + # are preserved through the round-trip instead of being silently dropped. + existingRecord = self._getUserForAuthentication(user.username) + updatedData = dict(existingRecord) if existingRecord else user.model_dump() updatedData.update(updateDict) - # Ensure ID matches userId parameter updatedData["id"] = userId - updatedUser = User(**updatedData) + updatedUser = UserInDB(**updatedData) # Update user record self.db.recordModify(UserInDB, userId, updatedUser) diff --git a/modules/routes/routeMfa.py b/modules/routes/routeMfa.py new file mode 100644 index 00000000..551faa37 --- /dev/null +++ b/modules/routes/routeMfa.py @@ -0,0 +1,278 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Routes for TOTP-based Multi-Factor Authentication. + +Endpoints: + GET /api/mfa/status - MFA status for the current user + POST /api/mfa/setup - Start MFA enrolment (returns provisioningUri) + POST /api/mfa/confirm - Confirm enrolment with first TOTP code + POST /api/mfa/verify - Verify TOTP during login (uses mfa_pending temp token) + POST /api/mfa/disable - Disable MFA for current user +""" + +from datetime import timedelta +from typing import Dict, Any + +from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body +from jose import jwt, JWTError +import logging +import uuid + +from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM +from modules.auth import ( + createAccessToken, + createRefreshToken, + setAccessTokenCookie, + setRefreshTokenCookie, +) +from modules.auth.mfaService import ( + generateSetup, + confirmSetup, + verifyCode, + isMfaRequired, +) +from modules.interfaces.interfaceDbApp import getRootInterface +from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, Mandate +from modules.datamodels.datamodelSecurity import Token, TokenPurpose +from modules.shared.configuration import APP_CONFIG +from modules.shared.i18nRegistry import apiRouteContext +from datetime import datetime + +routeApiMsg = apiRouteContext("routeMfa") +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/mfa", tags=["MFA"]) + +_MFA_PENDING_EXPIRE_MINUTES = 5 + + +def _createMfaPendingToken(userId: str, username: str, authority: str, sessionId: str) -> str: + """Short-lived JWT that authorises only the /mfa/verify endpoint.""" + payload = { + "sub": username, + "userId": userId, + "authenticationAuthority": authority, + "sid": sessionId, + "type": "mfa_pending", + "jti": str(uuid.uuid4()), + } + token, _ = createAccessToken(payload, expiresDelta=timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES)) + return token + + +def _decodeMfaPendingToken(token: str) -> dict: + """Decode and validate an mfa_pending token. Raises HTTPException on failure.""" + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + except JWTError: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired MFA token") + if payload.get("type") != "mfa_pending": + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type") + return payload + + +@router.get("/status") +def mfaStatus(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]: + rootInterface = getRootInterface() + userMandates = rootInterface.getUserMandates(str(currentUser.id)) + mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)] + mandates = [] + for mid in mandateIds: + try: + recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": mid}) + if recs: + mandates.append(Mandate.model_validate(dict(recs[0]))) + except Exception: + pass + + required = isMfaRequired(currentUser, userMandates=userMandates, mandates=mandates) + return { + "mfaEnabled": getattr(currentUser, "mfaEnabled", False), + "mfaRequired": required, + } + + +@router.post("/setup") +def mfaSetup(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]: + """Start MFA enrolment. Returns ``provisioningUri`` (otpauth:// for QR).""" + if getattr(currentUser, "mfaEnabled", False): + raise HTTPException(status_code=400, detail="MFA is already enabled") + + result = generateSetup(userId=str(currentUser.id), username=currentUser.username) + + rootInterface = getRootInterface() + rootInterface.updateUser(str(currentUser.id), {"mfaSecret": result["encryptedSecret"]}) + + return {"provisioningUri": result["provisioningUri"]} + + +@router.post("/confirm") +@limiter.limit("10/minute") +def mfaConfirm( + request: Request, + code: str = Body(...), + token: str = Body(None), +) -> Dict[str, Any]: + """Confirm enrolment by verifying the first TOTP code. + + Accepts either a valid session (cookie/header JWT) OR an ``mfa_pending`` + token so that the forced-setup flow during login works without a session. + """ + rootInterface = getRootInterface() + + if token: + payload = _decodeMfaPendingToken(token) + username = payload["sub"] + userId = payload["userId"] + else: + raise HTTPException(status_code=401, detail="MFA token required") + + userRecord = rootInterface._getUserForAuthentication(username) + if not userRecord: + raise HTTPException(status_code=404, detail="User not found") + + encryptedSecret = userRecord.get("mfaSecret") + if not encryptedSecret: + logger.warning("MFA confirm: no mfaSecret on user %s", username) + raise HTTPException(status_code=400, detail="MFA setup not started") + + logger.info("MFA confirm: verifying code for user %s (secret present=%s, code=%s)", username, bool(encryptedSecret), code) + if not confirmSetup(encryptedSecret, code, userId=userId): + logger.warning("MFA confirm: TOTP code rejected for user %s", username) + raise HTTPException(status_code=400, detail="Invalid TOTP code") + + rootInterface.updateUser(userId, {"mfaEnabled": True}) + logger.info("MFA confirmed for user %s", username) + + return {"mfaEnabled": True} + + +@router.post("/confirm-authenticated") +def mfaConfirmAuthenticated( + request: Request, + code: str = Body(..., embed=True), + currentUser: User = Depends(getCurrentUser), +) -> Dict[str, Any]: + """Confirm MFA enrolment for an already authenticated user (Settings page).""" + rootInterface = getRootInterface() + userRecord = rootInterface._getUserForAuthentication(currentUser.username) + if not userRecord: + raise HTTPException(status_code=404, detail="User not found") + + encryptedSecret = userRecord.get("mfaSecret") + if not encryptedSecret: + raise HTTPException(status_code=400, detail="MFA setup not started") + + if not confirmSetup(encryptedSecret, code, userId=str(currentUser.id)): + raise HTTPException(status_code=400, detail="Invalid TOTP code") + + rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": True}) + logger.info("MFA confirmed for user %s", currentUser.username) + + return {"mfaEnabled": True} + + +@router.post("/verify") +@limiter.limit("10/minute") +def mfaVerify( + request: Request, + response: Response, + token: str = Body(...), + code: str = Body(...), +) -> Dict[str, Any]: + """Verify TOTP code during login. Accepts the ``mfa_pending`` temp token + and the 6-digit code. On success, issues real session JWTs (cookies).""" + payload = _decodeMfaPendingToken(token) + userId = payload["userId"] + username = payload["sub"] + authority = payload.get("authenticationAuthority", AuthAuthority.LOCAL) + sessionId = payload.get("sid", str(uuid.uuid4())) + + rootInterface = getRootInterface() + userRecord = rootInterface._getUserForAuthentication(username) + if not userRecord: + raise HTTPException(status_code=401, detail="User not found") + + encryptedSecret = userRecord.get("mfaSecret") + if not encryptedSecret: + raise HTTPException(status_code=400, detail="MFA not configured for this user") + + if not verifyCode(encryptedSecret, code, userId=userId): + raise HTTPException(status_code=401, detail="Invalid MFA code") + + tokenData = { + "sub": username, + "userId": userId, + "authenticationAuthority": authority, + "sid": sessionId, + } + accessToken, accessExpires = createAccessToken(tokenData) + setAccessTokenCookie(response, accessToken) + + refreshToken, _ = createRefreshToken(tokenData) + setRefreshTokenCookie(response, refreshToken) + + jti = jwt.decode(accessToken, SECRET_KEY, algorithms=[ALGORITHM]).get("jti") + + from modules.interfaces.interfaceDbApp import getInterface + user = User.model_validate(userRecord) + userInterface = getInterface(user) + dbToken = Token( + id=jti, + userId=userId, + authority=authority, + tokenPurpose=TokenPurpose.AUTH_SESSION, + tokenAccess=accessToken, + tokenType="bearer", + expiresAt=accessExpires.timestamp(), + sessionId=sessionId, + ) + userInterface.saveAccessToken(dbToken) + + logger.info("MFA verify successful for user %s", username) + + try: + from modules.shared.auditLogger import audit_logger + audit_logger.logUserAccess( + userId=userId, + mandateId="system", + action="mfa_verify_success", + successInfo="totp_verified", + ipAddress=request.client.host if request.client else None, + userAgent=request.headers.get("user-agent"), + success=True, + ) + except Exception: + pass + + return { + "type": "local_auth_success", + "message": "MFA verification successful", + "authenticationAuthority": authority if isinstance(authority, str) else authority.value if hasattr(authority, "value") else str(authority), + "expires_at": accessExpires.isoformat(), + } + + +@router.post("/disable") +def mfaDisable( + currentUser: User = Depends(getCurrentUser), + code: str = Body(..., embed=True), +) -> Dict[str, Any]: + """Disable MFA for the current user. Requires a valid TOTP code as confirmation.""" + rootInterface = getRootInterface() + userRecord = rootInterface._getUserForAuthentication(currentUser.username) + if not userRecord: + raise HTTPException(status_code=404, detail="User not found") + + encryptedSecret = userRecord.get("mfaSecret") + if not encryptedSecret or not getattr(currentUser, "mfaEnabled", False): + raise HTTPException(status_code=400, detail="MFA is not enabled") + + if not verifyCode(encryptedSecret, code, userId=str(currentUser.id)): + raise HTTPException(status_code=400, detail="Invalid TOTP code") + + rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": False, "mfaSecret": None}) + logger.info("MFA disabled for user %s", currentUser.username) + + return {"mfaEnabled": False} diff --git a/modules/routes/routeSecurityGoogle.py b/modules/routes/routeSecurityGoogle.py index 87df681a..853d4067 100644 --- a/modules/routes/routeSecurityGoogle.py +++ b/modules/routes/routeSecurityGoogle.py @@ -19,7 +19,7 @@ from jose import JWTError from modules.shared.configuration import APP_CONFIG from modules.interfaces.interfaceDbApp import getInterface, getRootInterface -from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection +from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection from modules.datamodels.datamodelSecurity import Token, TokenPurpose from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM from modules.auth.oauthConnectTicket import resolve_connect_context @@ -219,6 +219,65 @@ async def auth_login_callback( ) isNewUser = True + # --- MFA gate -------------------------------------------------------- + from modules.auth.mfaService import isMfaRequired as _isMfaRequired + from modules.routes.routeMfa import _createMfaPendingToken + + userRecord = rootInterface._getUserForAuthentication(user.username) + userMandates = rootInterface.getUserMandates(str(user.id)) + _mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)] + _mandateObjs = [] + for _mid in _mandateIds: + try: + _recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid}) + if _recs: + _mandateObjs.append(Mandate.model_validate(dict(_recs[0]))) + except Exception: + pass + + mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs) + hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False)) + + if mfaRequired or hasMfaSetup: + import uuid as _uuid + _sid = str(_uuid.uuid4()) + pendingToken = _createMfaPendingToken( + userId=str(user.id), + username=user.username, + authority=AuthAuthority.GOOGLE.value, + sessionId=_sid, + ) + if hasMfaSetup: + mfaType = "mfa_required" + extraFields = "" + else: + mfaType = "mfa_setup_required" + from modules.auth.mfaService import generateSetup as _generateSetup + existingSecret = userRecord.get("mfaSecret") if userRecord else None + if existingSecret: + from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer + _plain = _decryptSecret(existingSecret, userId=str(user.id)) + _uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer()) + setupResult = {"provisioningUri": _uri} + else: + setupResult = _generateSetup(userId=str(user.id), username=user.username) + rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]}) + extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}" + return HTMLResponse( + content=f""" + MFA Required + """ + ) + # --- end MFA gate ----------------------------------------------------- + jwt_token_data = { "sub": user.username, "userId": str(user.id), diff --git a/modules/routes/routeSecurityLocal.py b/modules/routes/routeSecurityLocal.py index 807d5192..c8efa0a8 100644 --- a/modules/routes/routeSecurityLocal.py +++ b/modules/routes/routeSecurityLocal.py @@ -253,7 +253,56 @@ def login( detail=routeApiMsg("Invalid username or password"), headers={"WWW-Authenticate": "Bearer"}, ) - + + # --- MFA gate -------------------------------------------------------- + from modules.auth.mfaService import isMfaRequired as _isMfaRequired + from modules.routes.routeMfa import _createMfaPendingToken + + userRecord = rootInterface._getUserForAuthentication(user.username) + userMandates = rootInterface.getUserMandates(str(user.id)) + mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)] + mandateObjs = [] + for _mid in mandateIds: + try: + recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid}) + if recs: + mandateObjs.append(Mandate.model_validate(dict(recs[0]))) + except Exception: + pass + + mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=mandateObjs) + hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False)) + + if mfaRequired or hasMfaSetup: + _sid = str(uuid.uuid4()) + pendingToken = _createMfaPendingToken( + userId=str(user.id), + username=user.username, + authority=AuthAuthority.LOCAL.value, + sessionId=_sid, + ) + if hasMfaSetup: + return {"type": "mfa_required", "mfaToken": pendingToken} + else: + from modules.auth.mfaService import generateSetup as _generateSetup + existingSecret = userRecord.get("mfaSecret") if userRecord else None + if existingSecret: + from modules.auth.mfaService import _decryptSecret, _buildTotp + _plain = _decryptSecret(existingSecret, userId=str(user.id)) + _totp = _buildTotp(_plain) + from modules.auth.mfaService import _getMfaIssuer + _uri = _totp.provisioning_uri(name=user.username, issuer_name=_getMfaIssuer()) + setupResult = {"provisioningUri": _uri} + else: + setupResult = _generateSetup(userId=str(user.id), username=user.username) + rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]}) + return { + "type": "mfa_setup_required", + "mfaToken": pendingToken, + "provisioningUri": setupResult["provisioningUri"], + } + # --- end MFA gate ----------------------------------------------------- + # Create token data # MULTI-TENANT: Token does NOT contain mandateId anymore # Mandate context is determined per request via X-Mandate-Id header diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py index 67a598dd..80e0d19a 100644 --- a/modules/routes/routeSecurityMsft.py +++ b/modules/routes/routeSecurityMsft.py @@ -20,7 +20,7 @@ from jose import JWTError from modules.shared.configuration import APP_CONFIG from modules.interfaces.interfaceDbApp import getInterface, getRootInterface -from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection +from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection from modules.datamodels.datamodelSecurity import Token, TokenPurpose from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM from modules.auth.oauthConnectTicket import resolve_connect_context @@ -192,6 +192,65 @@ async def auth_login_callback( addExternalIdentityConnection=False, ) + # --- MFA gate -------------------------------------------------------- + from modules.auth.mfaService import isMfaRequired as _isMfaRequired + from modules.routes.routeMfa import _createMfaPendingToken + + userRecord = rootInterface._getUserForAuthentication(user.username) + userMandates = rootInterface.getUserMandates(str(user.id)) + _mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)] + _mandateObjs = [] + for _mid in _mandateIds: + try: + _recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid}) + if _recs: + _mandateObjs.append(Mandate.model_validate(dict(_recs[0]))) + except Exception: + pass + + mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs) + hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False)) + + if mfaRequired or hasMfaSetup: + import uuid as _uuid + _sid = str(_uuid.uuid4()) + pendingToken = _createMfaPendingToken( + userId=str(user.id), + username=user.username, + authority=AuthAuthority.MSFT.value, + sessionId=_sid, + ) + if hasMfaSetup: + mfaType = "mfa_required" + extraFields = "" + else: + mfaType = "mfa_setup_required" + from modules.auth.mfaService import generateSetup as _generateSetup + existingSecret = userRecord.get("mfaSecret") if userRecord else None + if existingSecret: + from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer + _plain = _decryptSecret(existingSecret, userId=str(user.id)) + _uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer()) + setupResult = {"provisioningUri": _uri} + else: + setupResult = _generateSetup(userId=str(user.id), username=user.username) + rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]}) + extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}" + return HTMLResponse( + content=f""" + MFA Required + """ + ) + # --- end MFA gate ----------------------------------------------------- + jwt_token_data = { "sub": user.username, "userId": str(user.id), diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py b/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py index 61eadee7..519a2697 100644 --- a/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py +++ b/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py @@ -142,6 +142,15 @@ class BaseRenderer(ABC): """ pass + @staticmethod + def _normalizeMargins(raw) -> Dict[str, int]: + """Accept marginsPt as dict or [top, right, bottom, left] list.""" + if isinstance(raw, dict): + return raw + if isinstance(raw, (list, tuple)) and len(raw) >= 4: + return {"top": raw[0], "right": raw[1], "bottom": raw[2], "left": raw[3]} + return {"top": 60, "right": 60, "bottom": 60, "left": 60} + def _convertUnifiedStyleToInternal(self, style: Dict[str, Any]) -> Dict[str, Any]: """Convert the unified resolvedStyle dict (from styleDefaults) into the renderer-internal style-set format that all rendering methods already @@ -263,7 +272,7 @@ class BaseRenderer(ABC): }, "page": { "format": page.get("format", "A4"), - "margins": page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60}), + "margins": self._normalizeMargins(page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60})), "show_page_numbers": page.get("showPageNumbers", True), "header_height": page.get("headerHeight", 30), "footer_height": page.get("footerHeight", 30), diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py b/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py index 16c1cdfd..1b3fc952 100644 --- a/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py +++ b/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py @@ -306,7 +306,7 @@ class RendererHtml(BaseRenderer): css_parts.append(f" color: {paraColor};") css_parts.append(f" font-size: {paraSizePt}pt;") css_parts.append(f" line-height: {lineSpacing};") - margins = page.get("marginsPt", {}) + margins = self._normalizeMargins(page.get("marginsPt", {})) if margins: css_parts.append(f" margin: {margins.get('top', 60)}pt {margins.get('right', 60)}pt {margins.get('bottom', 60)}pt {margins.get('left', 60)}pt;") else: diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py b/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py index 425da644..a82a4866 100644 --- a/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py +++ b/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py @@ -259,7 +259,7 @@ class RendererPdf(BaseRenderer): # Create PDF document with unified page margins or defaults pageCfg = unifiedStyle["page"] if unifiedStyle else None if pageCfg: - m = pageCfg["marginsPt"] + m = self._normalizeMargins(pageCfg.get("marginsPt", {})) doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=m["right"], leftMargin=m["left"], topMargin=m["top"], bottomMargin=m["bottom"]) else: doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18) @@ -1186,7 +1186,7 @@ class RendererPdf(BaseRenderer): # Use page dimensions minus margins with generous safety buffer # A4 = 595.27 x 841.89 pt; frame = page - margins - internal padding _us = getattr(self, '_unifiedStyle', None) or {} - _pageMgn = (_us.get('page') or {}).get('marginsPt') or {} + _pageMgn = self._normalizeMargins((_us.get('page') or {}).get('marginsPt', {})) marginTop = _pageMgn.get('top', 60) marginBottom = _pageMgn.get('bottom', 60) marginLeft = _pageMgn.get('left', 60) diff --git a/requirements.txt b/requirements.txt index 70effc91..ffbed180 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ python-jose[cryptography]==3.3.0 # For JWT tokens cryptography>=41.0.0 # For encryption/decryption of configuration values passlib==1.7.4 argon2-cffi>=21.3.0 # Für Passwort-Hashing in gateway_interface.py +pyotp>=2.9.0 # TOTP-basierte MFA (Multi-Factor Authentication) google-auth-oauthlib==1.2.0 # Für Google OAuth google-auth==2.27.0 # Für Google Authentication google-api-python-client==2.170.0 # For Google API integration diff --git a/tests/unit/auth/test_mfaService.py b/tests/unit/auth/test_mfaService.py new file mode 100644 index 00000000..b6f3a1e3 --- /dev/null +++ b/tests/unit/auth/test_mfaService.py @@ -0,0 +1,154 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Unit tests for modules.auth.mfaService. + +Tests TOTP generation, verification, encryption round-trip, and the +three-rule MFA obligation resolver. +""" + +import pytest +from unittest.mock import patch, MagicMock +import pyotp + +from modules.auth.mfaService import ( + _generateSecret, + _buildTotp, + generateSetup, + confirmSetup, + verifyCode, + isMfaRequired, + _isMfaRequireAdminsEnabled, +) + + +class TestTotpBasics: + def test_generateSecret_returns_base32(self): + secret = _generateSecret() + assert isinstance(secret, str) + assert len(secret) >= 16 + + def test_buildTotp_generates_valid_code(self): + secret = _generateSecret() + totp = _buildTotp(secret) + code = totp.now() + assert len(code) == 6 + assert code.isdigit() + + def test_verifyCode_accepts_current_code(self): + secret = _generateSecret() + totp = _buildTotp(secret) + code = totp.now() + encrypted = f"FAKE_ENC:{secret}" + + with patch("modules.auth.mfaService._decryptSecret", return_value=secret): + assert verifyCode(encrypted, code) is True + + def test_verifyCode_rejects_wrong_code(self): + secret = _generateSecret() + encrypted = f"FAKE_ENC:{secret}" + + with patch("modules.auth.mfaService._decryptSecret", return_value=secret): + assert verifyCode(encrypted, "000000") is False + + +class TestGenerateSetup: + @patch("modules.auth.mfaService._encryptSecret", return_value="ENC_SECRET") + def test_returns_uri_and_encrypted_secret(self, _mock_enc): + result = generateSetup(userId="u1", username="testuser") + assert "encryptedSecret" in result + assert "provisioningUri" in result + assert result["encryptedSecret"] == "ENC_SECRET" + assert "otpauth://totp/" in result["provisioningUri"] + assert "PowerOn" in result["provisioningUri"] + + +class TestConfirmSetup: + def test_confirmSetup_with_valid_code(self): + secret = _generateSecret() + totp = _buildTotp(secret) + code = totp.now() + + with patch("modules.auth.mfaService._decryptSecret", return_value=secret): + assert confirmSetup("ENC", code) is True + + def test_confirmSetup_with_invalid_code(self): + secret = _generateSecret() + with patch("modules.auth.mfaService._decryptSecret", return_value=secret): + assert confirmSetup("ENC", "999999") is False + + def test_confirmSetup_handles_decryption_error(self): + with patch("modules.auth.mfaService._decryptSecret", side_effect=Exception("decrypt error")): + assert confirmSetup("BAD_ENC", "123456") is False + + +class TestIsMfaRequired: + def _makeUser(self, mfaEnabled=False, isSysAdmin=False, isPlatformAdmin=False): + u = MagicMock() + u.mfaEnabled = mfaEnabled + u.isSysAdmin = isSysAdmin + u.isPlatformAdmin = isPlatformAdmin + return u + + def _makeMandate(self, mfaRequired=False): + m = MagicMock() + m.mfaRequired = mfaRequired + return m + + def test_mfaEnabled_user_always_required(self): + user = self._makeUser(mfaEnabled=True) + assert isMfaRequired(user) is True + + @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True) + def test_sysadmin_with_config_key(self, _mock): + user = self._makeUser(isSysAdmin=True) + assert isMfaRequired(user) is True + + @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True) + def test_platformadmin_with_config_key(self, _mock): + user = self._makeUser(isPlatformAdmin=True) + assert isMfaRequired(user) is True + + @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False) + def test_admin_without_config_key_not_required(self, _mock): + user = self._makeUser(isSysAdmin=True) + assert isMfaRequired(user) is False + + @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False) + def test_mandate_with_mfaRequired(self, _mock): + user = self._makeUser() + mandate = self._makeMandate(mfaRequired=True) + assert isMfaRequired(user, mandates=[mandate]) is True + + @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False) + def test_mandate_without_mfaRequired(self, _mock): + user = self._makeUser() + mandate = self._makeMandate(mfaRequired=False) + assert isMfaRequired(user, mandates=[mandate]) is False + + @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False) + def test_regular_user_no_mandate_not_required(self, _mock): + user = self._makeUser() + assert isMfaRequired(user) is False + + +class TestConfigKey: + @patch("modules.auth.mfaService.APP_CONFIG") + def test_config_true(self, mock_cfg): + mock_cfg.get.return_value = "true" + assert _isMfaRequireAdminsEnabled() is True + + @patch("modules.auth.mfaService.APP_CONFIG") + def test_config_false(self, mock_cfg): + mock_cfg.get.return_value = "false" + assert _isMfaRequireAdminsEnabled() is False + + @patch("modules.auth.mfaService.APP_CONFIG") + def test_config_empty(self, mock_cfg): + mock_cfg.get.return_value = "" + assert _isMfaRequireAdminsEnabled() is False + + @patch("modules.auth.mfaService.APP_CONFIG") + def test_config_one(self, mock_cfg): + mock_cfg.get.return_value = "1" + assert _isMfaRequireAdminsEnabled() is True -- 2.45.2