platform-core/modules/auth/trustedDeviceService.py
ValueOn AG 29de7e9915
All checks were successful
Deploy Plattform-Core (Int) / test (push) Successful in 58s
Deploy Plattform-Core (Int) / deploy (push) Successful in 8s
fixes bad improrts by composer
2026-06-11 23:28:49 +02:00

219 lines
6.9 KiB
Python

# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Trusted Device Service.
After successful MFA verification a device can be marked as trusted for a
configurable duration (default 60 days). On subsequent logins from the same
device the MFA step is skipped.
Cookie: ``mfa_trusted`` (httpOnly, Secure, SameSite policy from jwtService).
DB: ``TrustedDevice`` table in poweron_app.
Regulatory basis:
- NIST SP 800-63B Section 5.2.8: Verifier MAY re-authenticate only after a
configurable period when a device is bound to the subscriber.
- Microsoft, Google, AWS implement identical patterns.
"""
import logging
import secrets
from typing import Optional
from fastapi import Request, Response
from modules.shared.configuration import APP_CONFIG
from modules.shared.timeUtils import getUtcNow, getUtcTimestamp
from modules.datamodels.datamodelSecurity import TrustedDevice, Token, TokenPurpose
logger = logging.getLogger(__name__)
_COOKIE_NAME = "mfa_trusted"
_DEFAULT_TRUST_DAYS = 60
_TOKEN_BYTES = 32
def _getTrustDurationDays() -> int:
raw = (APP_CONFIG.get("MFA_TRUST_DURATION_DAYS") or "").strip()
if raw.isdigit() and int(raw) > 0:
return int(raw)
return _DEFAULT_TRUST_DAYS
def createTrustedDevice(userId: str, request: Request, response: Response, db) -> str:
"""Create a TrustedDevice entry and set the cookie on the response.
Returns the device token (cookie value).
"""
from modules.auth.jwtService import _cookiePolicy
trustDays = _getTrustDurationDays()
deviceToken = secrets.token_urlsafe(_TOKEN_BYTES)
now = getUtcTimestamp()
trustedUntil = now + (trustDays * 86400)
device = TrustedDevice(
id=deviceToken,
userId=userId,
trustedUntil=trustedUntil,
userAgent=(request.headers.get("user-agent") or "")[:512],
ipAddress=_getClientIp(request),
createdAt=now,
)
try:
db.recordCreate(TrustedDevice, device.model_dump())
except Exception as e:
logger.error(f"Failed to persist TrustedDevice for userId={userId}: {e}")
return ""
useSecure, samesite, _ = _cookiePolicy()
response.set_cookie(
key=_COOKIE_NAME,
value=deviceToken,
httponly=True,
secure=useSecure,
samesite=samesite,
path="/",
max_age=trustDays * 86400,
)
logger.info(f"Trusted device created for userId={userId}, valid {trustDays}d")
return deviceToken
def isTrustedDevice(request: Request, userId: str, db) -> bool:
"""Check if the current request comes from a trusted device for the given user."""
deviceToken = request.cookies.get(_COOKIE_NAME)
if not deviceToken:
return False
try:
records = db.getRecordset(
TrustedDevice,
recordFilter={"id": deviceToken, "userId": userId},
)
if not records:
return False
device = records[0]
trustedUntil = device.get("trustedUntil", 0)
if isinstance(trustedUntil, (int, float)) and trustedUntil > getUtcTimestamp():
return True
return False
except Exception as e:
logger.warning(f"Error checking trusted device for userId={userId}: {e}")
return False
def revokeTrustedDevices(userId: str, db) -> int:
"""Revoke all trusted devices for a user. Returns count of deleted entries."""
try:
records = db.getRecordset(TrustedDevice, recordFilter={"userId": userId})
count = 0
for rec in records:
db.recordDelete(TrustedDevice, rec["id"])
count += 1
if count:
logger.info(f"Revoked {count} trusted device(s) for userId={userId}")
return count
except Exception as e:
logger.error(f"Failed to revoke trusted devices for userId={userId}: {e}")
return 0
def clearTrustedDeviceCookie(response: Response) -> None:
"""Clear the mfa_trusted cookie."""
from modules.auth.jwtService import _cookiePolicy
useSecure, samesite, samesiteHeader = _cookiePolicy()
secure_flag = "; Secure" if useSecure else ""
response.headers.append(
"Set-Cookie",
f"{_COOKIE_NAME}=deleted; Path=/; Max-Age=0; Expires=Thu, 01 Jan 1970 00:00:00 GMT; HttpOnly{secure_flag}; SameSite={samesiteHeader}"
)
response.delete_cookie(
key=_COOKIE_NAME,
path="/",
secure=useSecure,
httponly=True,
samesite=samesite,
)
def cleanupExpiredDevices(db) -> int:
"""Remove TrustedDevice entries past their trustedUntil. Returns deleted count."""
try:
records = db.getRecordset(TrustedDevice, recordFilter={})
now = getUtcTimestamp()
count = 0
for rec in records:
if rec.get("trustedUntil", 0) < now:
db.recordDelete(TrustedDevice, rec["id"])
count += 1
if count:
logger.info(f"Cleaned up {count} expired trusted device(s)")
return count
except Exception as e:
logger.error(f"Error cleaning up expired trusted devices: {e}")
return 0
def _getClientIp(request: Request) -> Optional[str]:
"""Extract client IP from request (respects X-Forwarded-For)."""
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return None
# --- Scheduler Integration ---
async def _runTokenAndDeviceCleanup() -> None:
"""Scheduled task: remove expired tokens and trusted devices."""
try:
from modules.connectors.connectorDbPostgre import ConnectorPostgre
db = ConnectorPostgre("poweron_app")
now = getUtcTimestamp()
# Expired auth-session tokens
tokens = db.getRecordset(
Token,
recordFilter={"tokenPurpose": TokenPurpose.AUTH_SESSION.value},
)
expiredCount = 0
for t in tokens:
if t.get("expiresAt", 0) < now:
db.recordDelete(Token, t["id"])
expiredCount += 1
# Expired trusted devices
deviceCount = cleanupExpiredDevices(db)
if expiredCount or deviceCount:
logger.info(
f"Token cleanup: {expiredCount} expired token(s), "
f"{deviceCount} expired trusted device(s) removed"
)
except Exception as e:
logger.error(f"Token/device cleanup failed: {e}")
def registerTokenCleanupScheduler() -> None:
"""Register daily token cleanup job. Call during app startup."""
try:
from modules.shared.eventManagement import eventManager
eventManager.registerCron(
jobId="token_device_cleanup",
func=_runTokenAndDeviceCleanup,
cronKwargs={"hour": "4", "minute": "0"},
)
logger.info("Token/device cleanup scheduler registered (daily 04:00)")
except Exception as e:
logger.warning(f"Failed to register token cleanup scheduler: {e}")