gateway/routes/users.py

265 lines
No EOL
9.4 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Body, Path
from typing import List, Dict, Any, Optional
from fastapi import status
from datetime import datetime
from dataclasses import dataclass
# Import auth module
from modules.auth import get_current_active_user, get_user_context
# Import interfaces
from modules.gateway_interface import get_gateway_interface
from modules.gateway_model import User
# Determine all attributes of the model (except internal/special attributes)
def get_model_attributes(model_class):
    """Return the names of the public, non-callable attributes of a model.

    Args:
        model_class: Class (or object) whose attributes are inspected
            via ``dir()``.

    Returns:
        List of attribute names, in ``dir()`` order, that are not callable,
        do not start with an underscore and are not one of the reserved
        names ``metadata``, ``query``, ``query_class``, ``label`` or
        ``field_labels``.
    """
    reserved = {"metadata", "query", "query_class", "label", "field_labels"}
    names = []
    for name in dir(model_class):
        # Resolve the attribute first, exactly as the name checks follow it.
        if callable(getattr(model_class, name)):
            continue
        if name.startswith('_') or name in reserved:
            continue
        names.append(name)
    return names
# Public, non-callable attribute names of the User model, computed once at
# import time. register_user and update_user use this list to decide which
# keys of a request body are copied into the data passed to the gateway.
user_attributes = get_model_attributes(User)
@dataclass
class AppContext:
    """Context object for all required connections and user information"""
    # Mandate ID of the current user, as returned by get_user_context.
    mandate_id: int
    # ID of the current user, as returned by get_user_context.
    user_id: int
    # Gateway interface obtained from get_gateway_interface(mandate_id, user_id).
    interface_data: Any
async def get_context(current_user: Dict[str, Any]) -> AppContext:
    """
    Build the per-request context used by the user endpoints.

    Args:
        current_user: Current user from authentication

    Returns:
        AppContext carrying the mandate ID and user ID resolved by
        get_user_context, plus the gateway interface created for that pair.
    """
    resolved_ids = await get_user_context(current_user)
    mandate_id, user_id = resolved_ids
    # The gateway interface is created for exactly this mandate/user pair.
    gateway = get_gateway_interface(mandate_id, user_id)
    return AppContext(mandate_id, user_id, gateway)
# Create router for user endpoints. Every route registered on it below is
# served under the /api/users prefix, tagged "Users", and declares a
# 404 "Not found" response in addition to its own responses.
router = APIRouter(
    prefix="/api/users",
    tags=["Users"],
    responses={404: {"description": "Not found"}}
)
@router.get("", response_model=List[Dict[str, Any]])
async def get_users(current_user: Dict[str, Any] = Depends(get_current_active_user)):
    """Get all available users (only for Admin/SysAdmin users).

    Args:
        current_user: Authenticated user injected by
            ``get_current_active_user``; its ``"privilege"`` entry decides
            what is returned.

    Returns:
        For an admin, the result of ``get_users_by_mandate`` for the
        caller's own mandate; for a sysadmin, the result of
        ``get_all_users``.

    Raises:
        HTTPException: 403 if the caller is neither admin nor sysadmin.
    """
    privilege = current_user.get("privilege")

    # Permission check comes first so that a forbidden request never
    # triggers the context lookup / gateway interface creation.
    if privilege not in ("admin", "sysadmin"):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to access the user list"
        )

    context = await get_context(current_user)

    # Admin sees only users of own mandate, SysAdmin sees all
    if privilege == "admin":
        return context.interface_data.get_users_by_mandate(context.mandate_id)
    return context.interface_data.get_all_users()
@router.post("/register", response_model=Dict[str, Any])
async def register_user(user_data: dict = Body(...)):
    """Register a new user together with a new mandate.

    This endpoint takes no authentication dependency, so the request body
    is untrusted. Only non-privileged User attributes are copied from it;
    ``privilege``, ``disabled``, ``mandate_id`` and ``id`` are always set
    by the server so a caller cannot self-register with elevated rights.

    Args:
        user_data: Request body. Must contain non-empty ``username`` and
            ``password``; may contain ``language`` and other User
            attributes.

    Returns:
        The value returned by the gateway's ``create_user``.

    Raises:
        HTTPException: 400 if username or password is missing or empty,
            or if the gateway raises ``ValueError``.
    """
    # Reject missing, null and empty credentials alike.
    if not user_data.get("username") or not user_data.get("password"):
        raise HTTPException(status_code=400, detail="Username and password required")

    # Don't use user context for registration
    gateway = get_gateway_interface()

    # Fields that must never be taken from an anonymous request body.
    protected_fields = {"id", "mandate_id", "privilege", "disabled"}

    try:
        # NOTE(review): the mandate is created before the user. If
        # create_user fails, the mandate stays behind - confirm whether the
        # gateway offers a way to roll it back.
        new_mandate = gateway.create_mandate(
            name=f"Mandate of {user_data['username']}",
            language=user_data.get("language", "de"),
        )

        # Copy only known, non-protected User attributes from the request.
        user_create_data = {
            attr: user_data[attr]
            for attr in user_attributes
            if attr in user_data and attr not in protected_fields
        }

        # Required fields
        user_create_data["username"] = user_data["username"]
        user_create_data["password"] = user_data["password"]
        user_create_data["mandate_id"] = new_mandate["id"]

        # Server-controlled fields: a self-registered account is always an
        # enabled, ordinary user regardless of what the request contained.
        user_create_data["disabled"] = False
        user_create_data["privilege"] = "user"

        # Default value for the optional language field
        user_create_data.setdefault("language", "de")

        return gateway.create_user(**user_create_data)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
@router.get("/{user_id}", response_model=Dict[str, Any])
async def get_user(
    user_id: int,
    current_user: Dict[str, Any] = Depends(get_current_active_user)
):
    """Return a single user record.

    Args:
        user_id: ID of the user to fetch.
        current_user: Authenticated user injected by
            ``get_current_active_user``.

    Returns:
        The record returned by the gateway's ``get_user``.

    Raises:
        HTTPException: 404 if the gateway returns nothing for ``user_id``;
            403 if the caller is not the user themselves, an admin of the
            user's mandate, or a sysadmin.
    """
    ctx = await get_context(current_user)

    target = ctx.interface_data.get_user(user_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found"
        )

    # A user may view themselves, an admin may view users of their own
    # mandate, and a sysadmin may view everyone. The clauses are evaluated
    # lazily, in this order.
    may_view = (
        user_id == ctx.user_id
        or (
            current_user.get("privilege") == "admin"
            and target.get("mandate_id") == ctx.mandate_id
        )
        or current_user.get("privilege") == "sysadmin"
    )
    if not may_view:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to view this user"
        )

    return target
@router.put("/{user_id}", response_model=Dict[str, Any])
async def update_user(
    user_id: int = Path(..., description="ID of the user to update"),
    user_data: Dict[str, Any] = Body(..., description="Updated user data"),
    current_user: Dict[str, Any] = Depends(get_current_active_user)
):
    """Update an existing user.

    Args:
        user_id: ID of the user to update.
        user_data: Fields to change. Only keys that are User attributes
            (other than ``id``) are considered.
        current_user: Authenticated user injected by
            ``get_current_active_user``.

    Returns:
        The value returned by the gateway's ``update_user``.

    Raises:
        HTTPException: 404 if the user does not exist; 403 if the caller
            may not update this user, may not change the requested
            sensitive fields, or is an admin trying to grant the
            ``sysadmin`` privilege.
    """
    context = await get_context(current_user)

    # User exists?
    user_to_update = context.interface_data.get_user(user_id)
    if not user_to_update:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found"
        )

    # Permission check
    privilege = current_user.get("privilege")
    is_self_update = user_id == context.user_id
    is_admin = privilege == "admin"
    is_sysadmin = privilege == "sysadmin"
    same_mandate = user_to_update.get("mandate_id") == context.mandate_id

    # Fields a normal user may change vs. fields that affect access rights
    allowed_fields = {"username", "email", "full_name", "language"}
    sensitive_fields = {"mandate_id", "disabled", "privilege"}

    # Check if sensitive fields should be changed
    sensitive_update = any(field in user_data for field in sensitive_fields)

    if is_self_update and sensitive_update:
        # Nobody may change their own sensitive data; this check applies
        # to every caller, whatever their privilege.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to change sensitive user data"
        )
    elif is_admin and sensitive_update and not same_mandate:
        # Admins can only change sensitive data for users of their own mandate
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to change sensitive data for users of other mandates"
        )
    elif not (is_self_update or (is_admin and same_mandate) or is_sysadmin):
        # No permission for other cases
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to update this user"
        )

    # An admin must not hand out a privilege above their own; otherwise a
    # mandate admin could promote an account to sysadmin.
    if is_admin and user_data.get("privilege") == "sysadmin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to grant sysadmin privilege"
        )
    # NOTE(review): an admin can still change mandate_id for users of their
    # own mandate - confirm that moving users across mandates is intended.

    # Keep only known User attributes from the request; ID cannot be changed
    update_data = {
        attr: user_data[attr]
        for attr in user_attributes
        if attr in user_data and attr != "id"
    }

    # Remove disallowed fields for normal users
    if not (is_admin or is_sysadmin):
        update_data = {k: v for k, v in update_data.items() if k in allowed_fields}

    # Update user data
    return context.interface_data.update_user(user_id, update_data)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int = Path(..., description="ID of the user to delete"),
    current_user: Dict[str, Any] = Depends(get_current_active_user)
):
    """Delete a user.

    Args:
        user_id: ID of the user to delete.
        current_user: Authenticated user injected by
            ``get_current_active_user``.

    Returns:
        None; the route responds with 204 No Content.

    Raises:
        HTTPException: 404 if the user does not exist; 403 if the caller
            is not the user themselves, an admin of the user's mandate, or
            a sysadmin; 500 if the gateway reports that deletion failed.
    """
    ctx = await get_context(current_user)

    # The target must exist before any permission decision is made.
    target = ctx.interface_data.get_user(user_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {user_id} not found"
        )

    deleting_self = user_id == ctx.user_id
    privilege = current_user.get("privilege")
    in_own_mandate = target.get("mandate_id") == ctx.mandate_id

    # Allowed: the user themselves, an admin within their own mandate,
    # or any sysadmin.
    permitted = (
        deleting_self
        or (privilege == "admin" and in_own_mandate)
        or privilege == "sysadmin"
    )
    if not permitted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to delete this user"
        )

    # Delete user and all referenced objects
    if not ctx.interface_data.delete_user(user_id):
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting user with ID {user_id}"
        )

    # Return no content on successful deletion
    return None