249 lines
No EOL
8.7 KiB
Python
249 lines
No EOL
8.7 KiB
Python
from fastapi import APIRouter, HTTPException, Depends, Body, Path
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import status
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
|
|
# Import auth module
|
|
from modules.auth import getCurrentActiveUser, getUserContext
|
|
|
|
# Import interfaces
|
|
from modules.gatewayInterface import getGatewayInterface
|
|
from modules.gatewayModel import User
|
|
|
|
# Determine all attributes of the model
|
|
def getModelAttributes(modelClass):
    """Return the public, non-callable attribute names of a model class.

    Names are taken from ``dir(modelClass)``, so inherited attributes are
    included and the result is in ``dir()`` order. A name is kept when it

    * does not start with an underscore,
    * is not one of the excluded bookkeeping names listed below, and
    * resolves to a non-callable value on the class.

    The name-based filters run *before* ``getattr`` so that attributes which
    are discarded anyway are never evaluated. This matters for class-level
    descriptors (such as an ORM-style ``query`` property): their ``__get__``
    runs on attribute access and may raise or have side effects when touched
    at import time.

    Args:
        modelClass: The class (or object) to inspect.

    Returns:
        List of attribute names that passed all three filters.
    """
    excludedNames = {'metadata', 'query', 'query_class', 'label', 'field_labels'}
    return [
        attr for attr in dir(modelClass)
        if not attr.startswith('_')
        and attr not in excludedNames
        # Only now touch the attribute itself
        and not callable(getattr(modelClass, attr))
    ]
|
|
|
|
# Model attributes for User
|
|
# Computed once at import time. registerUser and updateUser iterate over this
# list to decide which keys of the request body are passed on to the gateway.
userAttributes = getModelAttributes(User)
|
|
|
|
@dataclass
class AppContext:
    """Per-request bundle of caller identity and gateway access.

    Holds the mandate ID and user ID resolved for the current caller together
    with the gateway interface object created for that pair.
    """
    # Mandate ID resolved for the current user
    mandateId: int
    # ID of the current user
    userId: int
    # Gateway interface object (as returned by getGatewayInterface)
    interfaceData: Any
|
|
|
|
async def getContext(currentUser: Dict[str, Any]) -> AppContext:
    """Build the AppContext for an authenticated user.

    Awaits getUserContext to resolve the caller's mandate ID and user ID,
    then obtains a gateway interface for that pair.

    Args:
        currentUser: The user dict supplied by the auth dependency.

    Returns:
        AppContext carrying both IDs and the gateway interface.
    """
    resolvedIds = await getUserContext(currentUser)
    mandateId, userId = resolvedIds

    return AppContext(
        mandateId=mandateId,
        userId=userId,
        interfaceData=getGatewayInterface(mandateId, userId),
    )
|
|
|
|
# Create router for user endpoints
|
|
# All routes registered on this router are served under the /api/users prefix
# and tagged "Users".
router = APIRouter(
    prefix="/api/users",
    tags=["Users"],
    # Declares a shared 404 response description for the routes on this router
    responses={404: {"description": "Not found"}}
)
|
|
|
|
@router.get("", response_model=List[Dict[str, Any]])
async def getUsers(currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)):
    """List users visible to the caller.

    A sysadmin receives every user; an admin receives only the users of
    their own mandate.

    Raises:
        HTTPException: 403 when the caller's privilege is neither "admin"
            nor "sysadmin".
    """
    context = await getContext(currentUser)
    callerPrivilege = currentUser.get("privilege")

    # SysAdmin: unrestricted listing
    if callerPrivilege == "sysadmin":
        return context.interfaceData.getAllUsers()

    # Admin: listing is limited to the caller's own mandate
    if callerPrivilege == "admin":
        return context.interfaceData.getUsersByMandate(context.mandateId)

    # Everyone else is rejected
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="No permission to access the user list"
    )
|
|
|
|
@router.post("/register", response_model=Dict[str, Any])
async def registerUser(userData: dict = Body(...)):
    """Register a new user together with a freshly created mandate.

    The endpoint has no auth dependency, so the request body is untrusted.
    Only non-protected User attributes are copied from it; ``mandateId``,
    ``privilege`` and ``disabled`` are always assigned by the server so that
    a caller cannot self-register with elevated privileges.

    Args:
        userData: Request body. Must contain non-empty ``username`` and
            ``password``; may contain further User attributes.

    Returns:
        Whatever the gateway's ``createUser`` returns for the new user.

    Raises:
        HTTPException: 400 when username/password are missing or empty, or
            when the gateway raises ValueError.
    """
    # Registration is anonymous, so the gateway is opened without user context
    gateway = getGatewayInterface()

    # Reject missing keys as well as empty values
    if not userData.get("username") or not userData.get("password"):
        raise HTTPException(status_code=400, detail="Username and password required")

    # Fields an anonymous caller must never control
    protectedFields = {"id", "mandateId", "disabled", "privilege"}

    try:
        # Every registration gets its own mandate
        # NOTE(review): if createUser fails below, this mandate is left behind;
        # confirm whether the gateway offers a way to roll it back.
        newMandate = gateway.createMandate(
            name=f"Mandate of {userData['username']}",
            language=userData.get("language", "de"),
        )

        # Copy only known, non-protected User attributes from the request
        userCreateData = {
            attr: userData[attr]
            for attr in userAttributes
            if attr in userData and attr not in protectedFields
        }

        # Required fields
        userCreateData["username"] = userData["username"]
        userCreateData["password"] = userData["password"]
        userCreateData["mandateId"] = newMandate["id"]

        # Server-assigned values: never taken from the request body
        userCreateData["disabled"] = False
        userCreateData["privilege"] = "user"

        # Default for the optional language field
        userCreateData.setdefault("language", "de")

        return gateway.createUser(**userCreateData)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
|
|
|
|
@router.get("/{userId}", response_model=Dict[str, Any])
async def getUser(
    userId: int,
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Return a single user by ID.

    Access rules: a caller may view themselves; an admin may view users whose
    mandateId matches their own mandate; a sysadmin may view any user.

    Raises:
        HTTPException: 404 when the gateway returns no user for ``userId``;
            403 when none of the access rules applies.
    """
    context = await getContext(currentUser)

    # Look the user up first; the mandate check below needs the record
    userToGet = context.interfaceData.getUser(userId)
    if not userToGet:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # Permission check: self, admin of the same mandate, or sysadmin
    mayView = (
        userId == context.userId
        or (
            currentUser.get("privilege") == "admin"
            and userToGet.get("mandateId") == context.mandateId
        )
        or currentUser.get("privilege") == "sysadmin"
    )
    if not mayView:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to view this user"
        )

    return userToGet
|
|
|
|
@router.put("/{userId}", response_model=Dict[str, Any])
async def updateUser(
    userId: int = Path(..., description="ID of the user to update"),
    userData: Dict[str, Any] = Body(..., description="Updated user data"),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Update an existing user.

    Access rules:
      * Any caller may update their own non-sensitive fields
        (username, email, fullName, language).
      * Nobody may change their own sensitive fields
        (mandateId, disabled, privilege).
      * An admin may update users of their own mandate, including sensitive
        fields, but may not grant the "sysadmin" privilege or move a user to
        a different mandate.
      * A sysadmin may update any other user without restriction.

    Only keys that are User attributes (except ``id``) are forwarded to the
    gateway; other keys in the request body are ignored.

    Returns:
        Whatever the gateway's ``updateUser`` returns.

    Raises:
        HTTPException: 404 when the user does not exist; 403 when the caller
            lacks permission for the requested change.
    """
    context = await getContext(currentUser)

    # User exists?
    userToUpdate = context.interfaceData.getUser(userId)
    if not userToUpdate:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # Facts the permission rules are built from
    callerPrivilege = currentUser.get("privilege")
    isSelfUpdate = userId == context.userId
    isAdmin = callerPrivilege == "admin"
    isSysadmin = callerPrivilege == "sysadmin"
    sameMandate = userToUpdate.get("mandateId") == context.mandateId

    allowedFields = {"username", "email", "fullName", "language"}
    sensitiveFields = {"mandateId", "disabled", "privilege"}

    # Does the request try to touch any sensitive field?
    sensitiveUpdate = not sensitiveFields.isdisjoint(userData)

    if isSelfUpdate and sensitiveUpdate:
        # Callers cannot change their own sensitive data
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to change sensitive user data"
        )
    elif isAdmin and sensitiveUpdate and not sameMandate:
        # Admins can only change sensitive data for users of their own mandate
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to change sensitive data for users of other mandates"
        )
    elif not (isSelfUpdate or (isAdmin and sameMandate) or isSysadmin):
        # No permission for other cases
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to update this user"
        )

    # An admin's authority ends at their own mandate: only a sysadmin may
    # hand out the sysadmin privilege or move a user into another mandate.
    if sensitiveUpdate and not isSysadmin:
        grantsSysadmin = userData.get("privilege") == "sysadmin"
        movesMandate = (
            "mandateId" in userData
            and userData["mandateId"] != context.mandateId
        )
        if grantsSysadmin or movesMandate:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="No permission to change sensitive user data"
            )

    # Forward only known User attributes; the ID cannot be changed
    updateData = {
        attr: userData[attr]
        for attr in userAttributes
        if attr in userData and attr != "id"
    }

    # Normal users are limited to the non-sensitive whitelist
    if not (isAdmin or isSysadmin):
        updateData = {k: v for k, v in updateData.items() if k in allowedFields}

    return context.interfaceData.updateUser(userId, updateData)
|
|
|
|
@router.delete("/{userId}", status_code=status.HTTP_204_NO_CONTENT)
async def deleteUser(
    userId: int = Path(..., description="ID of the user to delete"),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Delete a user.

    Access rules: a caller may delete themselves; an admin may delete users
    whose mandateId matches their own mandate; a sysadmin may delete any user.

    Raises:
        HTTPException: 404 when the user does not exist; 403 when none of the
            access rules applies; 500 when the gateway reports that the
            deletion did not succeed.
    """
    context = await getContext(currentUser)
    gateway = context.interfaceData

    # User exists?
    userToDelete = gateway.getUser(userId)
    if not userToDelete:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # Permission check: self, admin of the same mandate, or sysadmin
    callerPrivilege = currentUser.get("privilege")
    targetInOwnMandate = userToDelete.get("mandateId") == context.mandateId
    mayDelete = (
        userId == context.userId
        or (callerPrivilege == "admin" and targetInOwnMandate)
        or callerPrivilege == "sysadmin"
    )
    if not mayDelete:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to delete this user"
        )

    # Delete the user; a falsy result from the gateway is reported as an error
    if not gateway.deleteUser(userId):
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting user with ID {userId}"
        )

    return None