from fastapi import APIRouter, HTTPException, Depends, Body, Path from typing import List, Dict, Any, Optional from fastapi import status from datetime import datetime from dataclasses import dataclass # Import auth module from modules.auth import getCurrentActiveUser, getUserContext # Import interfaces from modules.gatewayInterface import getGatewayInterface from modules.gatewayModel import User # Determine all attributes of the model def getModelAttributes(modelClass): return [attr for attr in dir(modelClass) if not callable(getattr(modelClass, attr)) and not attr.startswith('_') and attr not in ('metadata', 'query', 'query_class', 'label', 'field_labels')] # Model attributes for User userAttributes = getModelAttributes(User) @dataclass class AppContext: """Context object for all required connections and user information""" mandateId: int userId: int interfaceData: Any # Gateway Interface async def getContext(currentUser: Dict[str, Any]) -> AppContext: """Creates a central context object with all required connections""" mandateId, userId = await getUserContext(currentUser) interfaceData = getGatewayInterface(mandateId, userId) return AppContext( mandateId=mandateId, userId=userId, interfaceData=interfaceData ) # Create router for user endpoints router = APIRouter( prefix="/api/users", tags=["Users"], responses={404: {"description": "Not found"}} ) @router.get("", response_model=List[Dict[str, Any]]) async def getUsers(currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)): """Get all available users (only for Admin/SysAdmin users)""" context = await getContext(currentUser) # Permission check if currentUser.get("privilege") not in ["admin", "sysadmin"]: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No permission to access the user list" ) # Admin sees only users of own mandate, SysAdmin sees all if currentUser.get("privilege") == "admin": return context.interfaceData.getUsersByMandate(context.mandateId) else: # sysadmin return context.interfaceData.getAllUsers() @router.post("/register", response_model=Dict[str, Any]) async def registerUser(userData: dict = Body(...)): """Register a new user""" # Don't use user context for registration gateway = getGatewayInterface() if "username" not in userData or "password" not in userData: raise HTTPException(status_code=400, detail="Username and password required") try: # Prepare mandate data mandateData = { "name": f"Mandate of {userData['username']}", "language": userData.get("language", "de") } newMandate = gateway.createMandate(**mandateData) # Filter user attributes from the request userCreateData = {} for attr in userAttributes: if attr in userData and attr != "id": # ID is auto-assigned userCreateData[attr] = userData[attr] # Required fields userCreateData["username"] = userData["username"] userCreateData["password"] = userData["password"] userCreateData["mandateId"] = newMandate["id"] # Default values for optional fields userCreateData.setdefault("disabled", False) userCreateData.setdefault("privilege", "user") userCreateData.setdefault("language", "de") newUser = gateway.createUser(**userCreateData) return newUser except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @router.get("/{userId}", response_model=Dict[str, Any]) async def getUser( userId: int, currentUser: Dict[str, Any] = Depends(getCurrentActiveUser) ): """Get a specific user""" context = await getContext(currentUser) # Initialize gateway interface with user context userToGet = context.interfaceData.getUser(userId) if not userToGet: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with ID {userId} not found" ) # Permission check # User can only view themselves, Admin only users of their own mandate, SysAdmin all if userId == context.userId: # User can view themselves pass elif currentUser.get("privilege") == "admin" and userToGet.get("mandateId") == context.mandateId: # Admin can view users of their own mandate pass elif currentUser.get("privilege") == "sysadmin": # SysAdmin can view all users pass else: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No permission to view this user" ) return userToGet @router.put("/{userId}", response_model=Dict[str, Any]) async def updateUser( userId: int = Path(..., description="ID of the user to update"), userData: Dict[str, Any] = Body(..., description="Updated user data"), currentUser: Dict[str, Any] = Depends(getCurrentActiveUser) ): """Update an existing user""" context = await getContext(currentUser) # User exists? userToUpdate = context.interfaceData.getUser(userId) if not userToUpdate: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with ID {userId} not found" ) # Permission check isSelfUpdate = userId == context.userId isAdmin = currentUser.get("privilege") == "admin" isSysadmin = currentUser.get("privilege") == "sysadmin" sameMandate = userToUpdate.get("mandateId") == context.mandateId # Filter allowed fields based on permission level allowedFields = {"username", "email", "fullName", "language"} sensitiveFields = {"mandateId", "disabled", "privilege"} # Check if sensitive fields should be changed sensitiveUpdate = any(field in userData for field in sensitiveFields) if isSelfUpdate and sensitiveUpdate: # Normal users cannot change their sensitive data raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No permission to change sensitive user data" ) elif isAdmin and sensitiveUpdate and not sameMandate: # Admins can only change sensitive data for users of their own mandate raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No permission to change sensitive data for users of other mandates" ) elif not (isSelfUpdate or (isAdmin and sameMandate) or isSysadmin): # No permission for other cases raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No permission to update this user" ) # Dynamically filter attributes from the request updateData = {} for attr in userAttributes: if attr in userData and attr != "id": # ID cannot be changed updateData[attr] = userData[attr] # Remove disallowed fields for normal users if not (isAdmin or isSysadmin): updateData = {k: v for k, v in updateData.items() if k in allowedFields} # Update user data updatedUser = context.interfaceData.updateUser(userId, updateData) return updatedUser @router.delete("/{userId}", status_code=status.HTTP_204_NO_CONTENT) async def deleteUser( userId: int = Path(..., description="ID of the user to delete"), currentUser: Dict[str, Any] = Depends(getCurrentActiveUser) ): """Delete a user""" context = await getContext(currentUser) # User exists? userToDelete = context.interfaceData.getUser(userId) if not userToDelete: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with ID {userId} not found" ) # Permission check isSelfDelete = userId == context.userId isAdmin = currentUser.get("privilege") == "admin" isSysadmin = currentUser.get("privilege") == "sysadmin" sameMandate = userToDelete.get("mandateId") == context.mandateId if isSelfDelete: # User can delete themselves pass elif isAdmin and sameMandate: # Admin can delete users of their own mandate pass elif isSysadmin: # SysAdmin can delete all users pass else: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No permission to delete this user" ) # Delete user and all referenced objects success = context.interfaceData.deleteUser(userId) if not success: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error deleting user with ID {userId}" ) return None