# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
User routes for the backend API.
Implements the endpoints for user management.

MULTI-TENANT: User management requires RequestContext.
- mandateId from X-Mandate-Id header determines which users are visible
- SysAdmin can see all users across mandates
"""

from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
from pydantic import BaseModel
import logging
import json
import math
import operator
from datetime import datetime, timezone
from urllib.parse import urlsplit

# Import interfaces and models
import modules.interfaces.interfaceDbApp as interfaceDbApp
from modules.auth import limiter, getRequestContext, RequestContext

# Import the attribute definition and helper functions
from modules.datamodels.datamodelUam import User, UserInDB, AuthAuthority
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict

# Configure logger
logger = logging.getLogger(__name__)

# Minimum accepted length for a new password (reset and self-service change).
_MIN_PASSWORD_LENGTH = 8

# Numeric item values above this are treated as millisecond timestamps and
# converted to seconds before being compared with a date range.
_MS_TIMESTAMP_THRESHOLD = 10000000000

# Numeric comparison operators supported by field filters.
_NUMERIC_COMPARATORS = {
    'gt': operator.gt,
    'gte': operator.ge,
    'lt': operator.lt,
    'lte': operator.le,
}


# ============================================================================
# PERMISSION HELPERS
# ============================================================================

def _getAdminMandateIds(rootInterface: Any, userId: str) -> List[str]:
    """
    Return the IDs of all mandates in which the given user is a mandate admin.

    A membership counts when it is enabled, carries both an id and a mandateId,
    and has at least one role labelled "admin" that is not bound to a feature
    instance. Disabled memberships are skipped everywhere, so listing and
    management permissions are derived from the same rule.

    Args:
        rootInterface: Root DB interface used for the lookups.
        userId: ID of the user whose admin mandates are resolved.

    Returns:
        List of mandate IDs (as strings), one entry per qualifying membership.
    """
    adminMandateIds: List[str] = []
    for userMandate in rootInterface.getUserMandates(userId):
        if not getattr(userMandate, 'enabled', True):
            continue
        umId = getattr(userMandate, 'id', None)
        mandateId = getattr(userMandate, 'mandateId', None)
        if not umId or not mandateId:
            continue
        for roleId in rootInterface.getRoleIdsForUserMandate(str(umId)):
            role = rootInterface.getRole(roleId)
            if role and role.roleLabel == "admin" and not role.featureInstanceId:
                adminMandateIds.append(str(mandateId))
                break  # one admin role per membership is enough
    return adminMandateIds


def _isAdminForUser(context: RequestContext, targetUserId: str) -> bool:
    """
    Check if the current user has admin rights for the target user.

    SysAdmin can manage all users. MandateAdmin can manage users in their mandates.
    Works without X-Mandate-Id header (admin pages don't send it).

    Args:
        context: Request context of the calling user.
        targetUserId: ID of the user that is about to be managed.

    Returns:
        True when the caller is SysAdmin or is mandate admin of at least one
        mandate the target user belongs to, otherwise False.
    """
    if context.hasSysAdminRole:
        return True

    rootInterface = getRootInterface()
    adminMandateIds = set(_getAdminMandateIds(rootInterface, str(context.user.id)))
    if not adminMandateIds:
        return False

    # Check if target user is in any of the admin's mandates
    return any(
        str(getattr(targetMandate, 'mandateId', '')) in adminMandateIds
        for targetMandate in rootInterface.getUserMandates(targetUserId)
    )


def _auditSecurityEvent(context: RequestContext, action: str, details: str, **extra: Any) -> None:
    """
    Write a security audit event on a best-effort basis.

    Audit failures must never fail the request, but they are logged as a
    warning instead of being silently discarded.

    Args:
        context: Request context of the acting user.
        action: Audit action name.
        details: Human-readable description of the event.
        **extra: Additional keyword arguments forwarded to the audit logger
            (e.g. ipAddress, success).
    """
    try:
        from modules.shared.auditLogger import audit_logger
        audit_logger.logSecurityEvent(
            userId=str(context.user.id),
            mandateId=str(context.mandateId) if context.mandateId else "system",
            action=action,
            details=details,
            **extra
        )
    except Exception as e:
        logger.warning(f"Audit logging failed for action '{action}': {str(e)}")


def _isAbsoluteHttpUrl(url: str) -> bool:
    """Return True when url is an absolute http:// or https:// URL with a host."""
    try:
        parsed = urlsplit(url)
    except ValueError:
        return False
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


# ============================================================================
# LIST / PAGINATION HELPERS
# ============================================================================

def _unwrapItems(result: Any) -> List[Any]:
    """Return the user list from an interface result (plain list or object with `.items`)."""
    if isinstance(result, list):
        return result
    if hasattr(result, 'items'):
        return result.items
    return []


def _collectUsersForMandates(rootInterface: Any, mandateIds: List[str]) -> List[Any]:
    """
    Collect the users of several mandates, de-duplicated by user ID.

    Users without an ID are skipped. The first occurrence of a user wins, so
    the order follows the order of mandateIds.
    """
    seenUserIds = set()
    users: List[Any] = []
    for mandateId in mandateIds:
        for user in _unwrapItems(rootInterface.getUsersByMandate(mandateId)):
            userId = user.get("id") if isinstance(user, dict) else getattr(user, "id", None)
            if userId and userId not in seenUserIds:
                seenUserIds.add(userId)
                users.append(user)
    return users


def _parsePaginationDict(paginationJson: Optional[str]) -> Optional[Dict[str, Any]]:
    """
    Decode and normalize a JSON-encoded pagination parameter.

    Args:
        paginationJson: Raw query-string value, may be None or empty.

    Returns:
        The normalized pagination dict, or None when nothing (or an empty
        value) was supplied.

    Raises:
        ValueError: If the value is not valid JSON (json.JSONDecodeError is a
            ValueError) or does not decode to a JSON object.
    """
    if not paginationJson:
        return None
    paginationDict = json.loads(paginationJson)
    if not paginationDict:
        return None
    if not isinstance(paginationDict, dict):
        raise ValueError("pagination must be a JSON object")
    return normalize_pagination_dict(paginationDict)


def _buildCrossFilterParams(paginationJson: Optional[str], column: str) -> Optional[PaginationParams]:
    """
    Build PaginationParams containing every active filter except the one for `column`.

    Sorting is dropped because it is irrelevant for distinct-value extraction.
    Malformed input is tolerated and yields None (no cross-filtering).
    """
    try:
        paginationDict = _parsePaginationDict(paginationJson)
        if paginationDict is None:
            return None
        # `filters` may be missing or explicitly null in the payload
        filters = dict(paginationDict.get("filters") or {})
        filters.pop(column, None)
        paginationDict["filters"] = filters
        paginationDict.pop("sort", None)
        return PaginationParams(**paginationDict)
    except (ValueError, TypeError):
        return None


def _buildPaginationMetadata(
    paginationParams: PaginationParams,
    currentPage: int,
    pageSize: int,
    totalItems: int,
    totalPages: int,
) -> PaginationMetadata:
    """Assemble PaginationMetadata, echoing the requested sort and filters."""
    return PaginationMetadata(
        currentPage=currentPage,
        pageSize=pageSize,
        totalItems=totalItems,
        totalPages=totalPages,
        sort=paginationParams.sort,
        filters=paginationParams.filters
    )


def _extractDistinctValues(items: List[Dict[str, Any]], columnKey: str) -> List[str]:
    """
    Extract sorted distinct display values for a column from enriched items.

    None and empty strings are skipped. Booleans become "true"/"false",
    numbers are stringified, and dict values are reduced to their "en" entry
    or, failing that, the first non-empty string value.

    Returns:
        Distinct values sorted case-insensitively.
    """
    values = set()
    for item in items:
        val = item.get(columnKey)
        if val is None or val == "":
            continue
        if isinstance(val, bool):
            values.add("true" if val else "false")
        elif isinstance(val, (int, float)):
            values.add(str(val))
        elif isinstance(val, dict):
            text = val.get("en") or next((v for v in val.values() if isinstance(v, str) and v), None)
            if text:
                values.add(str(text))
        else:
            values.add(str(val))
    return sorted(values, key=lambda v: v.lower())


def _handleFilterValuesRequest(
    items: List[Dict[str, Any]],
    column: str,
    paginationJson: Optional[str] = None,
) -> List[str]:
    """
    Generic handler for /filter-values endpoints.

    Applies all active filters EXCEPT the one for the requested column
    (cross-filtering), then extracts distinct values for that column.

    Args:
        items: Item dicts to inspect.
        column: Column key whose distinct values are requested.
        paginationJson: JSON-encoded pagination object holding the active
            filters; malformed input is ignored.

    Returns:
        Sorted distinct display values for the column.
    """
    crossFilterParams = _buildCrossFilterParams(paginationJson, column)
    crossFiltered = _applyFiltersAndSort(items, crossFilterParams)
    return _extractDistinctValues(crossFiltered, column)


def _matchesBetween(itemValue: Any, itemStr: str, bounds: Any) -> bool:
    """
    Evaluate a 'between' filter.

    `bounds` is expected to be a dict with optional 'from'/'to' entries given
    as YYYY-MM-DD strings; the item value may be a Unix timestamp in seconds
    or milliseconds. When either side cannot be interpreted that way, the
    comparison falls back to a case-insensitive string comparison.
    A non-dict or empty range matches everything.
    """
    if not isinstance(bounds, dict):
        return True
    fromVal = bounds.get('from', '')
    toVal = bounds.get('to', '')
    if not fromVal and not toVal:
        return True

    try:
        fromTs = None
        toTs = None
        if fromVal:
            fromTs = datetime.strptime(str(fromVal), '%Y-%m-%d').replace(tzinfo=timezone.utc).timestamp()
        if toVal:
            # Upper bound is inclusive: extend to the end of that day
            toTs = datetime.strptime(str(toVal), '%Y-%m-%d').replace(
                hour=23, minute=59, second=59, tzinfo=timezone.utc
            ).timestamp()
        itemNum = itemValue if isinstance(itemValue, (int, float)) else float(itemValue)
        # Normalize: if item looks like a millisecond timestamp, convert to seconds
        if itemNum > _MS_TIMESTAMP_THRESHOLD:
            itemNum = itemNum / 1000
        if fromTs is not None and toTs is not None:
            return fromTs <= itemNum <= toTs
        if fromTs is not None:
            return itemNum >= fromTs
        return itemNum <= toTs
    except (ValueError, TypeError):
        # Fallback: string comparison (for non-numeric date fields)
        fromStr = str(fromVal).lower() if fromVal else ''
        toStr = str(toVal).lower() if toVal else ''
        if fromStr and toStr:
            return fromStr <= itemStr <= toStr
        if fromStr:
            return itemStr >= fromStr
        return itemStr <= toStr


def _matchesFilter(item: Dict[str, Any], field: str, op: str, value: Any) -> bool:
    """
    Check whether a single item satisfies one field filter.

    Items whose field is None never match. String operators compare
    case-insensitively on the stringified values. Unknown operators match
    everything.
    """
    itemValue = item.get(field)
    if itemValue is None:
        return False

    itemStr = str(itemValue).lower()
    valueStr = str(value).lower()

    if op in ('equals', 'eq'):
        return itemStr == valueStr
    if op == 'contains':
        return valueStr in itemStr
    if op == 'startsWith':
        return itemStr.startswith(valueStr)
    if op == 'endsWith':
        return itemStr.endswith(valueStr)
    if op in _NUMERIC_COMPARATORS:
        try:
            return _NUMERIC_COMPARATORS[op](float(itemValue), float(value))
        except (ValueError, TypeError):
            return False
    if op == 'between':
        return _matchesBetween(itemValue, itemStr, value)
    if op == 'in':
        if isinstance(value, list):
            return itemStr in [str(x).lower() for x in value]
        return False
    if op == 'notIn':
        if isinstance(value, list):
            return itemStr not in [str(x).lower() for x in value]
        return True
    return True


def _sortKey(value: Any) -> tuple:
    """Sort key that orders numbers (and booleans) before case-insensitive strings."""
    if isinstance(value, bool):
        return (0, int(value), '')
    if isinstance(value, (int, float)):
        return (0, value, '')
    return (1, 0, str(value).lower())


def _applyFiltersAndSort(items: List[Dict[str, Any]], paginationParams: Optional[PaginationParams]) -> List[Dict[str, Any]]:
    """
    Apply filters and sorting to a list of items.
    This is used when we can't do server-side filtering in the database (e.g., SysAdmin view).

    Args:
        items: List of dictionaries to filter/sort
        paginationParams: Pagination parameters with filters and sort

    Returns:
        Filtered and sorted list. The input list itself is returned when no
        pagination parameters are given; otherwise a new list is built.
    """
    if not paginationParams:
        return items

    result = items.copy()

    # Apply filters
    filters = paginationParams.filters
    if filters:
        # General search: substring match against every non-None value.
        # str() guards against a non-string search value (e.g. a number).
        rawSearch = filters.get('search')
        searchTerm = str(rawSearch).lower() if rawSearch else None
        if searchTerm:
            result = [
                item for item in result
                if any(v is not None and searchTerm in str(v).lower() for v in item.values())
            ]

        # Field-specific filters
        for field, filterValue in filters.items():
            if field == 'search':
                continue  # Already handled

            if isinstance(filterValue, dict) and 'operator' in filterValue:
                op = filterValue.get('operator', 'equals')
                value = filterValue.get('value')
            else:
                op = 'equals'
                value = filterValue

            if value is None or value == '':
                continue

            result = [item for item in result if _matchesFilter(item, field, op, value)]

    # Apply sorting — None values always last.
    # Sorting by the least significant field first relies on sort stability.
    if paginationParams.sort:
        for sortField in reversed(paginationParams.sort):
            fieldName = sortField.field
            ascending = sortField.direction == 'asc'
            noneItems = [item for item in result if item.get(fieldName) is None]
            nonNoneItems = [item for item in result if item.get(fieldName) is not None]
            nonNoneItems = sorted(
                nonNoneItems,
                key=lambda item, _fn=fieldName: _sortKey(item.get(_fn)),
                reverse=not ascending
            )
            result = nonNoneItems + noneItems

    return result


router = APIRouter(
    prefix="/api/users",
    tags=["Manage Users"],
    responses={404: {"description": "Not found"}}
)


# ============================================================================
# OPTIONS ENDPOINTS (for dropdowns)
# ============================================================================

@router.get("/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_user_options(
    request: Request,
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Get user options for select dropdowns.
    MULTI-TENANT: mandateId from X-Mandate-Id header determines scope.

    Returns standardized format: [{ value, label }]

    Raises:
        HTTPException: 403 when no mandate is given and the caller is not
            SysAdmin; 500 on unexpected errors.
    """
    try:
        appInterface = interfaceDbApp.getInterface(context.user)

        if context.mandateId:
            result = appInterface.getUsersByMandate(str(context.mandateId), None)
            users = result.items if hasattr(result, 'items') else result
        elif context.hasSysAdminRole:
            users = appInterface.getAllUsers()
        else:
            raise HTTPException(status_code=403, detail="Access denied")

        return [
            {"value": user.id, "label": user.fullName or user.username or user.email or user.id}
            for user in users
        ]
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting user options: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to get user options: {str(e)}")


# ============================================================================
# CRUD ENDPOINTS
# ============================================================================

@router.get("/", response_model=PaginatedResponse[User])
@limiter.limit("30/minute")
def get_users(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[User]:
    """
    Get users with optional pagination, sorting, and filtering.
    MULTI-TENANT: mandateId from X-Mandate-Id header determines scope.
    SysAdmin without mandateId sees all users.
    Other callers without mandateId see the users of all mandates they administer.

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination

    Examples:
    - GET /api/users/ (no pagination - returns all users in mandate)
    - GET /api/users/?pagination={"page":1,"pageSize":10,"sort":[]}

    Raises:
        HTTPException: 400 for an invalid pagination parameter, 403 when the
            caller administers no mandate, 500 on unexpected errors.
    """
    try:
        # Parse pagination parameter; TypeError covers payloads with a wrong shape
        paginationParams = None
        try:
            paginationDict = _parsePaginationDict(pagination)
            if paginationDict is not None:
                paginationParams = PaginationParams(**paginationDict)
        except (ValueError, TypeError) as e:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid pagination parameter: {str(e)}"
            )

        appInterface = interfaceDbApp.getInterface(context.user, mandateId=context.mandateId)

        # MULTI-TENANT: Use mandateId from context (header)
        if context.mandateId:
            result = appInterface.getUsersByMandate(str(context.mandateId), paginationParams)

            # The interface returns a paginated result if pagination was provided
            if paginationParams and hasattr(result, 'items'):
                return PaginatedResponse(
                    items=result.items,
                    pagination=_buildPaginationMetadata(
                        paginationParams,
                        result.currentPage,
                        result.pageSize,
                        result.totalItems,
                        result.totalPages,
                    )
                )
            return PaginatedResponse(items=_unwrapItems(result), pagination=None)

        if context.hasSysAdminRole:
            # SysAdmin without mandateId — DB-level pagination via interface
            result = appInterface.getAllUsers(paginationParams)

            if paginationParams and hasattr(result, 'items'):
                return PaginatedResponse(
                    items=result.items,
                    pagination=_buildPaginationMetadata(
                        paginationParams,
                        paginationParams.page,
                        paginationParams.pageSize,
                        result.totalItems,
                        result.totalPages,
                    )
                )
            return PaginatedResponse(items=_unwrapItems(result), pagination=None)

        # Non-SysAdmin without mandateId: aggregate users across all admin mandates
        rootInterface = getRootInterface()
        adminMandateIds = _getAdminMandateIds(rootInterface, str(context.user.id))
        if not adminMandateIds:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="No admin access to any mandate"
            )

        allUsers = [
            u if isinstance(u, dict) else u.model_dump() if hasattr(u, 'model_dump') else vars(u)
            for u in _collectUsersForMandates(rootInterface, adminMandateIds)
        ]

        # Apply in-memory filtering and sorting
        filteredUsers = _applyFiltersAndSort(allUsers, paginationParams)

        if not paginationParams:
            return PaginatedResponse(
                items=[User(**u) for u in filteredUsers],
                pagination=None
            )

        totalItems = len(filteredUsers)
        totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
        startIdx = (paginationParams.page - 1) * paginationParams.pageSize
        endIdx = startIdx + paginationParams.pageSize
        # Only the requested page is converted into User models
        pageUsers = [User(**u) for u in filteredUsers[startIdx:endIdx]]

        return PaginatedResponse(
            items=pageUsers,
            pagination=_buildPaginationMetadata(
                paginationParams,
                paginationParams.page,
                paginationParams.pageSize,
                totalItems,
                totalPages,
            )
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting users: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get users: {str(e)}"
        )


@router.get("/filter-values")
@limiter.limit("60/minute")
def get_user_filter_values(
    request: Request,
    column: str = Query(..., description="Column key"),
    pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
    context: RequestContext = Depends(getRequestContext)
) -> list:
    """
    Return distinct filter values for a column in users.

    The scope follows the same rules as the user list: mandate from the
    header, all users for SysAdmin, otherwise the users of all mandates the
    caller administers (empty list when there are none).

    Raises:
        HTTPException: 500 on unexpected errors.
    """
    try:
        appInterface = interfaceDbApp.getInterface(context.user, mandateId=context.mandateId)

        if context.mandateId:
            # Mandate-scoped: in-memory (users require UserMandate join)
            users = _unwrapItems(appInterface.getUsersByMandate(str(context.mandateId), None))
        elif context.hasSysAdminRole:
            # SysAdmin: use SQL DISTINCT for DB columns, fall back to in-memory
            try:
                # Cross-filter pagination: all filters except the requested column
                crossFilterPagination = _buildCrossFilterParams(pagination, column)
                rootInterface = getRootInterface()
                values = rootInterface.db.getDistinctColumnValues(
                    UserInDB, column, crossFilterPagination
                )
                return sorted(values, key=lambda v: v.lower())
            except Exception as e:
                logger.debug(f"DB distinct lookup failed for column '{column}', using in-memory fallback: {str(e)}")
                users = appInterface.getAllUsers()
        else:
            # Non-admin multi-mandate: aggregate across admin mandates (in-memory)
            rootInterface = getRootInterface()
            adminMandateIds = _getAdminMandateIds(rootInterface, str(context.user.id))
            if not adminMandateIds:
                return []
            users = _collectUsersForMandates(rootInterface, adminMandateIds)

        items = [u.model_dump() if hasattr(u, 'model_dump') else u for u in users]
        return _handleFilterValuesRequest(items, column, pagination)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting filter values for users: {str(e)}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))


@router.get("/{userId}", response_model=User)
@limiter.limit("30/minute")
def get_user(
    request: Request,
    userId: str = Path(..., description="ID of the user"),
    context: RequestContext = Depends(getRequestContext)
) -> User:
    """
    Get a specific user by ID.
    MULTI-TENANT: User must be in the same mandate (via UserMandate) or caller is SysAdmin.

    Raises:
        HTTPException: 404 when the user does not exist, 403 when the user is
            not in the caller's mandate, 500 on unexpected errors.
    """
    try:
        appInterface = interfaceDbApp.getInterface(context.user)

        # Get user without filtering by enabled status
        user = appInterface.getUser(userId)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User with ID {userId} not found"
            )

        # MULTI-TENANT: Verify user is in the same mandate (unless SysAdmin).
        # NOTE(review): without an X-Mandate-Id header this check is skipped;
        # access then depends on whatever the app interface enforces — confirm.
        if context.mandateId and not context.hasSysAdminRole:
            userMandate = appInterface.getUserMandate(userId, str(context.mandateId))
            if not userMandate:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="User not in your mandate"
                )

        return user
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting user {userId}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get user: {str(e)}"
        )


class CreateUserRequest(BaseModel):
    """Request body for creating a new user"""
    username: str
    email: Optional[str] = None
    fullName: Optional[str] = None
    language: str = "en"
    enabled: bool = True
    # Only honoured when the caller is SysAdmin (enforced in create_user)
    isSysAdmin: bool = False
    password: Optional[str] = None


@router.post("", response_model=User)
@limiter.limit("10/minute")
def create_user(
    request: Request,
    userData: CreateUserRequest = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> User:
    """
    Create a new user.
    MULTI-TENANT: User is created and automatically added to the current mandate.

    Raises:
        HTTPException: 403 when a non-SysAdmin caller requests a SysAdmin
            user, 500 when the default "user" role is missing.
    """
    # SECURITY: only a SysAdmin may create another SysAdmin
    if userData.isSysAdmin and not context.hasSysAdminRole:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="SysAdmin role required to create SysAdmin users"
        )

    appInterface = interfaceDbApp.getInterface(context.user)

    # Resolve the default role BEFORE creating the user, so a missing role
    # does not leave behind a user that belongs to no mandate.
    userRole = None
    if context.mandateId:
        userRole = appInterface.getRoleByLabel("user")
        if not userRole:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="No 'user' role found in system — cannot assign user to mandate"
            )

    # Extract fields from request model and call createUser with individual parameters
    newUser = appInterface.createUser(
        username=userData.username,
        password=userData.password,
        email=userData.email,
        fullName=userData.fullName,
        language=userData.language,
        enabled=userData.enabled,
        authenticationAuthority=AuthAuthority.LOCAL,
        isSysAdmin=userData.isSysAdmin
    )

    # MULTI-TENANT: Add user to current mandate via UserMandate with default "user" role
    if userRole is not None:
        appInterface.createUserMandate(
            userId=str(newUser.id),
            mandateId=str(context.mandateId),
            roleIds=[str(userRole.id)]
        )
        logger.info(f"Created UserMandate for user {newUser.id} in mandate {context.mandateId}")

    return newUser


@router.put("/{userId}", response_model=User)
@limiter.limit("10/minute")
def update_user(
    request: Request,
    userId: str = Path(..., description="ID of the user to update"),
    userData: User = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> User:
    """
    Update an existing user.
    Self-service: Users can update their own profile (language, fullName, etc.).
    Admin: MandateAdmin can update users in their mandates. SysAdmin for all.

    Raises:
        HTTPException: 403 when the caller may not update the user or tries to
            change the SysAdmin flag without being SysAdmin, 404 when the user
            does not exist, 500 when the update fails.
    """
    isSelfUpdate = str(context.user.id) == str(userId)

    # Non-self updates require admin permission
    if not isSelfUpdate and not _isAdminForUser(context, userId):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin role required to update other users"
        )

    # Use rootInterface for user lookup/update (avoids RBAC filtering on User table)
    rootInterface = getRootInterface()

    # Check if the user exists
    existingUser = rootInterface.getUser(userId)
    if not existingUser:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # SECURITY: the update runs through the root interface, so the route must
    # prevent privilege escalation itself. Only a SysAdmin may change the
    # SysAdmin flag. getattr keeps this a no-op if the User model has no
    # such field — NOTE(review): confirm the field name against the User model.
    if not context.hasSysAdminRole:
        requestedFlag = getattr(userData, "isSysAdmin", None)
        currentFlag = getattr(existingUser, "isSysAdmin", None)
        if requestedFlag is not None and bool(requestedFlag) != bool(currentFlag):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="SysAdmin role required to change SysAdmin status"
            )

    # Update user
    updatedUser = rootInterface.updateUser(userId, userData)
    if not updatedUser:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error updating the user"
        )

    return updatedUser


def _revokeAllTokens(appInterface: Any, userId: str, revokedBy: Any, reason: str, occasion: str) -> None:
    """
    Revoke all tokens of a user after a credential change (best effort).

    A failure is logged but does not fail the surrounding password operation.

    Args:
        appInterface: App interface used for the revocation.
        userId: ID of the user whose tokens are revoked.
        revokedBy: ID of the acting user.
        reason: Machine-readable revocation reason.
        occasion: Human-readable occasion used in log messages
            (e.g. "password reset").
    """
    try:
        revokedCount = appInterface.revokeTokensByUser(
            userId=userId,
            authority=None,  # Revoke all authorities
            mandateId=None,  # Revoke across all mandates
            revokedBy=revokedBy,
            reason=reason
        )
        logger.info(f"Revoked {revokedCount} tokens for user {userId} after {occasion}")
    except Exception as e:
        logger.error(f"Failed to revoke tokens after {occasion} for user {userId}: {str(e)}")


@router.post("/{userId}/reset-password")
@limiter.limit("5/minute")
def reset_user_password(
    request: Request,
    userId: str = Path(..., description="ID of the user to reset password for"),
    newPassword: str = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Reset user password (Admin only).
    MULTI-TENANT: MandateAdmin can reset passwords for users in their mandates. SysAdmin for all.

    All tokens of the target user are revoked afterwards.

    Raises:
        HTTPException: 403 without admin rights for the user, 400 for a too
            short password, 500 when the reset fails.
    """
    try:
        # Check admin permission (SysAdmin or MandateAdmin for this user)
        if not _isAdminForUser(context, userId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin role required to reset passwords"
            )

        # Get user interface
        appInterface = interfaceDbApp.getInterface(context.user)

        # Validate password strength
        if len(newPassword) < _MIN_PASSWORD_LENGTH:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Password must be at least 8 characters long"
            )

        # Reset password
        success = appInterface.resetUserPassword(userId, newPassword)
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to reset password"
            )

        # SECURITY: Automatically revoke all tokens for the user after password reset
        _revokeAllTokens(appInterface, userId, context.user.id, "password_reset", "password reset")

        # Log password reset
        _auditSecurityEvent(
            context,
            action="password_reset",
            details=f"Reset password for user {userId}",
            ipAddress=request.client.host if request.client else None,
            success=True
        )

        return {
            "message": "Password reset successfully",
            "user_id": userId
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error resetting password: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Password reset failed: {str(e)}"
        )


@router.post("/change-password")
@limiter.limit("5/minute")
def change_password(
    request: Request,
    currentPassword: str = Body(..., embed=True),
    newPassword: str = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Change current user's password.
    MULTI-TENANT: User changes their own password (no mandate restriction).

    All tokens of the user are revoked afterwards, so a new login is required.

    Raises:
        HTTPException: 401 when the current password is wrong, 400 for a too
            short new password, 500 when the change fails.
    """
    try:
        # Get user interface
        appInterface = interfaceDbApp.getInterface(context.user)

        # Verify current password
        if not appInterface.verifyPassword(currentPassword, context.user.passwordHash):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Current password is incorrect"
            )

        # Validate new password strength
        if len(newPassword) < _MIN_PASSWORD_LENGTH:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="New password must be at least 8 characters long"
            )

        # Change password
        success = appInterface.resetUserPassword(str(context.user.id), newPassword)
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to change password"
            )

        # SECURITY: Automatically revoke all tokens for the user after password change
        _revokeAllTokens(
            appInterface, str(context.user.id), context.user.id, "password_change", "password change"
        )

        # Log password change
        _auditSecurityEvent(
            context,
            action="password_change",
            details="User changed their own password",
            ipAddress=request.client.host if request.client else None,
            success=True
        )

        return {
            "message": "Password changed successfully. Please log in again with your new password."
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error changing password: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Password change failed: {str(e)}"
        )


@router.post("/{userId}/send-password-link")
@limiter.limit("10/minute")
def send_password_link(
    request: Request,
    userId: str = Path(..., description="ID of the user to send password setup link"),
    frontendUrl: str = Body(..., embed=True),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Send password setup/reset link to a user (admin function).
    MULTI-TENANT: MandateAdmin can send to users in their mandates. SysAdmin for all.

    This allows admins to send a magic link to users to set or reset their password.
    Used when creating users without password or to help users who forgot their password.

    Args:
        userId: ID of the user to send the link to
        frontendUrl: The frontend URL to use in the magic link

    Raises:
        HTTPException: 403 without admin rights for the user, 400 for an
            invalid frontendUrl or a user without email, 404 when the user
            does not exist, 500 when the email cannot be sent.
    """
    try:
        from modules.shared.configuration import APP_CONFIG

        # Check admin permission (SysAdmin or MandateAdmin for this user)
        if not _isAdminForUser(context, userId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin role required to send password links"
            )

        # SECURITY: the reset token is appended to this caller-supplied URL and
        # emailed to the user, so reject anything that is not an absolute
        # http(s) URL.
        # NOTE(review): consider checking the host against a configured
        # allow-list of frontend origins as well.
        if not _isAbsoluteHttpUrl(frontendUrl):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="frontendUrl must be an absolute http(s) URL"
            )

        # Get user interface
        appInterface = interfaceDbApp.getInterface(context.user)

        # Get target user
        targetUser = appInterface.getUser(userId)
        if not targetUser:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        # Check if user has an email
        if not targetUser.email:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="User has no email address configured"
            )

        # Use root interface for token operations
        rootInterface = getRootInterface()

        # Generate reset token
        token, expires = rootInterface.generateResetTokenAndExpiry()

        # Set reset token (don't clear password - user might have one already)
        rootInterface.setResetToken(userId, token, expires, clearPassword=False)

        # Send email with magic link
        baseUrl = frontendUrl.rstrip("/")
        magicLink = f"{baseUrl}/reset?token={token}"
        expiryHours = int(APP_CONFIG.get("Auth_RESET_TOKEN_EXPIRY_HOURS", "24"))

        try:
            from modules.routes.routeSecurityLocal import _buildAuthEmailHtml, _sendAuthEmail

            emailSubject = "PowerOn - Passwort setzen"
            emailHtml = _buildAuthEmailHtml(
                greeting=f"Hallo {targetUser.fullName or targetUser.username}",
                bodyLines=[
                    "Ein Administrator hat einen Link zum Setzen Ihres Passworts angefordert.",
                    "",
                    f"Ihr Benutzername: {targetUser.username}",
                    "",
                    "Klicken Sie auf die Schaltfläche, um Ihr Passwort zu setzen:",
                ],
                buttonText="Passwort setzen",
                buttonUrl=magicLink,
                footerText=f"Dieser Link ist {expiryHours} Stunden gültig. Falls Sie diese Anforderung nicht erwartet haben, kontaktieren Sie bitte Ihren Administrator.",
            )

            emailSent = _sendAuthEmail(
                recipient=targetUser.email,
                subject=emailSubject,
                message="",
                userId=str(targetUser.id),
                htmlOverride=emailHtml,
            )

            if not emailSent:
                logger.warning(f"Failed to send password setup email to {targetUser.email}")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to send email"
                )
        except HTTPException:
            raise
        except Exception as emailErr:
            logger.error(f"Error sending password setup email: {str(emailErr)}")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to send email: {str(emailErr)}"
            )

        # Log the action
        _auditSecurityEvent(
            context,
            action="send_password_link",
            details=f"Sent password setup link to user {userId} ({targetUser.email})"
        )

        logger.info(f"Password setup link sent to {targetUser.email} for user {targetUser.username} by admin {context.user.username}")

        return {
            "message": f"Password setup link sent to {targetUser.email}",
            "userId": userId,
            "email": targetUser.email
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error sending password link: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to send password link: {str(e)}"
        )


@router.delete("/{userId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def delete_user(
    request: Request,
    userId: str = Path(..., description="ID of the user to delete"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Delete a user.
    MULTI-TENANT: Can only delete users in the same mandate (unless SysAdmin).
    Deleting another user requires admin rights for that user (SysAdmin or
    MandateAdmin), consistent with update_user.

    Raises:
        HTTPException: 403 when the caller may not delete the user, 404 when
            the user does not exist, 500 when the deletion fails.
    """
    # SECURITY: admin pages do not send X-Mandate-Id, which used to bypass the
    # mandate check below entirely. Deleting someone else always needs admin rights.
    isSelfDelete = str(context.user.id) == str(userId)
    if not isSelfDelete and not _isAdminForUser(context, userId):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin role required to delete other users"
        )

    appInterface = interfaceDbApp.getInterface(context.user)

    # Check if the user exists
    existingUser = appInterface.getUser(userId)
    if not existingUser:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # MULTI-TENANT: Verify user is in the same mandate (unless SysAdmin)
    if context.mandateId and not context.hasSysAdminRole:
        userMandate = appInterface.getUserMandate(userId, str(context.mandateId))
        if not userMandate:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot delete user outside your mandate"
            )

    # Delete UserMandate entries for this user first
    for userMandate in appInterface.getUserMandates(userId):
        appInterface.deleteUserMandate(userId, str(userMandate.mandateId))

    success = appInterface.deleteUser(userId)
    if not success:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error deleting the user"
        )

    return {"message": f"User with ID {userId} successfully deleted"}