gateway/routes/routeUsers.py
2025-05-16 16:17:28 +02:00

409 lines
No EOL
16 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Body, Path
from typing import List, Dict, Any, Optional
from fastapi import status
from datetime import datetime
from dataclasses import dataclass
# Import auth module
from modules.auth import getCurrentActiveUser, getUserContext
# Import interfaces
from modules.gatewayInterface import getGatewayInterface
from modules.gatewayModel import User
# Determine all attributes of the model
# Names that can appear on a model class but are bookkeeping, not data fields.
_NON_FIELD_ATTRIBUTES = frozenset(
    ("metadata", "query", "query_class", "label", "field_labels")
)


def getModelAttributes(modelClass):
    """Return the names of the public, non-callable attributes of a class.

    Args:
        modelClass: Class (or any object) to inspect via ``dir()``.

    Returns:
        Attribute names in ``dir()`` order, excluding names that start with
        an underscore, the bookkeeping names in ``_NON_FIELD_ATTRIBUTES``,
        and everything whose value is callable.
    """
    return [
        attr
        for attr in dir(modelClass)
        # Name-based filters run first so getattr() is never invoked on
        # private/dunder or excluded attributes; evaluating those can trigger
        # descriptors that raise or are expensive.
        if not attr.startswith("_")
        and attr not in _NON_FIELD_ATTRIBUTES
        and not callable(getattr(modelClass, attr))
    ]
# Model attributes for User.
# Computed once at import time; updateUser iterates this list to decide which
# keys of a request body may be copied into the update payload.
userAttributes = getModelAttributes(User)
@dataclass
class AppContext:
    """Per-request bundle of the caller's IDs and the gateway interface built for them."""

    # Mandate ID resolved from the current user
    mandateId: int
    # User ID resolved from the current user
    userId: int
    # Gateway interface created for the mandate/user pair above
    interfaceData: Any
async def getContext(currentUser: Dict[str, Any]) -> AppContext:
    """Build the request context for the given authenticated user.

    Resolves the mandate and user IDs through getUserContext, creates a
    gateway interface for that pair and returns all three in an AppContext.
    """
    resolvedIds = await getUserContext(currentUser)
    resolvedMandateId, resolvedUserId = resolvedIds
    gatewayInterface = getGatewayInterface(resolvedMandateId, resolvedUserId)
    return AppContext(
        mandateId=resolvedMandateId,
        userId=resolvedUserId,
        interfaceData=gatewayInterface,
    )
# Create router for user endpoints.
# All routes declared below are registered on this router, i.e. under the
# "/api/users" prefix and the "Users" tag; the 404 entry is the response
# description passed for every route of the router.
router = APIRouter(
    prefix="/api/users",
    tags=["Users"],
    responses={404: {"description": "Not found"}}
)
@router.get("", response_model=List[Dict[str, Any]])
async def getUsers(currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)):
    """List users visible to the caller.

    An "admin" receives the users of their own mandate, a "sysadmin" receives
    all users; every other privilege is rejected with 403.
    """
    context = await getContext(currentUser)
    callerPrivilege = currentUser.get("privilege")

    # Admin: restricted to the caller's own mandate
    if callerPrivilege == "admin":
        return context.interfaceData.getUsersByMandate(context.mandateId)

    # SysAdmin: unrestricted
    if callerPrivilege == "sysadmin":
        return context.interfaceData.getAllUsers()

    # Anyone else has no access to the list
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="No permission to access the user list"
    )
def _rollbackCreatedUser(gateway, createdUser, reason, logger):
    """Best-effort removal of a user whose registration could not be verified.

    Deletion problems are logged and swallowed on purpose: the caller is
    already about to report the original failure to the client.
    """
    try:
        createdUserId = createdUser["id"]
        gateway.deleteUser(createdUserId)
        logger.info("Successfully deleted user %s after %s", createdUserId, reason)
    except Exception as deleteError:
        logger.error("Failed to delete user after %s: %s", reason, deleteError)


@router.post("/register", response_model=Dict[str, Any])
async def registerUser(userData: dict = Body(...)):
    """Register a new user in the root mandate with the "user" privilege.

    Args:
        userData: Request body. Requires non-empty "username" and "password";
            "email" and "fullName" are forwarded only when non-empty;
            "language" defaults to "de".

    Returns:
        The value returned by the gateway's createUser call.

    Raises:
        HTTPException 400: username/password missing or empty, the username
            already exists, or the gateway raised ValueError.
        HTTPException 403: the gateway raised PermissionError.
        HTTPException 500: the system has no root mandate / admin user, the
            created user could not be verified (it is deleted again on a
            best-effort basis), or any other unexpected error occurred.
    """
    import asyncio
    import logging

    logger = logging.getLogger(__name__)
    # Only field names are logged: the payload carries the plaintext password.
    logger.info("Registration request received with fields: %s", list(userData))

    # Get the initial IDs for mandate and admin user
    adminGateway = getGatewayInterface()
    # ID of the root mandate - new users are created in it
    rootMandateId = adminGateway.getInitialId("mandates")
    adminUserId = adminGateway.getInitialId("users")
    logger.info("Root mandate ID: %s, Admin user ID: %s", rootMandateId, adminUserId)
    if not rootMandateId or not adminUserId:
        logger.error("System initialization error: Missing root mandate or admin user")
        raise HTTPException(
            status_code=500,
            detail="System is not properly initialized with root mandate and admin user"
        )

    # Use a gateway with admin context for user creation
    gateway = getGatewayInterface(rootMandateId, adminUserId)

    # Missing AND empty credentials are rejected.
    if not userData.get("username") or not userData.get("password"):
        logger.error("Missing required fields in registration data")
        raise HTTPException(status_code=400, detail="Username and password required")
    username = userData["username"]
    password = userData["password"]

    try:
        # Create user data - explicitly set fields
        userCreateData = {
            "username": username,
            "password": password,
            "mandateId": rootMandateId,  # Use the Root mandate
            "disabled": False,
            "privilege": "user",
            "language": userData.get("language", "de"),
        }
        # Optional fields are added only if they exist and are not empty
        for optionalField in ("email", "fullName"):
            if userData.get(optionalField):
                userCreateData[optionalField] = userData[optionalField]
        logger.info("Attempting to create user %s", username)

        # First check if user already exists
        if gateway.getUserByUsername(username):
            logger.error("User %s already exists", username)
            raise HTTPException(
                status_code=400,
                detail=f"User {username} already exists"
            )

        # Create the user
        newUser = gateway.createUser(**userCreateData)
        logger.info("User %s created successfully", username)

        # Wait a short moment to ensure database consistency. asyncio.sleep
        # yields to the event loop; time.sleep would stall every other request.
        await asyncio.sleep(0.5)

        # Verify that the user and its password were properly stored
        createdUser = gateway.getUserByUsername(username)
        if not createdUser:
            logger.error("User creation verification failed: User not found after creation")
            raise HTTPException(
                status_code=500,
                detail="Failed to verify user creation. Please try again."
            )
        if "hashedPassword" not in createdUser:
            logger.error("User creation verification failed: Password not stored")
            _rollbackCreatedUser(gateway, createdUser, "password storage failure", logger)
            raise HTTPException(
                status_code=500,
                detail="Failed to store password securely. Please try again."
            )

        # Final verification - try to authenticate the user. Only the gateway
        # call sits inside the try so our own HTTPException is not re-caught.
        try:
            authResult = gateway.authenticateUser(username, password)
        except Exception as authError:
            logger.error("Authentication verification failed: %s", authError)
            _rollbackCreatedUser(gateway, createdUser, "authentication error", logger)
            raise HTTPException(
                status_code=500,
                detail="Failed to verify user authentication. Please try again."
            ) from authError
        if not authResult:
            logger.error("Final verification failed: Could not authenticate newly created user")
            _rollbackCreatedUser(gateway, createdUser, "authentication failure", logger)
            raise HTTPException(
                status_code=500,
                detail="Failed to verify user authentication. Please try again."
            )

        logger.info("User registration completed successfully")
        return newUser
    except HTTPException:
        # Deliberate HTTP errors from above must reach the client unchanged
        # instead of being rewrapped as a generic 500 below.
        raise
    except ValueError as e:
        logger.error("ValueError in registration: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except PermissionError as e:
        logger.error("PermissionError in registration: %s", e)
        raise HTTPException(status_code=403, detail=str(e)) from e
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception("Unexpected error in registration: %s", e)
        # NOTE(review): the detail exposes the internal error text to an
        # unauthenticated client; kept for compatibility, consider a generic message.
        raise HTTPException(status_code=500, detail=f"Registration failed: {str(e)}") from e
@router.post("/register-with-msal", response_model=Dict[str, Any])
async def registerUserWithMsal(userData: dict = Body(...)):
    """Register a new user that will sign in through Microsoft authentication.

    The user is created in the root mandate with the "user" privilege and a
    random password, since the password is not meant to be used for login.

    Args:
        userData: Request body. Requires a non-empty "username"; "email" and
            "fullName" are forwarded only when non-empty; "language"
            defaults to "de".

    Returns:
        The value returned by the gateway's createUser call.

    Raises:
        HTTPException 400: username missing or empty, or the gateway raised
            ValueError.
        HTTPException 403: the gateway raised PermissionError.
        HTTPException 500: the system has no root mandate / admin user, or
            any other unexpected error occurred.
    """
    import logging
    import secrets

    logger = logging.getLogger(__name__)
    # Only field names are logged; the payload holds personal data.
    logger.info("MSAL registration request received with fields: %s", list(userData))

    # Get the initial IDs for mandate and admin user
    adminGateway = getGatewayInterface()
    # ID of the root mandate - new users are created in it
    rootMandateId = adminGateway.getInitialId("mandates")
    adminUserId = adminGateway.getInitialId("users")
    if not rootMandateId or not adminUserId:
        raise HTTPException(
            status_code=500,
            detail="System is not properly initialized with root mandate and admin user"
        )

    # Use a gateway with admin context for user creation
    gateway = getGatewayInterface(rootMandateId, adminUserId)

    # A missing AND an empty username are rejected.
    if not userData.get("username"):
        raise HTTPException(status_code=400, detail="Username required")

    try:
        userCreateData = {
            "username": userData["username"],
            # Random password since MSAL auth will be used
            "password": secrets.token_urlsafe(32),
            "language": userData.get("language", "de"),
            "mandateId": rootMandateId,
            "disabled": False,
            "privilege": "user",
        }
        # Same rule as the password-based registration: optional fields are
        # passed on only if they exist and are not empty.
        for optionalField in ("email", "fullName"):
            if userData.get(optionalField):
                userCreateData[optionalField] = userData[optionalField]
        # NOTE(review): unlike registerUser there is no explicit "user already
        # exists" check here; presumably createUser handles duplicates - confirm.
        return gateway.createUser(**userCreateData)
    except ValueError as e:
        logger.error("ValueError in MSAL registration: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e
    except PermissionError as e:
        logger.error("PermissionError in MSAL registration: %s", e)
        raise HTTPException(status_code=403, detail=str(e)) from e
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception("Unexpected error in MSAL registration: %s", e)
        raise HTTPException(status_code=500, detail=f"MSAL Registration failed: {str(e)}") from e
@router.get("/{userId}", response_model=Dict[str, Any])
async def getUser(
    userId: int,
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Return a single user record.

    Responds with 404 if the user does not exist and with 403 unless the
    caller is that user, an admin of the user's mandate, or a sysadmin.
    """
    context = await getContext(currentUser)

    # Look the user up through the caller's gateway interface
    requestedUser = context.interfaceData.getUser(userId)
    if not requestedUser:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # Permission check: self, admin of the same mandate, or sysadmin
    callerPrivilege = currentUser.get("privilege")
    mayView = (
        userId == context.userId
        or (
            callerPrivilege == "admin"
            and requestedUser.get("mandateId") == context.mandateId
        )
        or callerPrivilege == "sysadmin"
    )
    if not mayView:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to view this user"
        )
    return requestedUser
def _findUpdateDenial(userData, isSelfUpdate, isAdmin, isSysadmin, sameMandate, ownMandateId):
    """Return the reason an update request must be refused, or None if it is allowed."""
    sensitiveFields = ("mandateId", "disabled", "privilege")
    sensitiveUpdate = any(field in userData for field in sensitiveFields)

    if isSelfUpdate and sensitiveUpdate:
        # Nobody may change their own sensitive data
        return "No permission to change sensitive user data"
    if isAdmin and sensitiveUpdate and not sameMandate:
        # Admins can only change sensitive data for users of their own mandate
        return "No permission to change sensitive data for users of other mandates"
    if not (isSelfUpdate or (isAdmin and sameMandate) or isSysadmin):
        return "No permission to update this user"
    if isAdmin:
        # An admin's authority ends at the own mandate: granting sysadmin or
        # moving a user into a foreign mandate would exceed it.
        if userData.get("privilege") == "sysadmin":
            return "No permission to grant sysadmin privilege"
        if "mandateId" in userData and userData["mandateId"] != ownMandateId:
            return "No permission to move users to another mandate"
    return None


@router.put("/{userId}", response_model=Dict[str, Any])
async def updateUser(
    userId: int = Path(..., description="ID of the user to update"),
    userData: Dict[str, Any] = Body(..., description="Updated user data"),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Update an existing user.

    Args:
        userId: ID of the user to update.
        userData: Fields to change. Only keys that are attributes of the User
            model are considered; "id" is never changed. Callers that are
            neither admin nor sysadmin may only change "username", "email",
            "fullName" and "language".
        currentUser: The authenticated caller.

    Returns:
        The value returned by the gateway's updateUser call.

    Raises:
        HTTPException 404: the user does not exist.
        HTTPException 403: the caller may not update this user, tries to
            change their own sensitive fields ("mandateId", "disabled",
            "privilege"), or - as admin - tries to grant sysadmin or to move
            a user into another mandate.
    """
    context = await getContext(currentUser)

    # User exists?
    userToUpdate = context.interfaceData.getUser(userId)
    if not userToUpdate:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # Permission check
    callerPrivilege = currentUser.get("privilege")
    isAdmin = callerPrivilege == "admin"
    isSysadmin = callerPrivilege == "sysadmin"
    denialReason = _findUpdateDenial(
        userData,
        isSelfUpdate=userId == context.userId,
        isAdmin=isAdmin,
        isSysadmin=isSysadmin,
        sameMandate=userToUpdate.get("mandateId") == context.mandateId,
        ownMandateId=context.mandateId,
    )
    if denialReason is not None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=denialReason
        )

    # Keep only keys that are model attributes; the ID cannot be changed.
    # NOTE(review): for admin/sysadmin every other model attribute passes this
    # filter; if the User model exposes a password-hash attribute it would be
    # writable here - confirm against the model.
    updateData = {
        attr: userData[attr]
        for attr in userAttributes
        if attr in userData and attr != "id"
    }

    # Remove disallowed fields for normal users
    if not (isAdmin or isSysadmin):
        allowedFields = {"username", "email", "fullName", "language"}
        updateData = {k: v for k, v in updateData.items() if k in allowedFields}

    # Update user data
    return context.interfaceData.updateUser(userId, updateData)
@router.delete("/{userId}", status_code=status.HTTP_204_NO_CONTENT)
async def deleteUser(
    userId: int = Path(..., description="ID of the user to delete"),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Delete a user.

    Responds with 404 if the user does not exist, with 403 unless the caller
    is that user, an admin of the user's mandate, or a sysadmin, and with 500
    if the gateway reports that the deletion did not succeed.
    """
    context = await getContext(currentUser)

    # User exists?
    targetUser = context.interfaceData.getUser(userId)
    if not targetUser:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with ID {userId} not found"
        )

    # Permission check: self, admin of the same mandate, or sysadmin
    callerPrivilege = currentUser.get("privilege")
    inOwnMandate = targetUser.get("mandateId") == context.mandateId
    mayDelete = (
        userId == context.userId
        or (callerPrivilege == "admin" and inOwnMandate)
        or callerPrivilege == "sysadmin"
    )
    if not mayDelete:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No permission to delete this user"
        )

    # Delete user and all referenced objects
    if not context.interfaceData.deleteUser(userId):
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error deleting user with ID {userId}"
        )
    return None