132 lines
4.2 KiB
Python
132 lines
4.2 KiB
Python
# Copyright (c) 2026 PowerOn AG
|
|
# All rights reserved.
|
|
"""
|
|
MFA (Multi-Factor Authentication) Service.
|
|
|
|
TOTP-based MFA using pyotp. Secrets are encrypted at rest via
|
|
encryptValue/decryptValue from the configuration module.
|
|
|
|
MFA obligation is resolved by three OR-linked rules:
|
|
1. Any mandate the user belongs to has ``mfaRequired=True``.
|
|
2. User is sysAdmin or platformAdmin, and config key ``MFA_REQUIRE_ADMINS``
   is truthy.
|
|
3. User has opted in (``mfaEnabled=True`` without any mandate/admin rule).
|
|
"""
|
|
|
|
import logging
|
|
from typing import Optional
|
|
|
|
import pyotp
|
|
|
|
from modules.shared.configuration import APP_CONFIG, encryptValue, decryptValue
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Number of digits in each TOTP code; passed as ``digits`` to pyotp.TOTP in buildTotp().
_MFA_DIGITS = 6
|
|
# TOTP time step; passed as ``interval`` to pyotp.TOTP in buildTotp() (pyotp reads it as seconds).
_MFA_INTERVAL = 30
|
|
# Passed as ``valid_window`` to TOTP.verify(); pyotp then also accepts codes from
# this many time steps before and after the current one (tolerates clock drift).
_MFA_VALID_WINDOW = 1
|
|
|
|
|
|
def getMfaIssuer() -> str:
    """Return the issuer label embedded in TOTP provisioning URIs.

    The label is derived from the ``APP_ENV_TYPE`` config value after it has
    been trimmed and lower-cased: an unset/empty value or ``prod`` gives plain
    ``'PowerOn'``; any other value is appended upper-cased in parentheses,
    e.g. ``'PowerOn (DEV)'``.
    """
    configured = APP_CONFIG.get("APP_ENV_TYPE")
    envLabel = (configured if configured else "").strip().lower()
    isProduction = envLabel == "prod" or envLabel == ""
    return "PowerOn" if isProduction else f"PowerOn ({envLabel.upper()})"
|
|
|
|
|
|
def _generateSecret() -> str:
    """Create a new random TOTP secret in base32 form via ``pyotp.random_base32``."""
    freshSecret = pyotp.random_base32()
    return freshSecret
|
|
|
|
|
|
def _encryptSecret(plainSecret: str, userId: str = "system") -> str:
    """Encrypt a plaintext TOTP secret.

    Delegates to ``encryptValue`` under the fixed key name ``mfa_secret``.
    *userId* is forwarded unchanged and defaults to ``"system"``.
    """
    cipherText = encryptValue(plainSecret, userId=userId, keyName="mfa_secret")
    return cipherText
|
|
|
|
|
|
def decryptSecret(encryptedSecret: str, userId: str = "system") -> str:
    """Decrypt a stored TOTP secret back to its plaintext form.

    Delegates to ``decryptValue`` under the fixed key name ``mfa_secret``
    (the same key name ``_encryptSecret`` uses). *userId* is forwarded
    unchanged and defaults to ``"system"``.
    """
    plainText = decryptValue(encryptedSecret, userId=userId, keyName="mfa_secret")
    return plainText
|
|
|
|
|
|
def buildTotp(plainSecret: str) -> pyotp.TOTP:
    """Construct a ``pyotp.TOTP`` for *plainSecret* using the module's MFA settings.

    The digit count and time step come from ``_MFA_DIGITS`` and
    ``_MFA_INTERVAL``.
    """
    totpSettings = {"digits": _MFA_DIGITS, "interval": _MFA_INTERVAL}
    return pyotp.TOTP(plainSecret, **totpSettings)
|
|
|
|
|
|
def generateSetup(userId: str, username: str) -> dict:
    """Begin MFA enrolment for a user.

    A new TOTP secret is generated, encrypted for *userId*, and turned into
    an ``otpauth://`` provisioning URI labelled with *username* and the
    issuer returned by ``getMfaIssuer()``.

    Returns a dict with two keys:

    * ``encryptedSecret`` -- the encrypted secret, intended for DB storage.
    * ``provisioningUri`` -- the otpauth:// URI the frontend renders as a QR
      code.

    The plaintext secret is NOT returned as its own field -- the URI already
    contains it.
    """
    freshSecret = _generateSecret()
    storedSecret = _encryptSecret(freshSecret, userId=userId)
    qrUri = buildTotp(freshSecret).provisioning_uri(
        name=username,
        issuer_name=getMfaIssuer(),
    )
    return dict(encryptedSecret=storedSecret, provisioningUri=qrUri)
|
|
|
|
|
|
def confirmSetup(encryptedSecret: str, code: str, userId: str = "system") -> bool:
    """Verify a TOTP code against an encrypted secret (enrolment confirmation).

    Parameters
    ----------
    encryptedSecret : str
        The secret in its encrypted form, as returned by ``generateSetup``.
    code : str
        The code entered by the user.
    userId : str
        Forwarded to ``decryptSecret`` and included in log messages.

    Returns
    -------
    bool
        ``True`` only when *code* matches within ``_MFA_VALID_WINDOW`` time
        steps. A missing secret or code, an empty decrypted secret, or any
        error during decryption/verification yields ``False``; errors are
        logged and never propagate to the caller.
    """
    # A missing secret or code can never verify; reject before attempting
    # decryption so no TOTP is ever built from absent input.
    if not encryptedSecret or not code:
        return False
    try:
        plain = decryptSecret(encryptedSecret, userId=userId)
        # Fail closed: never hand an empty secret to pyotp, which would
        # presumably still derive (guessable) codes from it -- TODO confirm.
        if not plain:
            logger.warning("MFA confirmSetup: empty secret for userId=%s", userId)
            return False
        return buildTotp(plain).verify(code, valid_window=_MFA_VALID_WINDOW)
    except Exception:
        # Deliberate best-effort boundary: any failure counts as "not verified".
        logger.exception("MFA confirmSetup failed for userId=%s", userId)
        return False
|
|
|
|
|
|
def verifyCode(encryptedSecret: str, code: str, userId: str = "system") -> bool:
    """Verify a TOTP code during login.

    Parameters
    ----------
    encryptedSecret : str
        The user's TOTP secret in its encrypted form.
    code : str
        The code entered by the user.
    userId : str
        Forwarded to ``decryptSecret`` and included in log messages.

    Returns
    -------
    bool
        ``True`` only when *code* matches within ``_MFA_VALID_WINDOW`` time
        steps. A missing secret or code, an empty decrypted secret, or any
        error during decryption/verification yields ``False``; errors are
        logged and never propagate to the caller.
    """
    # A missing secret or code can never verify; reject before attempting
    # decryption so no TOTP is ever built from absent input.
    if not encryptedSecret or not code:
        return False
    try:
        plain = decryptSecret(encryptedSecret, userId=userId)
        # Fail closed: never hand an empty secret to pyotp, which would
        # presumably still derive (guessable) codes from it -- TODO confirm.
        if not plain:
            logger.warning("MFA verifyCode: empty secret for userId=%s", userId)
            return False
        return buildTotp(plain).verify(code, valid_window=_MFA_VALID_WINDOW)
    except Exception:
        # Deliberate best-effort boundary: any failure counts as "not verified".
        logger.exception("MFA verifyCode failed for userId=%s", userId)
        return False
|
|
|
|
|
|
def _isMfaRequireAdminsEnabled() -> bool:
    """Report whether the ``MFA_REQUIRE_ADMINS`` config key is switched on.

    The configured value is converted to text, trimmed and lower-cased;
    ``1``, ``true`` and ``yes`` count as enabled, everything else (including
    an unset key) as disabled.
    """
    configured = APP_CONFIG.get("MFA_REQUIRE_ADMINS")
    # str() keeps this working when the config holds a non-string truthy
    # value (e.g. bool True or int 1) instead of raising AttributeError on
    # .strip(); string values are handled exactly as before.
    normalized = str(configured or "").strip().lower()
    return normalized in ("1", "true", "yes")
|
|
|
|
|
|
def isMfaRequired(user, userMandates=None, mandates=None) -> bool:
    """Decide whether *user* must complete MFA.

    The answer is ``True`` as soon as one of these checks succeeds, tested in
    this order:

    1. ``user.mfaEnabled`` is truthy (the user already has MFA switched on).
    2. ``user.isSysAdmin`` or ``user.isPlatformAdmin`` is truthy AND the
       ``MFA_REQUIRE_ADMINS`` config flag is enabled.
    3. At least one entry in *mandates* has a truthy ``mfaRequired``.

    Attributes missing from *user* or from a mandate count as ``False``.

    Parameters
    ----------
    user : User | UserInDB
        The user object.
    userMandates : list | None
        UserMandate records for the user. Accepted as part of the signature,
        but this function does not currently read it.
    mandates : list | None
        Mandate objects the user has access to. When omitted or empty the
        mandate rule is skipped -- no lookup is performed here.

    Returns
    -------
    bool
        ``True`` if any rule applies, otherwise ``False``.
    """
    # Rule: the user has MFA enabled on their own account.
    if getattr(user, "mfaEnabled", False):
        return True

    # Rule: admins, but only while the config flag demands it. Both flags are
    # read up front; the config is consulted only for admin users.
    adminFlags = (
        getattr(user, "isSysAdmin", False),
        getattr(user, "isPlatformAdmin", False),
    )
    if any(adminFlags) and _isMfaRequireAdminsEnabled():
        return True

    # Rule: any supplied mandate enforces MFA.
    if not mandates:
        return False
    return any(getattr(entry, "mfaRequired", False) for entry in mandates)
|