# Copyright (c) 2026 PowerOn AG # All rights reserved. """ Trusted Device Service. After successful MFA verification a device can be marked as trusted for a configurable duration (default 60 days). On subsequent logins from the same device the MFA step is skipped. Cookie: ``mfa_trusted`` (httpOnly, Secure, SameSite policy from jwtService). DB: ``TrustedDevice`` table in poweron_app. Regulatory basis: - NIST SP 800-63B Section 5.2.8: Verifier MAY re-authenticate only after a configurable period when a device is bound to the subscriber. - Microsoft, Google, AWS implement identical patterns. """ import logging import secrets from typing import Optional from fastapi import Request, Response from modules.shared.configuration import APP_CONFIG from modules.shared.timeUtils import getUtcNow, getUtcTimestamp from modules.datamodels.datamodelSecurity import TrustedDevice logger = logging.getLogger(__name__) _COOKIE_NAME = "mfa_trusted" _DEFAULT_TRUST_DAYS = 60 _TOKEN_BYTES = 32 def _getTrustDurationDays() -> int: raw = (APP_CONFIG.get("MFA_TRUST_DURATION_DAYS") or "").strip() if raw.isdigit() and int(raw) > 0: return int(raw) return _DEFAULT_TRUST_DAYS def createTrustedDevice(userId: str, request: Request, response: Response, db) -> str: """Create a TrustedDevice entry and set the cookie on the response. Returns the device token (cookie value). """ from modules.auth.jwtService import _cookiePolicy trustDays = _getTrustDurationDays() deviceToken = secrets.token_urlsafe(_TOKEN_BYTES) now = getUtcTimestamp() trustedUntil = now + (trustDays * 86400) device = TrustedDevice( id=deviceToken, userId=userId, trustedUntil=trustedUntil, userAgent=(request.headers.get("user-agent") or "")[:512], ipAddress=_getClientIp(request), createdAt=now, ) try: db.recordCreate(TrustedDevice, device.model_dump()) except Exception as e: logger.error(f"Failed to persist TrustedDevice for userId={userId}: {e}") return "" useSecure, samesite, _ = _cookiePolicy() response.set_cookie( key=_COOKIE_NAME, value=deviceToken, httponly=True, secure=useSecure, samesite=samesite, path="/", max_age=trustDays * 86400, ) logger.info(f"Trusted device created for userId={userId}, valid {trustDays}d") return deviceToken def isTrustedDevice(request: Request, userId: str, db) -> bool: """Check if the current request comes from a trusted device for the given user.""" deviceToken = request.cookies.get(_COOKIE_NAME) if not deviceToken: return False try: records = db.getRecordset( TrustedDevice, recordFilter={"id": deviceToken, "userId": userId}, ) if not records: return False device = records[0] trustedUntil = device.get("trustedUntil", 0) if isinstance(trustedUntil, (int, float)) and trustedUntil > getUtcTimestamp(): return True return False except Exception as e: logger.warning(f"Error checking trusted device for userId={userId}: {e}") return False def revokeTrustedDevices(userId: str, db) -> int: """Revoke all trusted devices for a user. Returns count of deleted entries.""" try: records = db.getRecordset(TrustedDevice, recordFilter={"userId": userId}) count = 0 for rec in records: db.recordDelete(TrustedDevice, rec["id"]) count += 1 if count: logger.info(f"Revoked {count} trusted device(s) for userId={userId}") return count except Exception as e: logger.error(f"Failed to revoke trusted devices for userId={userId}: {e}") return 0 def clearTrustedDeviceCookie(response: Response) -> None: """Clear the mfa_trusted cookie.""" from modules.auth.jwtService import _cookiePolicy useSecure, samesite, samesiteHeader = _cookiePolicy() secure_flag = "; Secure" if useSecure else "" response.headers.append( "Set-Cookie", f"{_COOKIE_NAME}=deleted; Path=/; Max-Age=0; Expires=Thu, 01 Jan 1970 00:00:00 GMT; HttpOnly{secure_flag}; SameSite={samesiteHeader}" ) response.delete_cookie( key=_COOKIE_NAME, path="/", secure=useSecure, httponly=True, samesite=samesite, ) def cleanupExpiredDevices(db) -> int: """Remove TrustedDevice entries past their trustedUntil. Returns deleted count.""" try: records = db.getRecordset(TrustedDevice, recordFilter={}) now = getUtcTimestamp() count = 0 for rec in records: if rec.get("trustedUntil", 0) < now: db.recordDelete(TrustedDevice, rec["id"]) count += 1 if count: logger.info(f"Cleaned up {count} expired trusted device(s)") return count except Exception as e: logger.error(f"Error cleaning up expired trusted devices: {e}") return 0 def _getClientIp(request: Request) -> Optional[str]: """Extract client IP from request (respects X-Forwarded-For).""" forwarded = request.headers.get("x-forwarded-for") if forwarded: return forwarded.split(",")[0].strip() if request.client: return request.client.host return None # --- Scheduler Integration --- async def _runTokenAndDeviceCleanup() -> None: """Scheduled task: remove expired tokens and trusted devices.""" try: from modules.connectors.connectorDbPostgre import ConnectorPostgre db = ConnectorPostgre("poweron_app") now = getUtcTimestamp() # Expired auth-session tokens tokens = db.getRecordset( Token, recordFilter={"tokenPurpose": TokenPurpose.AUTH_SESSION.value}, ) expiredCount = 0 for t in tokens: if t.get("expiresAt", 0) < now: db.recordDelete(Token, t["id"]) expiredCount += 1 # Expired trusted devices deviceCount = cleanupExpiredDevices(db) if expiredCount or deviceCount: logger.info( f"Token cleanup: {expiredCount} expired token(s), " f"{deviceCount} expired trusted device(s) removed" ) except Exception as e: logger.error(f"Token/device cleanup failed: {e}") def registerTokenCleanupScheduler() -> None: """Register daily token cleanup job. Call during app startup.""" try: from modules.shared.eventManagement import eventManager eventManager.registerCron( jobId="token_device_cleanup", func=_runTokenAndDeviceCleanup, cronKwargs={"hour": "4", "minute": "0"}, ) logger.info("Token/device cleanup scheduler registered (daily 04:00)") except Exception as e: logger.warning(f"Failed to register token cleanup scheduler: {e}")