security and mfa #6

Merged
p.motsch merged 1 commit from int into main 2026-06-03 21:25:26 +00:00
16 changed files with 809 additions and 10 deletions

3
app.py
View file

@ -659,6 +659,9 @@ app.include_router(tableViewsRouter)
from modules.routes.routeSecurityLocal import router as localRouter
app.include_router(localRouter)
from modules.routes.routeMfa import router as mfaRouter
app.include_router(mfaRouter)
from modules.routes.routeSecurityMsft import router as msftRouter
app.include_router(msftRouter)

View file

@ -18,6 +18,9 @@ DB_PORT=5432
APP_JWT_KEY_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpERjlrSktmZHVuQnJ1VVJDdndLaUcxZGJsT2ZlUFRlcFdOZ001RnlzM2FhLWhRV2tjWWFhaWQwQ3hkcUFvbThMcndxSjFpYTdfRV9OZGhTcksxbXFTZWg5MDZvOHpCVXBHcDJYaHlJM0tyNWRZckZsVHpQcmxTZHJoZUs1M3lfU2ljRnJaTmNSQ0w0X085OXI0QW80M2xfQnJqZmZ6VEh3TUltX0xzeE42SGtZPQ==
APP_TOKEN_EXPIRY=300
# MFA Configuration
MFA_REQUIRE_ADMINS = False
# CORS Configuration
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net

View file

@ -20,6 +20,9 @@ DB_PORT=5432
APP_JWT_KEY_SECRET = INT_ENC:Z0FBQUFBQm8xSVRjNUctb2RwU25iR3ZnanBOdHZhWUtIajZ1RnZzTEp4aDR0MktWRjNoeVBrY1Npd1R0VE9YVHp3M2w1cXRzbUxNaU82QUJvaDNFeVQyN05KblRWblBvbWtoT0VXbkNBbDQ5OHhwSUFnaDZGRG10Vmgtdm1YUkRsYUhFMzRVZURmSFlDTFIzVWg4MXNueDZyMGc5aVpFdWRxY3dkTExGM093ZTVUZVl5LUhGWnlRPQ==
APP_TOKEN_EXPIRY=300
# MFA Configuration
MFA_REQUIRE_ADMINS = True
# CORS Configuration
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net

View file

@ -18,6 +18,9 @@ DB_PORT=5432
APP_JWT_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z3elhfV0Rnd2pQRjlMdkVwX1FnSmRhSzNZUlV5SVpaWXBNX1hpa2xPZGdMSWpnN2ZINHQxeGZnNHJweU5pZjlyYlY5Qm9zOUZEbl9wUEgtZHZXd1NhR19JSG9kbFU4MnFGQnllbFhRQVphRGQyNHlFVWR5VHQyUUpqN0stUmRuY2QyTi1oalczRHpLTEJqWURjZWs4YjZvT2U5YnFqcXEwdEpxV05fX05QMmtrPQ==
APP_TOKEN_EXPIRY=300
# MFA Configuration
MFA_REQUIRE_ADMINS = True
# CORS Configuration
APP_ALLOWED_ORIGINS=http://localhost:8080,http://localhost:5176,https://porta.poweron.swiss,https://porta-int.poweron.swiss,https://nyla.poweron.swiss,https://nyla-int.poweron.swiss,https://nyla.poweron-center.net,https://nyla-int.poweron-center.net

132
modules/auth/mfaService.py Normal file
View file

@ -0,0 +1,132 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
MFA (Multi-Factor Authentication) Service.
TOTP-based MFA using pyotp. Secrets are encrypted at rest via
encryptValue/decryptValue from the configuration module.
MFA obligation is resolved by three OR-linked rules:
1. Any mandate the user belongs to has ``mfaRequired=True``.
2. User is sysAdmin OR platformAdmin AND config key ``MFA_REQUIRE_ADMINS``
is truthy.
3. User has opted in (``mfaEnabled=True`` without any mandate/admin rule).
"""
import logging
from typing import Optional
import pyotp
from modules.shared.configuration import APP_CONFIG, encryptValue, decryptValue
logger = logging.getLogger(__name__)
_MFA_DIGITS = 6
_MFA_INTERVAL = 30
_MFA_VALID_WINDOW = 1
def _getMfaIssuer() -> str:
"""Build the TOTP issuer name, e.g. 'PowerOn' or 'PowerOn (Dev)'."""
envType = (APP_CONFIG.get("APP_ENV_TYPE") or "").strip().lower()
if envType in ("prod", ""):
return "PowerOn"
return f"PowerOn ({envType.upper()})"
def _generateSecret() -> str:
"""Generate a fresh base32-encoded TOTP secret."""
return pyotp.random_base32()
def _encryptSecret(plainSecret: str, userId: str = "system") -> str:
return encryptValue(plainSecret, userId=userId, keyName="mfa_secret")
def _decryptSecret(encryptedSecret: str, userId: str = "system") -> str:
return decryptValue(encryptedSecret, userId=userId, keyName="mfa_secret")
def _buildTotp(plainSecret: str) -> pyotp.TOTP:
return pyotp.TOTP(plainSecret, digits=_MFA_DIGITS, interval=_MFA_INTERVAL)
def generateSetup(userId: str, username: str) -> dict:
"""Start MFA enrolment: return secret + provisioning URI (for QR code).
Returns dict with keys ``secret`` (encrypted for DB storage) and
``provisioningUri`` (otpauth:// URI the frontend renders as QR).
The plaintext secret is NOT returned -- the URI already contains it.
"""
plain = _generateSecret()
encrypted = _encryptSecret(plain, userId=userId)
totp = _buildTotp(plain)
uri = totp.provisioning_uri(name=username, issuer_name=_getMfaIssuer())
return {
"encryptedSecret": encrypted,
"provisioningUri": uri,
}
def confirmSetup(encryptedSecret: str, code: str, userId: str = "system") -> bool:
"""Verify a TOTP code against an encrypted secret (enrolment confirmation)."""
try:
plain = _decryptSecret(encryptedSecret, userId=userId)
totp = _buildTotp(plain)
return totp.verify(code, valid_window=_MFA_VALID_WINDOW)
except Exception:
logger.exception("MFA confirmSetup failed for userId=%s", userId)
return False
def verifyCode(encryptedSecret: str, code: str, userId: str = "system") -> bool:
"""Verify a TOTP code during login."""
try:
plain = _decryptSecret(encryptedSecret, userId=userId)
totp = _buildTotp(plain)
return totp.verify(code, valid_window=_MFA_VALID_WINDOW)
except Exception:
logger.exception("MFA verifyCode failed for userId=%s", userId)
return False
def _isMfaRequireAdminsEnabled() -> bool:
"""Read ``MFA_REQUIRE_ADMINS`` from config / env."""
raw = (APP_CONFIG.get("MFA_REQUIRE_ADMINS") or "").strip().lower()
return raw in ("1", "true", "yes")
def isMfaRequired(user, userMandates=None, mandates=None) -> bool:
"""Resolve whether MFA is mandatory for *user*.
Rules (OR):
1. At least one of the user's mandates has ``mfaRequired=True``.
2. User is sysAdmin or platformAdmin AND ``MFA_REQUIRE_ADMINS`` config
key is truthy.
3. User already opted in (``mfaEnabled=True``).
Parameters
----------
user : User | UserInDB
The user object.
userMandates : list | None
List of UserMandate records for the user (each has ``mandateId``).
mandates : list | None
List of Mandate objects the user has access to. If provided directly
this avoids a second lookup.
"""
if getattr(user, "mfaEnabled", False):
return True
isSys = getattr(user, "isSysAdmin", False)
isPlat = getattr(user, "isPlatformAdmin", False)
if (isSys or isPlat) and _isMfaRequireAdminsEnabled():
return True
if mandates:
for m in mandates:
if getattr(m, "mfaRequired", False):
return True
return False

View file

@ -197,6 +197,26 @@ class Mandate(PowerOnModel):
# `customer.email`, `customer.tax_id_data` mappen kann
# (Stripe verlangt die Adresse strukturiert, nicht als Freitext).
# ``order`` 200-209 gruppiert die Felder visuell am Ende des Formulars.
mfaRequired: bool = Field(
default=False,
description="When true, all users with access to this mandate must have MFA enabled.",
json_schema_extra={
"frontend_type": "checkbox",
"frontend_readonly": False,
"frontend_required": False,
"label": "MFA-Pflicht",
"frontend_format_labels": ["Ja", "-", "Nein"],
"order": 190,
},
)
@field_validator("mfaRequired", mode="before")
@classmethod
def _coerceMfaRequired(cls, v):
if v is None:
return False
return v
invoiceCompanyName: Optional[str] = Field(
default=None,
description="Firmenname / Empfaenger der Rechnung (falls abweichend vom Voller Name).",
@ -623,6 +643,25 @@ class User(PowerOnModel):
return v
mfaEnabled: bool = Field(
default=False,
description="Whether the user has completed MFA setup and has TOTP active.",
json_schema_extra={
"frontend_type": "checkbox",
"frontend_readonly": True,
"frontend_required": False,
"label": "MFA aktiv",
"frontend_format_labels": ["Ja", "-", "Nein"],
},
)
@field_validator("mfaEnabled", mode="before")
@classmethod
def _coerceMfaEnabled(cls, v):
if v is None:
return False
return v
authenticationAuthority: AuthAuthority = Field(
default=AuthAuthority.LOCAL,
description="Primary authentication authority",
@ -655,6 +694,11 @@ class UserInDB(User):
description="Hash of the user password",
json_schema_extra={"label": "Passwort-Hash"},
)
mfaSecret: Optional[str] = Field(
None,
description="Encrypted TOTP secret for MFA. Stored via encryptValue/decryptValue.",
json_schema_extra={"label": "MFA-Secret", "frontend_visible": False},
)
resetToken: Optional[str] = Field(
None,
description="Password reset token (UUID)",

View file

@ -813,11 +813,13 @@ class AppObjects:
updateDict.pop(field, None)
# Update user data using model
updatedData = user.model_dump()
# Use UserInDB so that DB-only fields (mfaSecret, hashedPassword, etc.)
# are preserved through the round-trip instead of being silently dropped.
existingRecord = self._getUserForAuthentication(user.username)
updatedData = dict(existingRecord) if existingRecord else user.model_dump()
updatedData.update(updateDict)
# Ensure ID matches userId parameter
updatedData["id"] = userId
updatedUser = User(**updatedData)
updatedUser = UserInDB(**updatedData)
# Update user record
self.db.recordModify(UserInDB, userId, updatedUser)

278
modules/routes/routeMfa.py Normal file
View file

@ -0,0 +1,278 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Routes for TOTP-based Multi-Factor Authentication.
Endpoints:
GET /api/mfa/status - MFA status for the current user
POST /api/mfa/setup - Start MFA enrolment (returns provisioningUri)
POST /api/mfa/confirm - Confirm enrolment with first TOTP code
POST /api/mfa/verify - Verify TOTP during login (uses mfa_pending temp token)
POST /api/mfa/disable - Disable MFA for current user
"""
from datetime import timedelta
from typing import Dict, Any
from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body
from jose import jwt, JWTError
import logging
import uuid
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth import (
createAccessToken,
createRefreshToken,
setAccessTokenCookie,
setRefreshTokenCookie,
)
from modules.auth.mfaService import (
generateSetup,
confirmSetup,
verifyCode,
isMfaRequired,
)
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, Mandate
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.shared.configuration import APP_CONFIG
from modules.shared.i18nRegistry import apiRouteContext
from datetime import datetime
routeApiMsg = apiRouteContext("routeMfa")
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/mfa", tags=["MFA"])
_MFA_PENDING_EXPIRE_MINUTES = 5
def _createMfaPendingToken(userId: str, username: str, authority: str, sessionId: str) -> str:
"""Short-lived JWT that authorises only the /mfa/verify endpoint."""
payload = {
"sub": username,
"userId": userId,
"authenticationAuthority": authority,
"sid": sessionId,
"type": "mfa_pending",
"jti": str(uuid.uuid4()),
}
token, _ = createAccessToken(payload, expiresDelta=timedelta(minutes=_MFA_PENDING_EXPIRE_MINUTES))
return token
def _decodeMfaPendingToken(token: str) -> dict:
"""Decode and validate an mfa_pending token. Raises HTTPException on failure."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired MFA token")
if payload.get("type") != "mfa_pending":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
return payload
@router.get("/status")
def mfaStatus(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
rootInterface = getRootInterface()
userMandates = rootInterface.getUserMandates(str(currentUser.id))
mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
mandates = []
for mid in mandateIds:
try:
recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": mid})
if recs:
mandates.append(Mandate.model_validate(dict(recs[0])))
except Exception:
pass
required = isMfaRequired(currentUser, userMandates=userMandates, mandates=mandates)
return {
"mfaEnabled": getattr(currentUser, "mfaEnabled", False),
"mfaRequired": required,
}
@router.post("/setup")
def mfaSetup(currentUser: User = Depends(getCurrentUser)) -> Dict[str, Any]:
"""Start MFA enrolment. Returns ``provisioningUri`` (otpauth:// for QR)."""
if getattr(currentUser, "mfaEnabled", False):
raise HTTPException(status_code=400, detail="MFA is already enabled")
result = generateSetup(userId=str(currentUser.id), username=currentUser.username)
rootInterface = getRootInterface()
rootInterface.updateUser(str(currentUser.id), {"mfaSecret": result["encryptedSecret"]})
return {"provisioningUri": result["provisioningUri"]}
@router.post("/confirm")
@limiter.limit("10/minute")
def mfaConfirm(
request: Request,
code: str = Body(...),
token: str = Body(None),
) -> Dict[str, Any]:
"""Confirm enrolment by verifying the first TOTP code.
Accepts either a valid session (cookie/header JWT) OR an ``mfa_pending``
token so that the forced-setup flow during login works without a session.
"""
rootInterface = getRootInterface()
if token:
payload = _decodeMfaPendingToken(token)
username = payload["sub"]
userId = payload["userId"]
else:
raise HTTPException(status_code=401, detail="MFA token required")
userRecord = rootInterface._getUserForAuthentication(username)
if not userRecord:
raise HTTPException(status_code=404, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret:
logger.warning("MFA confirm: no mfaSecret on user %s", username)
raise HTTPException(status_code=400, detail="MFA setup not started")
logger.info("MFA confirm: verifying code for user %s (secret present=%s, code=%s)", username, bool(encryptedSecret), code)
if not confirmSetup(encryptedSecret, code, userId=userId):
logger.warning("MFA confirm: TOTP code rejected for user %s", username)
raise HTTPException(status_code=400, detail="Invalid TOTP code")
rootInterface.updateUser(userId, {"mfaEnabled": True})
logger.info("MFA confirmed for user %s", username)
return {"mfaEnabled": True}
@router.post("/confirm-authenticated")
def mfaConfirmAuthenticated(
request: Request,
code: str = Body(..., embed=True),
currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
"""Confirm MFA enrolment for an already authenticated user (Settings page)."""
rootInterface = getRootInterface()
userRecord = rootInterface._getUserForAuthentication(currentUser.username)
if not userRecord:
raise HTTPException(status_code=404, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret:
raise HTTPException(status_code=400, detail="MFA setup not started")
if not confirmSetup(encryptedSecret, code, userId=str(currentUser.id)):
raise HTTPException(status_code=400, detail="Invalid TOTP code")
rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": True})
logger.info("MFA confirmed for user %s", currentUser.username)
return {"mfaEnabled": True}
@router.post("/verify")
@limiter.limit("10/minute")
def mfaVerify(
request: Request,
response: Response,
token: str = Body(...),
code: str = Body(...),
) -> Dict[str, Any]:
"""Verify TOTP code during login. Accepts the ``mfa_pending`` temp token
and the 6-digit code. On success, issues real session JWTs (cookies)."""
payload = _decodeMfaPendingToken(token)
userId = payload["userId"]
username = payload["sub"]
authority = payload.get("authenticationAuthority", AuthAuthority.LOCAL)
sessionId = payload.get("sid", str(uuid.uuid4()))
rootInterface = getRootInterface()
userRecord = rootInterface._getUserForAuthentication(username)
if not userRecord:
raise HTTPException(status_code=401, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret:
raise HTTPException(status_code=400, detail="MFA not configured for this user")
if not verifyCode(encryptedSecret, code, userId=userId):
raise HTTPException(status_code=401, detail="Invalid MFA code")
tokenData = {
"sub": username,
"userId": userId,
"authenticationAuthority": authority,
"sid": sessionId,
}
accessToken, accessExpires = createAccessToken(tokenData)
setAccessTokenCookie(response, accessToken)
refreshToken, _ = createRefreshToken(tokenData)
setRefreshTokenCookie(response, refreshToken)
jti = jwt.decode(accessToken, SECRET_KEY, algorithms=[ALGORITHM]).get("jti")
from modules.interfaces.interfaceDbApp import getInterface
user = User.model_validate(userRecord)
userInterface = getInterface(user)
dbToken = Token(
id=jti,
userId=userId,
authority=authority,
tokenPurpose=TokenPurpose.AUTH_SESSION,
tokenAccess=accessToken,
tokenType="bearer",
expiresAt=accessExpires.timestamp(),
sessionId=sessionId,
)
userInterface.saveAccessToken(dbToken)
logger.info("MFA verify successful for user %s", username)
try:
from modules.shared.auditLogger import audit_logger
audit_logger.logUserAccess(
userId=userId,
mandateId="system",
action="mfa_verify_success",
successInfo="totp_verified",
ipAddress=request.client.host if request.client else None,
userAgent=request.headers.get("user-agent"),
success=True,
)
except Exception:
pass
return {
"type": "local_auth_success",
"message": "MFA verification successful",
"authenticationAuthority": authority if isinstance(authority, str) else authority.value if hasattr(authority, "value") else str(authority),
"expires_at": accessExpires.isoformat(),
}
@router.post("/disable")
def mfaDisable(
currentUser: User = Depends(getCurrentUser),
code: str = Body(..., embed=True),
) -> Dict[str, Any]:
"""Disable MFA for the current user. Requires a valid TOTP code as confirmation."""
rootInterface = getRootInterface()
userRecord = rootInterface._getUserForAuthentication(currentUser.username)
if not userRecord:
raise HTTPException(status_code=404, detail="User not found")
encryptedSecret = userRecord.get("mfaSecret")
if not encryptedSecret or not getattr(currentUser, "mfaEnabled", False):
raise HTTPException(status_code=400, detail="MFA is not enabled")
if not verifyCode(encryptedSecret, code, userId=str(currentUser.id)):
raise HTTPException(status_code=400, detail="Invalid TOTP code")
rootInterface.updateUser(str(currentUser.id), {"mfaEnabled": False, "mfaSecret": None})
logger.info("MFA disabled for user %s", currentUser.username)
return {"mfaEnabled": False}

View file

@ -19,7 +19,7 @@ from jose import JWTError
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth.oauthConnectTicket import resolve_connect_context
@ -219,6 +219,65 @@ async def auth_login_callback(
)
isNewUser = True
# --- MFA gate --------------------------------------------------------
from modules.auth.mfaService import isMfaRequired as _isMfaRequired
from modules.routes.routeMfa import _createMfaPendingToken
userRecord = rootInterface._getUserForAuthentication(user.username)
userMandates = rootInterface.getUserMandates(str(user.id))
_mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
_mandateObjs = []
for _mid in _mandateIds:
try:
_recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
if _recs:
_mandateObjs.append(Mandate.model_validate(dict(_recs[0])))
except Exception:
pass
mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs)
hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
if mfaRequired or hasMfaSetup:
import uuid as _uuid
_sid = str(_uuid.uuid4())
pendingToken = _createMfaPendingToken(
userId=str(user.id),
username=user.username,
authority=AuthAuthority.GOOGLE.value,
sessionId=_sid,
)
if hasMfaSetup:
mfaType = "mfa_required"
extraFields = ""
else:
mfaType = "mfa_setup_required"
from modules.auth.mfaService import generateSetup as _generateSetup
existingSecret = userRecord.get("mfaSecret") if userRecord else None
if existingSecret:
from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer
_plain = _decryptSecret(existingSecret, userId=str(user.id))
_uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
setupResult = {"provisioningUri": _uri}
else:
setupResult = _generateSetup(userId=str(user.id), username=user.username)
rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}"
return HTMLResponse(
content=f"""
<html><head><title>MFA Required</title></head><body><script>
if (window.opener) {{
window.opener.postMessage({{
type: '{mfaType}',
mfaToken: {json.dumps(pendingToken)}{extraFields}
}}, '*');
}}
setTimeout(() => window.close(), 1000);
</script></body></html>
"""
)
# --- end MFA gate -----------------------------------------------------
jwt_token_data = {
"sub": user.username,
"userId": str(user.id),

View file

@ -253,7 +253,56 @@ def login(
detail=routeApiMsg("Invalid username or password"),
headers={"WWW-Authenticate": "Bearer"},
)
# --- MFA gate --------------------------------------------------------
from modules.auth.mfaService import isMfaRequired as _isMfaRequired
from modules.routes.routeMfa import _createMfaPendingToken
userRecord = rootInterface._getUserForAuthentication(user.username)
userMandates = rootInterface.getUserMandates(str(user.id))
mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
mandateObjs = []
for _mid in mandateIds:
try:
recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
if recs:
mandateObjs.append(Mandate.model_validate(dict(recs[0])))
except Exception:
pass
mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=mandateObjs)
hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
if mfaRequired or hasMfaSetup:
_sid = str(uuid.uuid4())
pendingToken = _createMfaPendingToken(
userId=str(user.id),
username=user.username,
authority=AuthAuthority.LOCAL.value,
sessionId=_sid,
)
if hasMfaSetup:
return {"type": "mfa_required", "mfaToken": pendingToken}
else:
from modules.auth.mfaService import generateSetup as _generateSetup
existingSecret = userRecord.get("mfaSecret") if userRecord else None
if existingSecret:
from modules.auth.mfaService import _decryptSecret, _buildTotp
_plain = _decryptSecret(existingSecret, userId=str(user.id))
_totp = _buildTotp(_plain)
from modules.auth.mfaService import _getMfaIssuer
_uri = _totp.provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
setupResult = {"provisioningUri": _uri}
else:
setupResult = _generateSetup(userId=str(user.id), username=user.username)
rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
return {
"type": "mfa_setup_required",
"mfaToken": pendingToken,
"provisioningUri": setupResult["provisioningUri"],
}
# --- end MFA gate -----------------------------------------------------
# Create token data
# MULTI-TENANT: Token does NOT contain mandateId anymore
# Mandate context is determined per request via X-Mandate-Id header

View file

@ -20,7 +20,7 @@ from jose import JWTError
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
from modules.datamodels.datamodelUam import AuthAuthority, User, Mandate, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth.oauthConnectTicket import resolve_connect_context
@ -192,6 +192,65 @@ async def auth_login_callback(
addExternalIdentityConnection=False,
)
# --- MFA gate --------------------------------------------------------
from modules.auth.mfaService import isMfaRequired as _isMfaRequired
from modules.routes.routeMfa import _createMfaPendingToken
userRecord = rootInterface._getUserForAuthentication(user.username)
userMandates = rootInterface.getUserMandates(str(user.id))
_mandateIds = [um.mandateId for um in (userMandates or []) if getattr(um, "enabled", True)]
_mandateObjs = []
for _mid in _mandateIds:
try:
_recs = rootInterface.db.getRecordset(Mandate, recordFilter={"id": _mid})
if _recs:
_mandateObjs.append(Mandate.model_validate(dict(_recs[0])))
except Exception:
pass
mfaRequired = _isMfaRequired(user, userMandates=userMandates, mandates=_mandateObjs)
hasMfaSetup = bool(userRecord and userRecord.get("mfaSecret") and getattr(user, "mfaEnabled", False))
if mfaRequired or hasMfaSetup:
import uuid as _uuid
_sid = str(_uuid.uuid4())
pendingToken = _createMfaPendingToken(
userId=str(user.id),
username=user.username,
authority=AuthAuthority.MSFT.value,
sessionId=_sid,
)
if hasMfaSetup:
mfaType = "mfa_required"
extraFields = ""
else:
mfaType = "mfa_setup_required"
from modules.auth.mfaService import generateSetup as _generateSetup
existingSecret = userRecord.get("mfaSecret") if userRecord else None
if existingSecret:
from modules.auth.mfaService import _decryptSecret, _buildTotp, _getMfaIssuer
_plain = _decryptSecret(existingSecret, userId=str(user.id))
_uri = _buildTotp(_plain).provisioning_uri(name=user.username, issuer_name=_getMfaIssuer())
setupResult = {"provisioningUri": _uri}
else:
setupResult = _generateSetup(userId=str(user.id), username=user.username)
rootInterface.updateUser(str(user.id), {"mfaSecret": setupResult["encryptedSecret"]})
extraFields = f", provisioningUri: {json.dumps(setupResult['provisioningUri'])}"
return HTMLResponse(
content=f"""
<html><head><title>MFA Required</title></head><body><script>
if (window.opener) {{
window.opener.postMessage({{
type: '{mfaType}',
mfaToken: {json.dumps(pendingToken)}{extraFields}
}}, '*');
}}
setTimeout(() => window.close(), 1000);
</script></body></html>
"""
)
# --- end MFA gate -----------------------------------------------------
jwt_token_data = {
"sub": user.username,
"userId": str(user.id),

View file

@ -142,6 +142,15 @@ class BaseRenderer(ABC):
"""
pass
@staticmethod
def _normalizeMargins(raw) -> Dict[str, int]:
"""Accept marginsPt as dict or [top, right, bottom, left] list."""
if isinstance(raw, dict):
return raw
if isinstance(raw, (list, tuple)) and len(raw) >= 4:
return {"top": raw[0], "right": raw[1], "bottom": raw[2], "left": raw[3]}
return {"top": 60, "right": 60, "bottom": 60, "left": 60}
def _convertUnifiedStyleToInternal(self, style: Dict[str, Any]) -> Dict[str, Any]:
"""Convert the unified resolvedStyle dict (from styleDefaults) into
the renderer-internal style-set format that all rendering methods already
@ -263,7 +272,7 @@ class BaseRenderer(ABC):
},
"page": {
"format": page.get("format", "A4"),
"margins": page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60}),
"margins": self._normalizeMargins(page.get("marginsPt", {"top": 60, "bottom": 60, "left": 60, "right": 60})),
"show_page_numbers": page.get("showPageNumbers", True),
"header_height": page.get("headerHeight", 30),
"footer_height": page.get("footerHeight", 30),

View file

@ -306,7 +306,7 @@ class RendererHtml(BaseRenderer):
css_parts.append(f" color: {paraColor};")
css_parts.append(f" font-size: {paraSizePt}pt;")
css_parts.append(f" line-height: {lineSpacing};")
margins = page.get("marginsPt", {})
margins = self._normalizeMargins(page.get("marginsPt", {}))
if margins:
css_parts.append(f" margin: {margins.get('top', 60)}pt {margins.get('right', 60)}pt {margins.get('bottom', 60)}pt {margins.get('left', 60)}pt;")
else:

View file

@ -259,7 +259,7 @@ class RendererPdf(BaseRenderer):
# Create PDF document with unified page margins or defaults
pageCfg = unifiedStyle["page"] if unifiedStyle else None
if pageCfg:
m = pageCfg["marginsPt"]
m = self._normalizeMargins(pageCfg.get("marginsPt", {}))
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=m["right"], leftMargin=m["left"], topMargin=m["top"], bottomMargin=m["bottom"])
else:
doc = SimpleDocTemplate(buffer, pagesize=A4, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18)
@ -1186,7 +1186,7 @@ class RendererPdf(BaseRenderer):
# Use page dimensions minus margins with generous safety buffer
# A4 = 595.27 x 841.89 pt; frame = page - margins - internal padding
_us = getattr(self, '_unifiedStyle', None) or {}
_pageMgn = (_us.get('page') or {}).get('marginsPt') or {}
_pageMgn = self._normalizeMargins((_us.get('page') or {}).get('marginsPt', {}))
marginTop = _pageMgn.get('top', 60)
marginBottom = _pageMgn.get('bottom', 60)
marginLeft = _pageMgn.get('left', 60)

View file

@ -14,6 +14,7 @@ python-jose[cryptography]==3.3.0 # For JWT tokens
cryptography>=41.0.0 # For encryption/decryption of configuration values
passlib==1.7.4
argon2-cffi>=21.3.0 # Für Passwort-Hashing in gateway_interface.py
pyotp>=2.9.0 # TOTP-basierte MFA (Multi-Factor Authentication)
google-auth-oauthlib==1.2.0 # Für Google OAuth
google-auth==2.27.0 # Für Google Authentication
google-api-python-client==2.170.0 # For Google API integration

View file

@ -0,0 +1,154 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Unit tests for modules.auth.mfaService.
Tests TOTP generation, verification, encryption round-trip, and the
three-rule MFA obligation resolver.
"""
import pytest
from unittest.mock import patch, MagicMock
import pyotp
from modules.auth.mfaService import (
_generateSecret,
_buildTotp,
generateSetup,
confirmSetup,
verifyCode,
isMfaRequired,
_isMfaRequireAdminsEnabled,
)
class TestTotpBasics:
def test_generateSecret_returns_base32(self):
secret = _generateSecret()
assert isinstance(secret, str)
assert len(secret) >= 16
def test_buildTotp_generates_valid_code(self):
secret = _generateSecret()
totp = _buildTotp(secret)
code = totp.now()
assert len(code) == 6
assert code.isdigit()
def test_verifyCode_accepts_current_code(self):
secret = _generateSecret()
totp = _buildTotp(secret)
code = totp.now()
encrypted = f"FAKE_ENC:{secret}"
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
assert verifyCode(encrypted, code) is True
def test_verifyCode_rejects_wrong_code(self):
secret = _generateSecret()
encrypted = f"FAKE_ENC:{secret}"
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
assert verifyCode(encrypted, "000000") is False
class TestGenerateSetup:
@patch("modules.auth.mfaService._encryptSecret", return_value="ENC_SECRET")
def test_returns_uri_and_encrypted_secret(self, _mock_enc):
result = generateSetup(userId="u1", username="testuser")
assert "encryptedSecret" in result
assert "provisioningUri" in result
assert result["encryptedSecret"] == "ENC_SECRET"
assert "otpauth://totp/" in result["provisioningUri"]
assert "PowerOn" in result["provisioningUri"]
class TestConfirmSetup:
def test_confirmSetup_with_valid_code(self):
secret = _generateSecret()
totp = _buildTotp(secret)
code = totp.now()
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
assert confirmSetup("ENC", code) is True
def test_confirmSetup_with_invalid_code(self):
secret = _generateSecret()
with patch("modules.auth.mfaService._decryptSecret", return_value=secret):
assert confirmSetup("ENC", "999999") is False
def test_confirmSetup_handles_decryption_error(self):
with patch("modules.auth.mfaService._decryptSecret", side_effect=Exception("decrypt error")):
assert confirmSetup("BAD_ENC", "123456") is False
class TestIsMfaRequired:
def _makeUser(self, mfaEnabled=False, isSysAdmin=False, isPlatformAdmin=False):
u = MagicMock()
u.mfaEnabled = mfaEnabled
u.isSysAdmin = isSysAdmin
u.isPlatformAdmin = isPlatformAdmin
return u
def _makeMandate(self, mfaRequired=False):
m = MagicMock()
m.mfaRequired = mfaRequired
return m
def test_mfaEnabled_user_always_required(self):
user = self._makeUser(mfaEnabled=True)
assert isMfaRequired(user) is True
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True)
def test_sysadmin_with_config_key(self, _mock):
user = self._makeUser(isSysAdmin=True)
assert isMfaRequired(user) is True
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=True)
def test_platformadmin_with_config_key(self, _mock):
user = self._makeUser(isPlatformAdmin=True)
assert isMfaRequired(user) is True
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
def test_admin_without_config_key_not_required(self, _mock):
user = self._makeUser(isSysAdmin=True)
assert isMfaRequired(user) is False
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
def test_mandate_with_mfaRequired(self, _mock):
user = self._makeUser()
mandate = self._makeMandate(mfaRequired=True)
assert isMfaRequired(user, mandates=[mandate]) is True
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
def test_mandate_without_mfaRequired(self, _mock):
user = self._makeUser()
mandate = self._makeMandate(mfaRequired=False)
assert isMfaRequired(user, mandates=[mandate]) is False
@patch("modules.auth.mfaService._isMfaRequireAdminsEnabled", return_value=False)
def test_regular_user_no_mandate_not_required(self, _mock):
user = self._makeUser()
assert isMfaRequired(user) is False
class TestConfigKey:
@patch("modules.auth.mfaService.APP_CONFIG")
def test_config_true(self, mock_cfg):
mock_cfg.get.return_value = "true"
assert _isMfaRequireAdminsEnabled() is True
@patch("modules.auth.mfaService.APP_CONFIG")
def test_config_false(self, mock_cfg):
mock_cfg.get.return_value = "false"
assert _isMfaRequireAdminsEnabled() is False
@patch("modules.auth.mfaService.APP_CONFIG")
def test_config_empty(self, mock_cfg):
mock_cfg.get.return_value = ""
assert _isMfaRequireAdminsEnabled() is False
@patch("modules.auth.mfaService.APP_CONFIG")
def test_config_one(self, mock_cfg):
mock_cfg.get.return_value = "1"
assert _isMfaRequireAdminsEnabled() is True