# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Routes for local security and authentication. """ from fastapi import APIRouter, HTTPException, status, Depends, Request, Response, Body, Path from fastapi.security import OAuth2PasswordRequestForm import logging from typing import Dict, Any from datetime import datetime from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse import uuid from jose import jwt # Import auth modules from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM, getRequestContext, RequestContext from modules.auth import createAccessToken, createRefreshToken, setAccessTokenCookie, setRefreshTokenCookie, clearAccessTokenCookie, clearRefreshTokenCookie from modules.interfaces.interfaceDbApp import getInterface, getRootInterface from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority, Mandate from modules.datamodels.datamodelSecurity import Token, TokenPurpose from modules.shared.configuration import APP_CONFIG from modules.shared.timeUtils import getUtcTimestamp from modules.shared.i18nRegistry import apiRouteContext routeApiMsg = apiRouteContext("routeSecurityLocal") # Configure logger logger = logging.getLogger(__name__) def _buildAuthEmailHtml( greeting: str, bodyLines: list, buttonText: str = None, buttonUrl: str = None, footerText: str = None, ) -> str: """Build a branded HTML email for authentication flows. Uses the same visual design as notifyMandateAdmins._renderHtmlEmail (dark header, clean body, operator footer). """ import html as _html paragraphsHtml = "" for line in bodyLines: if line == "": paragraphsHtml += '

 

\n' else: escaped = _html.escape(str(line)) paragraphsHtml += f'

{escaped}

\n' buttonBlock = "" if buttonText and buttonUrl: buttonBlock = f'''
{_html.escape(buttonText)}

{_html.escape(buttonUrl)}

''' footerNote = "" if footerText: footerNote = f'

{_html.escape(footerText)}

\n' operatorLine = "" try: from modules.shared.configuration import APP_CONFIG parts = [p for p in [ APP_CONFIG.get("Operator_CompanyName", ""), APP_CONFIG.get("Operator_Address", ""), APP_CONFIG.get("Operator_VatNumber", ""), ] if p] if parts: operatorLine = ( f'

' f'{_html.escape(" | ".join(parts))}

\n' ) except Exception: pass return f'''

PowerOn

{_html.escape(greeting)}

{paragraphsHtml} {buttonBlock}
{footerNote}

Diese E-Mail wurde automatisch von PowerOn versendet.

{operatorLine}
''' def _sendAuthEmail(recipient: str, subject: str, message: str, userId: str = None, htmlOverride: str = None) -> bool: """ Send authentication-related email directly without requiring full Services initialization. Used for registration, password reset, and other auth flows. Args: recipient: Email address subject: Email subject message: Plain text fallback (ignored when htmlOverride is given) userId: Optional user ID for logging htmlOverride: Pre-built branded HTML (from _buildAuthEmailHtml) Returns: bool: True if email was sent successfully """ try: from modules.interfaces.interfaceMessaging import getInterface as getMessagingInterface from modules.datamodels.datamodelMessaging import MessagingChannel htmlMessage = htmlOverride if not htmlMessage: import html escaped = html.escape(message) escaped = escaped.replace('\n', '
\n') htmlMessage = f'{escaped}' messagingInterface = getMessagingInterface() success = messagingInterface.send( channel=MessagingChannel.EMAIL, recipient=recipient, subject=subject, message=htmlMessage ) if success: logger.info(f"Auth email sent successfully to {recipient} (userId: {userId})") else: logger.warning(f"Failed to send auth email to {recipient} (userId: {userId})") return success except Exception as e: logger.error(f"Error sending auth email to {recipient}: {str(e)}", exc_info=True) return False # Create router for Local Security endpoints router = APIRouter( prefix="/api/local", tags=["Security Local"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, 401: {"description": "Unauthorized"}, 403: {"description": "Forbidden"}, 500: {"description": "Internal server error"} } ) def _ensureHomeMandate(rootInterface, user) -> None: """Ensure user has a Home mandate, but only if they have no mandate memberships AND no pending invitations. Invited users should NOT get a Home mandate — they join existing mandates via invitation acceptance and can create their own later via onboarding. """ userId = str(user.id) userMandates = rootInterface.getUserMandates(userId) if userMandates: for um in userMandates: mandate = rootInterface.getMandate(um.mandateId) if mandate and (mandate.name or "").startswith("Home ") and not mandate.isSystem: return logger.debug(f"User {user.username} has {len(userMandates)} mandate(s) but no Home — skipping auto-creation") return try: from modules.interfaces.interfaceDbApp import getRootInterface as _getRootIf appIf = _getRootIf() normalizedEmail = (user.email or "").strip().lower() if user.email else None pendingByUsername = appIf.getInvitationsByTargetUsername(user.username) pendingByEmail = appIf.getInvitationsByEmail(normalizedEmail) if normalizedEmail else [] seenIds = set() for inv in pendingByUsername + pendingByEmail: if inv.id in seenIds: continue seenIds.add(inv.id) if not inv.revokedAt and (inv.currentUses or 0) < (inv.maxUses or 1): logger.info(f"User {user.username} has pending invitation(s) — skipping Home mandate creation") return except Exception as e: logger.warning(f"Could not check pending invitations for {user.username}: {e}") homeMandateLabel = f"Home {user.username}" rootInterface._provisionMandateForUser( userId=userId, mandateLabel=homeMandateLabel, planKey="TRIAL_14D", ) logger.info(f"Created Home mandate '{homeMandateLabel}' for user {user.username}") @router.post("/login") @limiter.limit("30/minute") def login( request: Request, response: Response, formData: OAuth2PasswordRequestForm = Depends(), ) -> Dict[str, Any]: """Get access token for local user authentication""" try: # Validate CSRF token csrf_token = request.headers.get("X-CSRF-Token") if not csrf_token: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=routeApiMsg("CSRF token missing") ) # Get gateway interface with root privileges for authentication rootInterface = getRootInterface() # Authenticate user # Note: authenticateLocalUser uses _getUserForAuthentication which bypasses RBAC # This is correct because users are mandate-independent (Multi-Tenant Design) user = rootInterface.authenticateLocalUser( username=formData.username, password=formData.password ) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=routeApiMsg("Invalid username or password"), headers={"WWW-Authenticate": "Bearer"}, ) # Create token data # MULTI-TENANT: Token does NOT contain mandateId anymore # Mandate context is determined per request via X-Mandate-Id header token_data = { "sub": user.username, "userId": str(user.id), "authenticationAuthority": AuthAuthority.LOCAL # NO mandateId in token - stateless multi-tenant design } # Create session id and include in token claims for session-scoped logout session_id = str(uuid.uuid4()) token_data["sid"] = session_id # Create access token + set cookie access_token, _access_expires = createAccessToken(token_data) setAccessTokenCookie(response, access_token) # Create refresh token + set cookie refresh_token, _refresh_expires = createRefreshToken(token_data) setRefreshTokenCookie(response, refresh_token) # Get expiration time for response try: payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM]) expires_at = datetime.fromtimestamp(payload.get("exp")) except Exception as e: logger.error(f"Failed to decode access token: {str(e)}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=routeApiMsg("Failed to finalize token")) # Get user-specific interface for token operations userInterface = getInterface(user) # Get jti from already decoded payload jti = payload.get("jti") # Create token record in database # MULTI-TENANT: Token model no longer has mandateId field token = Token( id=jti, userId=user.id, authority=AuthAuthority.LOCAL, tokenPurpose=TokenPurpose.AUTH_SESSION, tokenAccess=access_token, tokenType="bearer", expiresAt=expires_at.timestamp(), sessionId=session_id # NO mandateId - Token is not mandate-bound ) # Save access token userInterface.saveAccessToken(token) # Ensure user has a Home mandate (created on first login if missing) try: _ensureHomeMandate(rootInterface, user) except Exception as homeErr: logger.error(f"Error ensuring Home mandate for user {user.username}: {homeErr}") # Activate PENDING subscriptions on first login (runs AFTER _ensureHomeMandate # so that a freshly provisioned Home mandate subscription is also activated) try: activatedCount = rootInterface._activatePendingSubscriptions(str(user.id)) if activatedCount > 0: logger.info(f"Activated {activatedCount} pending subscription(s) for user {user.username}") except Exception as subErr: logger.error(f"Error activating subscriptions on login: {subErr}") # Log successful login (app log file + audit DB for traceability) logger.info("Login successful for username=%s (userId=%s)", formData.username, str(user.id)) try: from modules.shared.auditLogger import audit_logger audit_logger.logUserAccess( userId=str(user.id), mandateId="system", action="login", successInfo="local_auth_success", ipAddress=request.client.host if request.client else None, userAgent=request.headers.get("user-agent"), success=True ) except Exception: # Don't fail if audit logging fails pass # Create response data (tokens are now in httpOnly cookies) response_data = { "type": "local_auth_success", "message": "Login successful - tokens set in httpOnly cookies", "authenticationAuthority": "local", "expires_at": expires_at.isoformat() } return response_data except ValueError as e: # Handle authentication errors error_msg = str(e) logger.warning(f"Authentication failed for user {formData.username}: {error_msg}") # Check if user is disabled and provide specific message if error_msg == "User is disabled": error_msg = "Your account is disabled. Please send an email to p.motsch@valueon.ch to get access to the PowerOn center." # Log failed login attempt try: from modules.shared.auditLogger import audit_logger audit_logger.logUserAccess( userId=formData.username or "unknown", mandateId="system", action="login_failed", successInfo=f"failed: {error_msg}", ipAddress=request.client.host if request.client else None, userAgent=request.headers.get("user-agent"), success=False ) except Exception: # Don't fail if audit logging fails pass raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=error_msg, headers={"WWW-Authenticate": "Bearer"}, ) except Exception as e: # Handle other errors error_msg = f"Login failed: {str(e)}" logger.error(f"Unexpected error during login for user {formData.username}: {error_msg}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_msg ) @router.post("/register") @limiter.limit("10/minute") def register_user( request: Request, userData: User = Body(...), frontendUrl: str = Body(..., embed=True), registrationType: str = Body("personal", embed=True), companyName: str = Body(None, embed=True), ) -> Dict[str, Any]: """Register a new local user (magic link based - no password required). Unified registration path: invited users skip Home mandate provisioning (they join the inviting mandate instead). Non-invited users get a Home mandate with TRIAL_14D. Company mandate creation is deferred to onboarding. Args: userData: User data (username, email, fullName, language) frontendUrl: The frontend URL to use in magic link (REQUIRED - provided by frontend) registrationType: Kept for backward compat but ignored (company mandates via onboarding) companyName: Kept for backward compat but ignored """ try: appInterface = getRootInterface() baseUrl = frontendUrl.rstrip("/") normalizedEmail = userData.email.lower().strip() if userData.email else None user = appInterface.createUser( username=userData.username, password=None, email=normalizedEmail, fullName=userData.fullName, language=userData.language, enabled=True, authenticationAuthority=AuthAuthority.LOCAL ) if not user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=routeApiMsg("Failed to register user") ) # Check for pending invitations BEFORE provisioning. # Search by both username AND email (email-only invitations have targetUsername=None). hasPendingInvitations = False validInvitations = [] try: from modules.datamodels.datamodelInvitation import Invitation currentTime = getUtcTimestamp() pendingByUsername = appInterface.getInvitationsByTargetUsername(userData.username) pendingByEmail = appInterface.getInvitationsByEmail(normalizedEmail) if normalizedEmail else [] seenIds = set() allPending = pendingByUsername + pendingByEmail for invitation in allPending: if invitation.id in seenIds: continue seenIds.add(invitation.id) if (invitation.expiresAt or 0) < currentTime: continue if invitation.revokedAt: continue if (invitation.currentUses or 0) >= (invitation.maxUses or 1): continue validInvitations.append(invitation) hasPendingInvitations = len(validInvitations) > 0 except Exception as invErr: logger.warning(f"Failed to check pending invitations: {invErr}") # Only provision Home mandate if user has NO pending invitations. # Invited users join existing mandates; they can create their own later via onboarding. provisionResult = None if not hasPendingInvitations: try: homeMandateLabel = f"Home {user.username}" provisionResult = appInterface._provisionMandateForUser( userId=str(user.id), mandateLabel=homeMandateLabel, planKey="TRIAL_14D", ) logger.info(f"Provisioned Home mandate for user {user.id}: {provisionResult}") except Exception as provErr: logger.error(f"Error provisioning Home mandate for user {user.id}: {provErr}") else: logger.info(f"Skipping Home mandate for user {user.id} — has {len(validInvitations)} pending invitation(s)") # Generate reset token for password setup token, expires = appInterface.generateResetTokenAndExpiry() appInterface.setResetToken(user.id, token, expires, clearPassword=False) # Send registration email with magic link try: magicLink = f"{baseUrl}/reset?token={token}" expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24")) emailSubject = "PowerOn Registrierung - Passwort setzen" emailHtml = _buildAuthEmailHtml( greeting=f"Hallo {user.fullName or user.username}", bodyLines=[ "Vielen Dank für Ihre Registrierung bei PowerOn.", "", f"Ihr Benutzername: {user.username}", "", "Klicken Sie auf die Schaltfläche, um Ihr Passwort zu setzen:", ], buttonText="Passwort setzen", buttonUrl=magicLink, footerText=f"Dieser Link ist {expiryHours} Stunden gültig. Falls Sie sich nicht registriert haben, können Sie diese E-Mail ignorieren.", ) emailSent = _sendAuthEmail( recipient=user.email, subject=emailSubject, message="", userId=str(user.id), htmlOverride=emailHtml, ) if not emailSent: logger.warning(f"Failed to send registration email to {user.email}") except Exception as emailErr: logger.error(f"Error sending registration email: {str(emailErr)}") # Create notifications for pending invitations for invitation in validInvitations: try: from modules.routes.routeNotifications import createInvitationNotification mandateId = invitation.mandateId mandate = appInterface.getMandate(mandateId) mandateName = (mandate.label or mandate.name) if mandate else "PowerOn" inviterId = invitation.sysCreatedBy inviter = appInterface.getUser(inviterId) if inviterId else None inviterName = (inviter.fullName or inviter.username) if inviter else "PowerOn" createInvitationNotification( userId=str(user.id), invitationId=str(invitation.id), mandateName=mandateName, inviterName=inviterName ) logger.info(f"Created notification for new user {userData.username} for invitation {invitation.id}") except Exception as notifErr: logger.warning(f"Failed to create notification for invitation {invitation.id}: {notifErr}") responseData = { "message": "Registrierung erfolgreich! Bitte prüfen Sie Ihre E-Mail für den Link zum Setzen Ihres Passworts." } if provisionResult: responseData["mandateId"] = provisionResult.get("mandateId") responseData["hasInvitations"] = hasPendingInvitations return responseData except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) except Exception as e: logger.error(f"Error registering user: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to register user: {str(e)}" ) @router.get("/me", response_model=User) @limiter.limit("30/minute") def read_user_me( request: Request, currentUser: User = Depends(getCurrentUser) ) -> User: """Get current user info""" try: return currentUser except Exception as e: logger.error(f"Error getting user me: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to get current user: {str(e)}" ) @router.post("/refresh") @limiter.limit("60/minute") def refresh_token( request: Request, response: Response ) -> Dict[str, Any]: """Refresh access token using refresh token from cookie""" try: # Get refresh token from cookie refresh_token = request.cookies.get('refresh_token') if not refresh_token: raise HTTPException(status_code=401, detail=routeApiMsg("No refresh token found")) # Validate refresh token try: payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM]) if payload.get("type") != "refresh": raise HTTPException(status_code=401, detail=routeApiMsg("Invalid refresh token type")) except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail=routeApiMsg("Refresh token expired")) except jwt.JWTError: raise HTTPException(status_code=401, detail=routeApiMsg("Invalid refresh token")) # Get user information from refresh token payload user_id = payload.get("userId") if not user_id: raise HTTPException(status_code=401, detail=routeApiMsg("Invalid refresh token - missing user ID")) # Get user from database using the user ID from refresh token try: app_interface = getRootInterface() current_user = app_interface.getUser(user_id) if not current_user: raise HTTPException(status_code=401, detail=routeApiMsg("User not found")) except Exception as e: logger.error(f"Failed to get user from database: {str(e)}") raise HTTPException(status_code=500, detail=routeApiMsg("Failed to validate user")) # Create new token data # MULTI-TENANT: Token does NOT contain mandateId anymore token_data = { "sub": current_user.username, "userId": str(current_user.id), "authenticationAuthority": current_user.authenticationAuthority # NO mandateId in token } # Create new access token + set cookie access_token, _expires = createAccessToken(token_data) setAccessTokenCookie(response, access_token) # Get expiration time try: payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM]) expires_at = datetime.fromtimestamp(payload.get("exp")) except Exception as e: logger.error(f"Failed to decode new access token: {str(e)}") raise HTTPException(status_code=500, detail=routeApiMsg("Failed to create new token")) return { "type": "token_refresh_success", "message": "Token refreshed successfully", "expires_at": expires_at.isoformat() } except HTTPException as e: # If it's a 503 error (service unavailable due to missing token table), return it as-is if e.status_code == 503: raise # For other HTTP exceptions, re-raise them raise except Exception as e: logger.error(f"Token refresh error: {str(e)}") raise HTTPException(status_code=500, detail=routeApiMsg("Token refresh failed")) @router.post("/logout") @limiter.limit("30/minute") def logout(request: Request, response: Response, currentUser: User = Depends(getCurrentUser)) -> JSONResponse: """Logout from local authentication""" try: # Get user interface with current user context appInterface = getInterface(currentUser) # Get token from cookie or Authorization header token = request.cookies.get('auth_token') if not token: auth_header = request.headers.get("Authorization") if auth_header and auth_header.lower().startswith("bearer "): token = auth_header.split(" ", 1)[1].strip() if not token: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=routeApiMsg("No token found")) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) session_id = payload.get("sid") or payload.get("sessionId") jti = payload.get("jti") except Exception as e: logger.error(f"Failed to decode JWT on logout: {str(e)}") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=routeApiMsg("Invalid token")) revoked = 0 if session_id: revoked = appInterface.revokeTokensBySessionId(session_id, currentUser.id, AuthAuthority.LOCAL, revokedBy=currentUser.id, reason="logout") elif jti: appInterface.revokeTokenById(jti, revokedBy=currentUser.id, reason="logout") revoked = 1 # Log successful logout # MULTI-TENANT: Logout is a system-level function, no mandate context try: from modules.shared.auditLogger import audit_logger audit_logger.logUserAccess( userId=str(currentUser.id), mandateId="system", action="logout", successInfo=f"revoked_tokens: {revoked}", ipAddress=request.client.host if request.client else None, userAgent=request.headers.get("user-agent"), success=True ) except Exception: # Don't fail if audit logging fails pass # Create the JSON response first json_response = JSONResponse({ "message": "Successfully logged out - cookies cleared", "revokedTokens": revoked }) # Clear httpOnly cookies on the response we're actually returning clearAccessTokenCookie(json_response) clearRefreshTokenCookie(json_response) return json_response except Exception as e: logger.error(f"Error during logout: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Logout failed: {str(e)}" ) @router.get("/available") @limiter.limit("10/minute") def check_username_availability( request: Request, username: str, authenticationAuthority: str = "local" ) -> Dict[str, Any]: """Check if a username is available for registration.""" try: # Get root interface appInterface = getRootInterface() # Use the interface's method to check availability result = appInterface.checkUsernameAvailability({ "username": username, "authenticationAuthority": authenticationAuthority }) return { "username": username, "authenticationAuthority": authenticationAuthority, "available": result["available"], "message": result["message"] } except Exception as e: logger.error(f"Error checking username availability: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to check username availability: {str(e)}" ) @router.post("/password-reset-request") @limiter.limit("5/minute") def password_reset_request( request: Request, username: str = Body(..., embed=True), frontendUrl: str = Body(..., embed=True) ) -> Dict[str, Any]: """Request password reset email. Finds user by username (globally unique) and sends reset email to their email address. Args: username: User's username (globally unique identifier) frontendUrl: The frontend URL to use in magic link (REQUIRED - provided by frontend) """ try: rootInterface = getRootInterface() # Frontend URL is required - no fallback baseUrl = frontendUrl.rstrip("/") # Find user by username (username is globally unique) user = rootInterface.findUserByUsernameLocalAuth(username) if user and user.email: expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24")) try: # Generate reset token token, expires = rootInterface.generateResetTokenAndExpiry() # Set reset token but keep existing password valid until new one is set rootInterface.setResetToken(user.id, token, expires, clearPassword=False) # Generate magic link using provided frontend URL magicLink = f"{baseUrl}/reset?token={token}" # Send email using dedicated auth email function emailSubject = "PowerOn - Passwort zurücksetzen" emailHtml = _buildAuthEmailHtml( greeting=f"Hallo {user.fullName or user.username}", bodyLines=[ "Sie haben eine Passwort-Zurücksetzung für Ihren PowerOn Account angefordert.", "", f"Benutzername: {user.username}", "", "Klicken Sie auf die Schaltfläche, um Ihr Passwort zurückzusetzen:", ], buttonText="Passwort zurücksetzen", buttonUrl=magicLink, footerText=f"Dieser Link ist {expiryHours} Stunden gültig. Falls Sie diese Anforderung nicht gestellt haben, können Sie diese E-Mail ignorieren.", ) emailSent = _sendAuthEmail( recipient=user.email, subject=emailSubject, message="", userId=str(user.id), htmlOverride=emailHtml, ) if emailSent: logger.info(f"Password reset email sent to {user.email} for user {user.username}") else: logger.warning(f"Failed to send password reset email to {user.email}") except Exception as userErr: logger.error(f"Failed to send reset email for user {username}: {str(userErr)}") else: logger.info(f"Password reset requested for unknown username: {username}") # Always return same message (security - don't reveal if user exists) return { "message": "Falls ein Konto mit diesem Benutzernamen existiert, wurde ein Reset-Link an die hinterlegte E-Mail-Adresse gesendet." } except Exception as e: logger.error(f"Error in password reset request: {str(e)}") # Still return success for security return { "message": "Falls ein Konto mit diesem Benutzernamen existiert, wurde ein Reset-Link an die hinterlegte E-Mail-Adresse gesendet." } @router.post("/onboarding") @limiter.limit("5/minute") def onboarding_provision( request: Request, currentUser: User = Depends(getCurrentUser), companyName: str = Body(None, embed=True), planKey: str = Body("TRIAL_14D", embed=True), ) -> Dict[str, Any]: """Post-login onboarding: create a mandate for the user. Guard: user can only create a mandate if they are NOT already admin in any non-system mandate. This prevents duplicate provisioning. """ try: from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole from modules.datamodels.datamodelRbac import Role appInterface = getRootInterface() db = appInterface.db userId = str(currentUser.id) # Check if user already has admin role in a non-system mandate userMandates = db.getRecordset(UserMandate, recordFilter={"userId": userId, "enabled": True}) hasAdminMandate = False for um in userMandates: mandateId = um.get("mandateId") mandate = db.getRecordset(Mandate, recordFilter={"id": mandateId}) if mandate and mandate[0].get("isSystem"): continue umId = um.get("id") umRoles = db.getRecordset(UserMandateRole, recordFilter={"userMandateId": umId}) for umRole in umRoles: roleId = umRole.get("roleId") roles = db.getRecordset(Role, recordFilter={"id": roleId}) for role in roles: if "admin" in (role.get("roleLabel") or "").lower(): hasAdminMandate = True break if hasAdminMandate: break if hasAdminMandate: break if hasAdminMandate: logger.info(f"Onboarding: user {currentUser.username} already has admin mandate — skipping provisioning") return { "message": "User already has an admin mandate", "mandateId": None, "alreadyProvisioned": True, } mandateLabel = (companyName.strip() if companyName and companyName.strip() else f"Home {currentUser.username}") if planKey not in ("TRIAL_14D", "STARTER_MONTHLY", "STARTER_YEARLY", "PROFESSIONAL_MONTHLY", "PROFESSIONAL_YEARLY", "MAX_MONTHLY", "MAX_YEARLY"): planKey = "TRIAL_14D" result = appInterface._provisionMandateForUser( userId=userId, mandateLabel=mandateLabel, planKey=planKey, ) try: activatedCount = appInterface._activatePendingSubscriptions(userId) if activatedCount > 0: logger.info(f"Activated {activatedCount} pending subscription(s) for user {currentUser.username} during onboarding") except Exception as subErr: logger.error(f"Error activating subscriptions during onboarding: {subErr}") logger.info(f"Onboarding provision for {currentUser.username}: {result}") return { "message": "Mandate provisioned successfully", "mandateId": result.get("mandateId") if result else None, "alreadyProvisioned": False, } except Exception as e: logger.error(f"Onboarding provision failed: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @router.post("/password-reset") @limiter.limit("10/minute") def password_reset( request: Request, token: str = Body(..., embed=True), password: str = Body(..., embed=True) ) -> Dict[str, Any]: """Reset password using token from magic link.""" try: # Validate token format (UUID) try: uuid.UUID(token) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=routeApiMsg("Ungültiger oder abgelaufener Reset-Link") ) # Validate password strength if len(password) < 8: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=routeApiMsg("Passwort muss mindestens 8 Zeichen lang sein") ) rootInterface = getRootInterface() # Verify and reset success = rootInterface.resetPasswordWithToken(token, password) if not success: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=routeApiMsg("Ungültiger oder abgelaufener Reset-Link") ) # Log success try: from modules.shared.auditLogger import audit_logger audit_logger.logSecurityEvent( userId="unknown", mandateId="unknown", action="password_reset_via_token", details="Password reset completed via magic link" ) except Exception: pass return {"message": "Passwort erfolgreich gesetzt"} except HTTPException: raise except Exception as e: logger.error(f"Error in password reset: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=routeApiMsg("Passwort-Zurücksetzung fehlgeschlagen") ) # ============================================================ # Neutralization Mappings (user-level, view/delete) # ============================================================ @router.get("/neutralization-mappings") @limiter.limit("60/minute") def _getNeutralizationMappings( request: Request, context: RequestContext = Depends(getRequestContext), ): """List the current user's neutralization placeholder mappings.""" userId = str(context.user.id) from modules.interfaces.interfaceDbApp import getRootInterface from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutralizerAttributes rootIf = getRootInterface() records = rootIf.db.getRecordset(DataNeutralizerAttributes, recordFilter={"userId": userId}) return {"mappings": records} @router.delete("/neutralization-mappings/{mappingId}") @limiter.limit("30/minute") def _deleteNeutralizationMapping( request: Request, mappingId: str = Path(..., description="ID of the mapping to delete"), context: RequestContext = Depends(getRequestContext), ): """Delete a specific neutralization mapping owned by the current user.""" userId = str(context.user.id) from modules.interfaces.interfaceDbApp import getRootInterface from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutralizerAttributes rootIf = getRootInterface() records = rootIf.db.getRecordset(DataNeutralizerAttributes, recordFilter={"id": mappingId}) if not records: raise HTTPException(status_code=404, detail=routeApiMsg("Mapping not found")) rec = records[0] recUserId = rec.get("userId") if isinstance(rec, dict) else getattr(rec, "userId", None) if recUserId != userId: raise HTTPException(status_code=403, detail=routeApiMsg("Not your mapping")) rootIf.db.recordDelete(DataNeutralizerAttributes, mappingId) return {"deleted": True, "id": mappingId}