From 08fa70e4e06c1c04d96bf343c63463bcebbcb80c Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Wed, 3 Jun 2026 23:21:29 +0200
Subject: [PATCH] security and mfa
---
app.py | 3 +
env-dev.env | 3 +
env-int.env | 3 +
env-prod.env | 3 +
modules/auth/mfaService.py | 132 +++++++++
modules/datamodels/datamodelUam.py | 44 +++
modules/interfaces/interfaceDbApp.py | 8 +-
modules/routes/routeMfa.py | 278 ++++++++++++++++++
modules/routes/routeSecurityGoogle.py | 61 +++-
modules/routes/routeSecurityLocal.py | 51 +++-
modules/routes/routeSecurityMsft.py | 61 +++-
.../renderers/documentRendererBaseTemplate.py | 11 +-
.../renderers/rendererHtml.py | 2 +-
.../renderers/rendererPdf.py | 4 +-
requirements.txt | 1 +
tests/unit/auth/test_mfaService.py | 154 ++++++++++
16 files changed, 809 insertions(+), 10 deletions(-)
create mode 100644 modules/auth/mfaService.py
create mode 100644 modules/routes/routeMfa.py
create mode 100644 tests/unit/auth/test_mfaService.py
diff --git a/app.py b/app.py
index ec35a2cf..07bdf98f 100644
--- a/app.py
+++ b/app.py
@@ -659,6 +659,9 @@ app.include_router(tableViewsRouter)
from modules.routes.routeSecurityLocal import router as localRouter
app.include_router(localRouter)
+from modules.routes.routeMfa import router as mfaRouter
+app.include_router(mfaRouter)
+
from modules.routes.routeSecurityMsft import router as msftRouter
app.include_router(msftRouter)
diff --git a/env-dev.env b/env-dev.env
index 748d8c8d..dff07ad7 100644
--- a/env-dev.env
+++ b/env-dev.env
@@ -18,6 +18,9 @@ DB_PORT=5432
APP_JWT_KEY_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpERjlrSktmZHVuQnJ1VVJDdndLaUcxZGJsT2ZlUFRlcFdOZ001RnlzM2FhLWhRV2tjWWFhaWQwQ3hkcUFvbThMcndxSjFpYTdfRV9OZGhTcksxbXFTZWg5MDZvOHpCVXBHcDJYaHlJM0tyNWRZckZsVHpQcmxTZHJoZUs1M3lfU2ljRnJaTmNSQ0w0X085OXI0QW80M2xfQnJqZmZ6VEh3TUltX0xzeE42SGtZPQ==
APP_TOKEN_EXPIRY=300
+# MFA Configuration
+MFA_REQUIRE_ADMINS = False
+
# CORS Configuration
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net
diff --git a/env-int.env b/env-int.env
index 5e2a4e77..d7b62f79 100644
--- a/env-int.env
+++ b/env-int.env
@@ -20,6 +20,9 @@ DB_PORT=5432
APP_JWT_KEY_SECRET = INT_ENC:Z0FBQUFBQm8xSVRjNUctb2RwU25iR3ZnanBOdHZhWUtIajZ1RnZzTEp4aDR0MktWRjNoeVBrY1Npd1R0VE9YVHp3M2w1cXRzbUxNaU82QUJvaDNFeVQyN05KblRWblBvbWtoT0VXbkNBbDQ5OHhwSUFnaDZGRG10Vmgtdm1YUkRsYUhFMzRVZURmSFlDTFIzVWg4MXNueDZyMGc5aVpFdWRxY3dkTExGM093ZTVUZVl5LUhGWnlRPQ==
APP_TOKEN_EXPIRY=300
+# MFA Configuration
+MFA_REQUIRE_ADMINS = True
+
# CORS Configuration
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net
diff --git a/env-prod.env b/env-prod.env
index 4395fd5d..7401129d 100644
--- a/env-prod.env
+++ b/env-prod.env
@@ -18,6 +18,9 @@ DB_PORT=5432
APP_JWT_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z3elhfV0Rnd2pQRjlMdkVwX1FnSmRhSzNZUlV5SVpaWXBNX1hpa2xPZGdMSWpnN2ZINHQxeGZnNHJweU5pZjlyYlY5Qm9zOUZEbl9wUEgtZHZXd1NhR19JSG9kbFU4MnFGQnllbFhRQVphRGQyNHlFVWR5VHQyUUpqN0stUmRuY2QyTi1oalczRHpLTEJqWURjZWs4YjZvT2U5YnFqcXEwdEpxV05fX05QMmtrPQ==
APP_TOKEN_EXPIRY=300
+# MFA Configuration
+MFA_REQUIRE_ADMINS = True
+
# CORS Configuration
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net
diff --git a/modules/auth/mfaService.py b/modules/auth/mfaService.py
new file mode 100644
index 00000000..7ecd7889
--- /dev/null
+++ b/modules/auth/mfaService.py
@@ -0,0 +1,132 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+MFA (Multi-Factor Authentication) Service.
+
+TOTP-based MFA using pyotp. Secrets are encrypted at rest via
+encryptValue/decryptValue from the configuration module.
+
+MFA obligation is resolved by three OR-linked rules:
+ 1. Any mandate the user belongs to has ``mfaRequired=True``.
+ 2. User is sysAdmin OR platformAdmin AND config key ``MFA_REQUIRE_ADMINS``
+ is truthy.
+ 3. User has opted in (``mfaEnabled=True`` without any mandate/admin rule).
+"""
+
+import logging
+from typing import Optional
+
+import pyotp
+
+from modules.shared.configuration import APP_CONFIG, encryptValue, decryptValue
+
+logger = logging.getLogger(__name__)
+
+_MFA_DIGITS = 6
+_MFA_INTERVAL = 30
+_MFA_VALID_WINDOW = 1
+
+
+def _getMfaIssuer() -> str:
+ """Build the TOTP issuer name, e.g. 'PowerOn' or 'PowerOn (Dev)'."""
+ envType = (APP_CONFIG.get("APP_ENV_TYPE") or "").strip().lower()
+ if envType in ("prod", ""):
+ return "PowerOn"
+ return f"PowerOn ({envType.upper()})"
+
+
+def _generateSecret() -> str:
+ """Generate a fresh base32-encoded TOTP secret."""
+ return pyotp.random_base32()
+
+
+def _encryptSecret(plainSecret: str, userId: str = "system") -> str:
+ return encryptValue(plainSecret, userId=userId, keyName="mfa_secret")
+
+
+def _decryptSecret(encryptedSecret: str, userId: str = "system") -> str:
+ return decryptValue(encryptedSecret, userId=userId, keyName="mfa_secret")
+
+
+def _buildTotp(plainSecret: str) -> pyotp.TOTP:
+ return pyotp.TOTP(plainSecret, digits=_MFA_DIGITS, interval=_MFA_INTERVAL)
+
+
+def generateSetup(userId: str, username: str) -> dict:
+ """Start MFA enrolment: return secret + provisioning URI (for QR code).
+
+ Returns dict with keys ``secret`` (encrypted for DB storage) and
+ ``provisioningUri`` (otpauth:// URI the frontend renders as QR).
+ The plaintext secret is NOT returned -- the URI already contains it.
+ """
+ plain = _generateSecret()
+ encrypted = _encryptSecret(plain, userId=userId)
+ totp = _buildTotp(plain)
+ uri = totp.provisioning_uri(name=username, issuer_name=_getMfaIssuer())
+ return {
+ "encryptedSecret": encrypted,
+ "provisioningUri": uri,
+ }
+
+
+def confirmSetup(encryptedSecret: str, code: str, userId: str = "system") -> bool:
+ """Verify a TOTP code against an encrypted secret (enrolment confirmation)."""
+ try:
+ plain = _decryptSecret(encryptedSecret, userId=userId)
+ totp = _buildTotp(plain)
+ return totp.verify(code, valid_window=_MFA_VALID_WINDOW)
+ except Exception:
+ logger.exception("MFA confirmSetup failed for userId=%s", userId)
+ return False
+
+
+def verifyCode(encryptedSecret: str, code: str, userId: str = "system") -> bool:
+ """Verify a TOTP code during login."""
+ try:
+ plain = _decryptSecret(encryptedSecret, userId=userId)
+ totp = _buildTotp(plain)
+ return totp.verify(code, valid_window=_MFA_VALID_WINDOW)
+ except Exception:
+ logger.exception("MFA verifyCode failed for userId=%s", userId)
+ return False
+
+
+def _isMfaRequireAdminsEnabled() -> bool:
+ """Read ``MFA_REQUIRE_ADMINS`` from config / env."""
+ raw = (APP_CONFIG.get("MFA_REQUIRE_ADMINS") or "").strip().lower()
+ return raw in ("1", "true", "yes")
+
+
+def isMfaRequired(user, userMandates=None, mandates=None) -> bool:
+ """Resolve whether MFA is mandatory for *user*.
+
+ Rules (OR):
+ 1. At least one of the user's mandates has ``mfaRequired=True``.
+ 2. User is sysAdmin or platformAdmin AND ``MFA_REQUIRE_ADMINS`` config
+ key is truthy.
+ 3. User already opted in (``mfaEnabled=True``).
+
+ Parameters
+ ----------
+ user : User | UserInDB
+ The user object.
+ userMandates : list | None
+ List of UserMandate records for the user (each has ``mandateId``).
+ mandates : list | None
+ List of Mandate objects the user has access to. If provided directly
+ this avoids a second lookup.
+ """
+ if getattr(user, "mfaEnabled", False):
+ return True
+
+ isSys = getattr(user, "isSysAdmin", False)
+ isPlat = getattr(user, "isPlatformAdmin", False)
+ if (isSys or isPlat) and _isMfaRequireAdminsEnabled():
+ return True
+
+ if mandates:
+ for m in mandates:
+ if getattr(m, "mfaRequired", False):
+ return True
+
+ return False
diff --git a/modules/datamodels/datamodelUam.py b/modules/datamodels/datamodelUam.py
index da074f57..1d95598c 100644
--- a/modules/datamodels/datamodelUam.py
+++ b/modules/datamodels/datamodelUam.py
@@ -197,6 +197,26 @@ class Mandate(PowerOnModel):
# `customer.email`, `customer.tax_id_data` mappen kann
# (Stripe verlangt die Adresse strukturiert, nicht als Freitext).
# ``order`` 200-209 gruppiert die Felder visuell am Ende des Formulars.
+ mfaRequired: bool = Field(
+ default=False,
+ description="When true, all users with access to this mandate must have MFA enabled.",
+ json_schema_extra={
+ "frontend_type": "checkbox",
+ "frontend_readonly": False,
+ "frontend_required": False,
+ "label": "MFA-Pflicht",
+ "frontend_format_labels": ["Ja", "-", "Nein"],
+ "order": 190,
+ },
+ )
+
+ @field_validator("mfaRequired", mode="before")
+ @classmethod
+ def _coerceMfaRequired(cls, v):
+ if v is None:
+ return False
+ return v
+
invoiceCompanyName: Optional[str] = Field(
default=None,
description="Firmenname / Empfaenger der Rechnung (falls abweichend vom Voller Name).",
@@ -623,6 +643,25 @@ class User(PowerOnModel):
return v
+ mfaEnabled: bool = Field(
+ default=False,
+ description="Whether the user has completed MFA setup and has TOTP active.",
+ json_schema_extra={
+ "frontend_type": "checkbox",
+ "frontend_readonly": True,
+ "frontend_required": False,
+ "label": "MFA aktiv",
+ "frontend_format_labels": ["Ja", "-", "Nein"],
+ },
+ )
+
+ @field_validator("mfaEnabled", mode="before")
+ @classmethod
+ def _coerceMfaEnabled(cls, v):
+ if v is None:
+ return False
+ return v
+
authenticationAuthority: AuthAuthority = Field(
default=AuthAuthority.LOCAL,
description="Primary authentication authority",
@@ -655,6 +694,11 @@ class UserInDB(User):
description="Hash of the user password",
json_schema_extra={"label": "Passwort-Hash"},
)
+ mfaSecret: Optional[str] = Field(
+ None,
+ description="Encrypted TOTP secret for MFA. Stored via encryptValue/decryptValue.",
+ json_schema_extra={"label": "MFA-Secret", "frontend_visible": False},
+ )
resetToken: Optional[str] = Field(
None,
description="Password reset token (UUID)",
diff --git a/modules/interfaces/interfaceDbApp.py b/modules/interfaces/interfaceDbApp.py
index ad445e7f..76bdf14d 100644
--- a/modules/interfaces/interfaceDbApp.py
+++ b/modules/interfaces/interfaceDbApp.py
@@ -813,11 +813,13 @@ class AppObjects:
updateDict.pop(field, None)
# Update user data using model
- updatedData = user.model_dump()
+ # Use UserInDB so that DB-only fields (mfaSecret, hashedPassword, etc.)
+ # are preserved through the round-trip instead of being silently dropped.
+ existingRecord = self._getUserForAuthentication(user.username)
+ updatedData = dict(existingRecord) if existingRecord else user.model_dump()
updatedData.update(updateDict)
- # Ensure ID matches userId parameter
updatedData["id"] = userId
- updatedUser = User(**updatedData)
+ updatedUser = UserInDB(**updatedData)
# Update user record
self.db.recordModify(UserInDB, userId, updatedUser)
diff --git a/modules/routes/routeMfa.py b/modules/routes/routeMfa.py
new file mode 100644
index 00000000..551faa37
--- /dev/null
+++ b/modules/routes/routeMfa.py
@@ -0,0 +1,278 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Routes for TOTP-based Multi-Factor Authentication.
+
+Endpoints:
+ GET /api/mfa/status - MFA status for the current user
+ POST /api/mfa/setup - Start MFA enrolment (returns provisioningUri)
+ POST /api/mfa/confirm - Confirm enrolment with first TOTP code
+ POST /api/mfa/verify - Verify TOTP during login (uses mfa_pending temp token)
+ POST /api/mfa/disable - Disable MFA for current user
+"""
+
+from datetime import timedelta
+from typing import Dict, Any
+
+from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body
+from jose import jwt, JWTError
+import logging
+import uuid
+
+from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
+from modules.auth import (
+ createAccessToken,
+ createRefreshToken,
+ setAccessTokenCookie,
+ setRefreshTokenCookie,
+)
+from modules.auth.mfaService import (
+ generateSetup,
+ confirmSetup,
+ verifyCode,
+ isMfaRequired,
+)
+from modules.interfaces.interfaceDbApp import getRootInterface
+from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, Mandate
+from modules.datamodels.datamodelSecurity import Token, TokenPurpose
+from modules.shared.configuration import APP_CONFIG
+from modules.shared.i18nRegistry import apiRouteContext
+from datetime import datetime
+
+routeApiMsg = apiRouteContext("routeMfa")
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/api/mfa", tags=["MFA"])
+
+_MFA_PENDING_EXPIRE_MINUTES = 5
+
+
+def _createMfaPendingToken(userId: str, username: str, authority: str, sessionId: str) -> str:
+ """Short-lived JWT that authorises only the /mfa/verify endpoint."""
+ payload = {
+ "sub": username,
+ "userId": userId,
+ "authenticationAuthority": authority,
+ "sid": sessionId,
+ "type": "mfa_pending",
+ "jti": str(uuid.uuid4()),
+ }
+ token, _ = createAccessToken(payload, expiresDelta=timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES))
+ return token
+
+
+def _decodeMfaPendingToken(token: str) -> dict:
+ """Decode and validate an mfa_pending token. Raises HTTPException on failure."""
+ try:
+ payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+ except JWTError:
+ raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired MFA token")
+ if payload.get("type") != "mfa_pending":
+ raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
+ return payload
+
+
+@router.get("/status")
+def mfaStatus(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
+ rootInterface = getRootInterface()
+ userMandates = rootInterface.getUserMandates(str(currentUser.id))
+ mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
+ mandates = []
+ for mid in mandateIds:
+ try:
+ recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": mid})
+ if recs:
+ mandates.append(Mandate.model_validate(dict(recs[0])))
+ except Exception:
+ pass
+
+ required = isMfaRequired(currentUser, userMandates=userMandates, mandates=mandates)
+ return {
+ "mfaEnabled": getattr(currentUser, "mfaEnabled", False),
+ "mfaRequired": required,
+ }
+
+
+@router.post("/setup")
+def mfaSetup(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
+ """Start MFA enrolment. Returns ``provisioningUri`` (otpauth:// for QR)."""
+ if getattr(currentUser, "mfaEnabled", False):
+ raise HTTPException(status_code=400, detail="MFA is already enabled")
+
+ result = generateSetup(userId=str(currentUser.id), username=currentUser.username)
+
+ rootInterface = getRootInterface()
+ rootInterface.updateUser(str(currentUser.id), {"mfaSecret": result["encryptedSecret"]})
+
+ return {"provisioningUri": result["provisioningUri"]}
+
+
+@router.post("/confirm")
+@limiter.limit("10/minute")
+def mfaConfirm(
+ request: Request,
+ code: str = Body(...),
+ token: str = Body(None),
+) -> Dict[str, Any]:
+ """Confirm enrolment by verifying the first TOTP code.
+
+ Accepts either a valid session (cookie/header JWT) OR an ``mfa_pending``
+ token so that the forced-setup flow during login works without a session.
+ """
+ rootInterface = getRootInterface()
+
+ if token:
+ payload = _decodeMfaPendingToken(token)
+ username = payload["sub"]
+ userId = payload["userId"]
+ else:
+ raise HTTPException(status_code=401, detail="MFA token required")
+
+ userRecord = rootInterface._getUserForAuthentication(username)
+ if not userRecord:
+ raise HTTPException(status_code=404, detail="User not found")
+
+ encryptedSecret = userRecord.get("mfaSecret")
+ if not encryptedSecret:
+ logger.warning("MFA confirm: no mfaSecret on user %s", username)
+ raise HTTPException(status_code=400, detail="MFA setup not started")
+
+ logger.info("MFA confirm: verifying code for user %s (secret present=%s, code=%s)", username, bool(encryptedSecret), code)
+ if not confirmSetup(encryptedSecret, code, userId=userId):
+ logger.warning("MFA confirm: TOTP code rejected for user %s", username)
+ raise HTTPException(status_code=400, detail="Invalid TOTP code")
+
+ rootInterface.updateUser(userId, {"mfaEnabled": True})
+ logger.info("MFA confirmed for user %s", username)
+
+ return {"mfaEnabled": True}
+
+
+@router.post("/confirm-authenticated")
+def mfaConfirmAuthenticated(
+ request: Request,
+ code: str = Body(..., embed=True),
+ currentUser: User = Depends(getCurrentUser),
+) -> Dict[str, Any]:
+ """Confirm MFA enrolment for an already authenticated user (Settings page)."""
+ rootInterface = getRootInterface()
+ userRecord = rootInterface._getUserForAuthentication(currentUser.username)
+ if not userRecord:
+ raise HTTPException(status_code=404, detail="User not found")
+
+ encryptedSecret = userRecord.get("mfaSecret")
+ if not encryptedSecret:
+ raise HTTPException(status_code=400, detail="MFA setup not started")
+
+ if not confirmSetup(encryptedSecret, code, userId=str(currentUser.id)):
+ raise HTTPException(status_code=400, detail="Invalid TOTP code")
+
+ rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": True})
+ logger.info("MFA confirmed for user %s", currentUser.username)
+
+ return {"mfaEnabled": True}
+
+
+@router.post("/verify")
+@limiter.limit("10/minute")
+def mfaVerify(
+ request: Request,
+ response: Response,
+ token: str = Body(...),
+ code: str = Body(...),
+) -> Dict[str, Any]:
+ """Verify TOTP code during login. Accepts the ``mfa_pending`` temp token
+ and the 6-digit code. On success, issues real session JWTs (cookies)."""
+ payload = _decodeMfaPendingToken(token)
+ userId = payload["userId"]
+ username = payload["sub"]
+ authority = payload.get("authenticationAuthority", AuthAuthority.LOCAL)
+ sessionId = payload.get("sid", str(uuid.uuid4()))
+
+ rootInterface = getRootInterface()
+ userRecord = rootInterface._getUserForAuthentication(username)
+ if not userRecord:
+ raise HTTPException(status_code=401, detail="User not found")
+
+ encryptedSecret = userRecord.get("mfaSecret")
+ if not encryptedSecret:
+ raise HTTPException(status_code=400, detail="MFA not configured for this user")
+
+ if not verifyCode(encryptedSecret, code, userId=userId):
+ raise HTTPException(status_code=401, detail="Invalid MFA code")
+
+ tokenData = {
+ "sub": username,
+ "userId": userId,
+ "authenticationAuthority": authority,
+ "sid": sessionId,
+ }
+ accessToken, accessExpires = createAccessToken(tokenData)
+ setAccessTokenCookie(response, accessToken)
+
+ refreshToken, _ = createRefreshToken(tokenData)
+ setRefreshTokenCookie(response, refreshToken)
+
+ jti = jwt.decode(accessToken, SECRET_KEY, algorithms=[ALGORITHM]).get("jti")
+
+ from modules.interfaces.interfaceDbApp import getInterface
+ user = User.model_validate(userRecord)
+ userInterface = getInterface(user)
+ dbToken = Token(
+ id=jti,
+ userId=userId,
+ authority=authority,
+ tokenPurpose=TokenPurpose.AUTH_SESSION,
+ tokenAccess=accessToken,
+ tokenType="bearer",
+ expiresAt=accessExpires.timestamp(),
+ sessionId=sessionId,
+ )
+ userInterface.saveAccessToken(dbToken)
+
+ logger.info("MFA verify successful for user %s", username)
+
+ try:
+ from modules.shared.auditLogger import audit_logger
+ audit_logger.logUserAccess(
+ userId=userId,
+ mandateId="system",
+ action="mfa_verify_success",
+ successInfo="totp_verified",
+ ipAddress=request.client.host if request.client else None,
+ userAgent=request.headers.get("user-agent"),
+ success=True,
+ )
+ except Exception:
+ pass
+
+ return {
+ "type": "local_auth_success",
+ "message": "MFA verification successful",
+ "authenticationAuthority": authority if isinstance(authority, str) else authority.value if hasattr(authority, "value") else str(authority),
+ "expires_at": accessExpires.isoformat(),
+ }
+
+
+@router.post("/disable")
+def mfaDisable(
+ currentUser: User = Depends(getCurrentUser),
+ code: str = Body(..., embed=True),
+) -> Dict[str, Any]:
+ """Disable MFA for the current user. Requires a valid TOTP code as confirmation."""
+ rootInterface = getRootInterface()
+ userRecord = rootInterface._getUserForAuthentication(currentUser.username)
+ if not userRecord:
+ raise HTTPException(status_code=404, detail="User not found")
+
+ encryptedSecret = userRecord.get("mfaSecret")
+ if not encryptedSecret or not getattr(currentUser, "mfaEnabled", False):
+ raise HTTPException(status_code=400, detail="MFA is not enabled")
+
+ if not verifyCode(encryptedSecret, code, userId=str(currentUser.id)):
+ raise HTTPException(status_code=400, detail="Invalid TOTP code")
+
+ rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": False, "mfaSecret": None})
+ logger.info("MFA disabled for user %s", currentUser.username)
+
+ return {"mfaEnabled": False}
diff --git a/modules/routes/routeSecurityGoogle.py b/modules/routes/routeSecurityGoogle.py
index 87df681a..853d4067 100644
--- a/modules/routes/routeSecurityGoogle.py
+++ b/modules/routes/routeSecurityGoogle.py
@@ -19,7 +19,7 @@ from jose import JWTError
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
-from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
+from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth.oauthConnectTicket import resolve_connect_context
@@ -219,6 +219,65 @@ async def auth_login_callback(
)
isNewUser = True
+ # --- MFA gate --------------------------------------------------------
+ from modules.auth.mfaService import isMfaRequired as _isMfaRequired
+ from modules.routes.routeMfa import _createMfaPendingToken
+
+ userRecord = rootInterface._getUserForAuthentication(user.username)
+ userMandates = rootInterface.getUserMandates(str(user.id))
+ _mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
+ _mandateObjs = []
+ for _mid in _mandateIds:
+ try:
+ _recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
+ if _recs:
+ _mandateObjs.append(Mandate.model_validate(dict(_recs[0])))
+ except Exception:
+ pass
+
+ mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs)
+ hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
+
+ if mfaRequired or hasMfaSetup:
+ import uuid as _uuid
+ _sid = str(_uuid.uuid4())
+ pendingToken = _createMfaPendingToken(
+ userId=str(user.id),
+ username=user.username,
+ authority=AuthAuthority.GOOGLE.value,
+ sessionId=_sid,
+ )
+ if hasMfaSetup:
+ mfaType = "mfa_required"
+ extraFields = ""
+ else:
+ mfaType = "mfa_setup_required"
+ from modules.auth.mfaService import generateSetup as _generateSetup
+ existingSecret = userRecord.get("mfaSecret") if userRecord else None
+ if existingSecret:
+ from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer
+ _plain = _decryptSecret(existingSecret, userId=str(user.id))
+ _uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
+ setupResult = {"provisioningUri": _uri}
+ else:
+ setupResult = _generateSetup(userId=str(user.id), username=user.username)
+ rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
+ extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}"
+ return HTMLResponse(
+ content=f"""
+ MFA Required
+ """
+ )
+ # --- end MFA gate -----------------------------------------------------
+
jwt_token_data = {
"sub": user.username,
"userId": str(user.id),
diff --git a/modules/routes/routeSecurityLocal.py b/modules/routes/routeSecurityLocal.py
index 807d5192..c8efa0a8 100644
--- a/modules/routes/routeSecurityLocal.py
+++ b/modules/routes/routeSecurityLocal.py
@@ -253,7 +253,56 @@ def login(
detail=routeApiMsg("Invalid username or password"),
headers={"WWW-Authenticate": "Bearer"},
)
-
+
+ # --- MFA gate --------------------------------------------------------
+ from modules.auth.mfaService import isMfaRequired as _isMfaRequired
+ from modules.routes.routeMfa import _createMfaPendingToken
+
+ userRecord = rootInterface._getUserForAuthentication(user.username)
+ userMandates = rootInterface.getUserMandates(str(user.id))
+ mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
+ mandateObjs = []
+ for _mid in mandateIds:
+ try:
+ recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
+ if recs:
+ mandateObjs.append(Mandate.model_validate(dict(recs[0])))
+ except Exception:
+ pass
+
+ mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=mandateObjs)
+ hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
+
+ if mfaRequired or hasMfaSetup:
+ _sid = str(uuid.uuid4())
+ pendingToken = _createMfaPendingToken(
+ userId=str(user.id),
+ username=user.username,
+ authority=AuthAuthority.LOCAL.value,
+ sessionId=_sid,
+ )
+ if hasMfaSetup:
+ return {"type": "mfa_required", "mfaToken": pendingToken}
+ else:
+ from modules.auth.mfaService import generateSetup as _generateSetup
+ existingSecret = userRecord.get("mfaSecret") if userRecord else None
+ if existingSecret:
+ from modules.auth.mfaService import _decryptSecret, _buildTotp
+ _plain = _decryptSecret(existingSecret, userId=str(user.id))
+ _totp = _buildTotp(_plain)
+ from modules.auth.mfaService import _getMfaIssuer
+ _uri = _totp.provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
+ setupResult = {"provisioningUri": _uri}
+ else:
+ setupResult = _generateSetup(userId=str(user.id), username=user.username)
+ rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
+ return {
+ "type": "mfa_setup_required",
+ "mfaToken": pendingToken,
+ "provisioningUri": setupResult["provisioningUri"],
+ }
+ # --- end MFA gate -----------------------------------------------------
+
# Create token data
# MULTI-TENANT: Token does NOT contain mandateId anymore
# Mandate context is determined per request via X-Mandate-Id header
diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py
index 67a598dd..80e0d19a 100644
--- a/modules/routes/routeSecurityMsft.py
+++ b/modules/routes/routeSecurityMsft.py
@@ -20,7 +20,7 @@ from jose import JWTError
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
-from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
+from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth.oauthConnectTicket import resolve_connect_context
@@ -192,6 +192,65 @@ async def auth_login_callback(
addExternalIdentityConnection=False,
)
+ # --- MFA gate --------------------------------------------------------
+ from modules.auth.mfaService import isMfaRequired as _isMfaRequired
+ from modules.routes.routeMfa import _createMfaPendingToken
+
+ userRecord = rootInterface._getUserForAuthentication(user.username)
+ userMandates = rootInterface.getUserMandates(str(user.id))
+ _mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
+ _mandateObjs = []
+ for _mid in _mandateIds:
+ try:
+ _recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
+ if _recs:
+ _mandateObjs.append(Mandate.model_validate(dict(_recs[0])))
+ except Exception:
+ pass
+
+ mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs)
+ hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
+
+ if mfaRequired or hasMfaSetup:
+ import uuid as _uuid
+ _sid = str(_uuid.uuid4())
+ pendingToken = _createMfaPendingToken(
+ userId=str(user.id),
+ username=user.username,
+ authority=AuthAuthority.MSFT.value,
+ sessionId=_sid,
+ )
+ if hasMfaSetup:
+ mfaType = "mfa_required"
+ extraFields = ""
+ else:
+ mfaType = "mfa_setup_required"
+ from modules.auth.mfaService import generateSetup as _generateSetup
+ existingSecret = userRecord.get("mfaSecret") if userRecord else None
+ if existingSecret:
+ from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer
+ _plain = _decryptSecret(existingSecret, userId=str(user.id))
+ _uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
+ setupResult = {"provisioningUri": _uri}
+ else:
+ setupResult = _generateSetup(userId=str(user.id), username=user.username)
+ rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
+ extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}"
+ return HTMLResponse(
+ content=f"""
+ MFA Required
+ """
+ )
+ # --- end MFA gate -----------------------------------------------------
+
jwt_token_data = {
"sub": user.username,
"userId": str(user.id),
diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py b/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py
index 61eadee7..519a2697 100644
--- a/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py
+++ b/modules/serviceCenter/services/serviceGeneration/renderers/documentRendererBaseTemplate.py
@@ -142,6 +142,15 @@ class BaseRenderer(ABC):
"""
pass
+ @staticmethod
+ def _normalizeMargins(raw) -> Dict[str, int]:
+ """Accept marginsPt as dict or [top, right, bottom, left] list."""
+ if isinstance(raw, dict):
+ return raw
+ if isinstance(raw, (list, tuple)) and len(raw) >= 4:
+ return {"top": raw[0], "right": raw[1], "bottom": raw[2], "left": raw[3]}
+ return {"top": 60, "right": 60, "bottom": 60, "left": 60}
+
def _convertUnifiedStyleToInternal(self, style: Dict[str, Any]) -> Dict[str, Any]:
"""Convert the unified resolvedStyle dict (from styleDefaults) into
the renderer-internal style-set format that all rendering methods already
@@ -263,7 +272,7 @@ class BaseRenderer(ABC):
},
"page": {
"format": page.get("format", "A4"),
- "margins": page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60}),
+ "margins": self._normalizeMargins(page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60})),
"show_page_numbers": page.get("showPageNumbers", True),
"header_height": page.get("headerHeight", 30),
"footer_height": page.get("footerHeight", 30),
diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py b/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py
index 16c1cdfd..1b3fc952 100644
--- a/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py
+++ b/modules/serviceCenter/services/serviceGeneration/renderers/rendererHtml.py
@@ -306,7 +306,7 @@ class RendererHtml(BaseRenderer):
css_parts.append(f" color: {paraColor};")
css_parts.append(f" font-size: {paraSizePt}pt;")
css_parts.append(f" line-height: {lineSpacing};")
- margins = page.get("marginsPt", {})
+ margins = self._normalizeMargins(page.get("marginsPt", {}))
if margins:
css_parts.append(f" margin: {margins.get('top', 60)}pt {margins.get('right', 60)}pt {margins.get('bottom', 60)}pt {margins.get('left', 60)}pt;")
else:
diff --git a/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py b/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py
index 425da644..a82a4866 100644
--- a/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py
+++ b/modules/serviceCenter/services/serviceGeneration/renderers/rendererPdf.py
@@ -259,7 +259,7 @@ class RendererPdf(BaseRenderer):
# Create PDF document with unified page margins or defaults
pageCfg = unifiedStyle["page"] if unifiedStyle else None
if pageCfg:
- m = pageCfg["marginsPt"]
+ m = self._normalizeMargins(pageCfg.get("marginsPt", {}))
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=m["right"], leftMargin=m["left"], topMargin=m["top"], bottomMargin=m["bottom"])
else:
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18)
@@ -1186,7 +1186,7 @@ class RendererPdf(BaseRenderer):
# Use page dimensions minus margins with generous safety buffer
# A4 = 595.27 x 841.89 pt; frame = page - margins - internal padding
_us = getattr(self, '_unifiedStyle', None) or {}
- _pageMgn = (_us.get('page') or {}).get('marginsPt') or {}
+ _pageMgn = self._normalizeMargins((_us.get('page') or {}).get('marginsPt', {}))
marginTop = _pageMgn.get('top', 60)
marginBottom = _pageMgn.get('bottom', 60)
marginLeft = _pageMgn.get('left', 60)
diff --git a/requirements.txt b/requirements.txt
index 70effc91..ffbed180 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,6 +14,7 @@ python-jose[cryptography]==3.3.0 # For JWT tokens
cryptography>=41.0.0 # For encryption/decryption of configuration values
passlib==1.7.4
argon2-cffi>=21.3.0 # Für Passwort-Hashing in gateway_interface.py
+pyotp>=2.9.0 # TOTP-basierte MFA (Multi-Factor Authentication)
google-auth-oauthlib==1.2.0 # Für Google OAuth
google-auth==2.27.0 # Für Google Authentication
google-api-python-client==2.170.0 # For Google API integration
diff --git a/tests/unit/auth/test_mfaService.py b/tests/unit/auth/test_mfaService.py
new file mode 100644
index 00000000..b6f3a1e3
--- /dev/null
+++ b/tests/unit/auth/test_mfaService.py
@@ -0,0 +1,154 @@
+# Copyright (c) 2025 Patrick Motsch
+# All rights reserved.
+"""
+Unit tests for modules.auth.mfaService.
+
+Tests TOTP generation, verification, encryption round-trip, and the
+three-rule MFA obligation resolver.
+"""
+
+import pytest
+from unittest.mock import patch, MagicMock
+import pyotp
+
+from modules.auth.mfaService import (
+ _generateSecret,
+ _buildTotp,
+ generateSetup,
+ confirmSetup,
+ verifyCode,
+ isMfaRequired,
+ _isMfaRequireAdminsEnabled,
+)
+
+
+class TestTotpBasics:
+ def test_generateSecret_returns_base32(self):
+ secret = _generateSecret()
+ assert isinstance(secret, str)
+ assert len(secret) >= 16
+
+ def test_buildTotp_generates_valid_code(self):
+ secret = _generateSecret()
+ totp = _buildTotp(secret)
+ code = totp.now()
+ assert len(code) == 6
+ assert code.isdigit()
+
+ def test_verifyCode_accepts_current_code(self):
+ secret = _generateSecret()
+ totp = _buildTotp(secret)
+ code = totp.now()
+ encrypted = f"FAKE_ENC:{secret}"
+
+ with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
+ assert verifyCode(encrypted, code) is True
+
+ def test_verifyCode_rejects_wrong_code(self):
+ secret = _generateSecret()
+ encrypted = f"FAKE_ENC:{secret}"
+
+ with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
+ assert verifyCode(encrypted, "000000") is False
+
+
+class TestGenerateSetup:
+ @patch("modules.auth.mfaService._encryptSecret", return_value="ENC_SECRET")
+ def test_returns_uri_and_encrypted_secret(self, _mock_enc):
+ result = generateSetup(userId="u1", username="testuser")
+ assert "encryptedSecret" in result
+ assert "provisioningUri" in result
+ assert result["encryptedSecret"] == "ENC_SECRET"
+ assert "otpauth://totp/" in result["provisioningUri"]
+ assert "PowerOn" in result["provisioningUri"]
+
+
+class TestConfirmSetup:
+ def test_confirmSetup_with_valid_code(self):
+ secret = _generateSecret()
+ totp = _buildTotp(secret)
+ code = totp.now()
+
+ with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
+ assert confirmSetup("ENC", code) is True
+
+ def test_confirmSetup_with_invalid_code(self):
+ secret = _generateSecret()
+ with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
+ assert confirmSetup("ENC", "999999") is False
+
+ def test_confirmSetup_handles_decryption_error(self):
+ with patch("modules.auth.mfaService._decryptSecret", side_effect=Exception("decrypt error")):
+ assert confirmSetup("BAD_ENC", "123456") is False
+
+
+class TestIsMfaRequired:
+ def _makeUser(self, mfaEnabled=False, isSysAdmin=False, isPlatformAdmin=False):
+ u = MagicMock()
+ u.mfaEnabled = mfaEnabled
+ u.isSysAdmin = isSysAdmin
+ u.isPlatformAdmin = isPlatformAdmin
+ return u
+
+ def _makeMandate(self, mfaRequired=False):
+ m = MagicMock()
+ m.mfaRequired = mfaRequired
+ return m
+
+ def test_mfaEnabled_user_always_required(self):
+ user = self._makeUser(mfaEnabled=True)
+ assert isMfaRequired(user) is True
+
+ @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True)
+ def test_sysadmin_with_config_key(self, _mock):
+ user = self._makeUser(isSysAdmin=True)
+ assert isMfaRequired(user) is True
+
+ @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True)
+ def test_platformadmin_with_config_key(self, _mock):
+ user = self._makeUser(isPlatformAdmin=True)
+ assert isMfaRequired(user) is True
+
+ @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
+ def test_admin_without_config_key_not_required(self, _mock):
+ user = self._makeUser(isSysAdmin=True)
+ assert isMfaRequired(user) is False
+
+ @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
+ def test_mandate_with_mfaRequired(self, _mock):
+ user = self._makeUser()
+ mandate = self._makeMandate(mfaRequired=True)
+ assert isMfaRequired(user, mandates=[mandate]) is True
+
+ @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
+ def test_mandate_without_mfaRequired(self, _mock):
+ user = self._makeUser()
+ mandate = self._makeMandate(mfaRequired=False)
+ assert isMfaRequired(user, mandates=[mandate]) is False
+
+ @patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
+ def test_regular_user_no_mandate_not_required(self, _mock):
+ user = self._makeUser()
+ assert isMfaRequired(user) is False
+
+
+class TestConfigKey:
+ @patch("modules.auth.mfaService.APP_CONFIG")
+ def test_config_true(self, mock_cfg):
+ mock_cfg.get.return_value = "true"
+ assert _isMfaRequireAdminsEnabled() is True
+
+ @patch("modules.auth.mfaService.APP_CONFIG")
+ def test_config_false(self, mock_cfg):
+ mock_cfg.get.return_value = "false"
+ assert _isMfaRequireAdminsEnabled() is False
+
+ @patch("modules.auth.mfaService.APP_CONFIG")
+ def test_config_empty(self, mock_cfg):
+ mock_cfg.get.return_value = ""
+ assert _isMfaRequireAdminsEnabled() is False
+
+ @patch("modules.auth.mfaService.APP_CONFIG")
+ def test_config_one(self, mock_cfg):
+ mock_cfg.get.return_value = "1"
+ assert _isMfaRequireAdminsEnabled() is True
--
2.45.2