# gateway/modules/routes/routeAdminRbacRoles.py
# 2025-12-15 21:55:26 +01:00
#
# 718 lines
# 21 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Admin RBAC Roles Management routes.
Provides endpoints for managing roles and role assignments to users.
"""
from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request
from typing import List, Dict, Any, Optional
import logging
from modules.auth import getCurrentUser, limiter
from modules.datamodels.datamodelUam import User, UserInDB
from modules.datamodels.datamodelRbac import Role
from modules.interfaces.interfaceDbAppObjects import getInterface
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router carrying every admin RBAC role endpoint defined in this module; all
# routes are mounted below the /api/admin/rbac/roles prefix.
router = APIRouter(
    prefix="/api/admin/rbac/roles",
    tags=["Admin RBAC Roles"],
    responses={404: {"description": "Not found"}},
)
def _ensureAdminAccess(currentUser: User) -> None:
    """
    Ensure the current user is allowed to manage RBAC roles.

    Access is granted when the user's role labels contain "admin" or
    "sysadmin". A missing (None) or empty label list counts as "no roles".

    Parameters:
    - currentUser: Authenticated user whose roleLabels are inspected

    Raises:
    - HTTPException (403): the user holds neither the admin nor the sysadmin role
    """
    # Only the labels already present on the user object are needed, so no
    # database interface is created here. Building one before the role check
    # was unused work and could mask the 403 with an unrelated failure.
    roleLabels = currentUser.roleLabels or []
    if "sysadmin" in roleLabels or "admin" in roleLabels:
        return
    # Role membership is currently the only gate; no finer-grained RBAC
    # permission lookup is performed.
    raise HTTPException(
        status_code=403,
        detail="Admin or sysadmin role required to manage RBAC roles"
    )
@router.get("/", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def listRoles(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> List[Dict[str, Any]]:
    """
    List every known role together with the number of users holding it.

    Roles stored in the database come first. Labels that only appear on user
    records (no matching database role) are appended afterwards with a
    generated description and no id.

    Returns:
    - List of role dictionaries (id, roleLabel, description, userCount, isSystemRole)

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (500): any unexpected failure while collecting the data
    """
    try:
        _ensureAdminAccess(currentUser)
        db = getInterface(currentUser)
        storedRoles = db.getAllRoles()
        accounts = db.getUsers()

        # Tally how many users carry each role label.
        usage: Dict[str, int] = {}
        for account in accounts:
            for label in (account.roleLabels or []):
                usage.setdefault(label, 0)
                usage[label] += 1

        # Database-backed roles, each annotated with its usage count.
        overview = [
            {
                "id": entry.id,
                "roleLabel": entry.roleLabel,
                "description": entry.description,
                "userCount": usage.get(entry.roleLabel, 0),
                "isSystemRole": entry.isSystemRole,
            }
            for entry in storedRoles
        ]

        # Labels assigned to users but absent from the database are still
        # reported so that they remain visible to the administrator.
        knownLabels = {entry.roleLabel for entry in storedRoles}
        overview.extend(
            {
                "id": None,
                "roleLabel": roleLabel,
                "description": {"en": f"Custom role: {roleLabel}", "fr": f"Rôle personnalisé : {roleLabel}"},
                "userCount": count,
                "isSystemRole": False,
            }
            for roleLabel, count in usage.items()
            if roleLabel not in knownLabels
        )
        return overview
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 403 above) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Error listing roles: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list roles: {str(e)}"
        )
@router.get("/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def getRoleOptions(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> List[Dict[str, Any]]:
    """
    Return the database roles as value/label pairs for select dropdowns.

    The label is the English ("en") entry of the role description when the
    description is a dict; otherwise the role label itself is used.

    Returns:
    - List of option dictionaries with "value" and "label"

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (500): any unexpected failure while loading the roles
    """
    try:
        _ensureAdminAccess(currentUser)
        db = getInterface(currentUser)

        choices = []
        for entry in db.getAllRoles():
            if isinstance(entry.description, dict):
                # Missing "en" key falls back to the plain role label.
                caption = entry.description.get("en", entry.roleLabel)
            else:
                caption = entry.roleLabel
            choices.append({"value": entry.roleLabel, "label": caption})
        return choices
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting role options: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get role options: {str(e)}"
        )
@router.post("/", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def createRole(
    request: Request,
    role: Role = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Create a new role from the request body.

    Request Body:
    - role: Role object to create

    Returns:
    - Dictionary describing the created role (id, roleLabel, description, isSystemRole)

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (400): the interface rejected the role with a ValueError
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        stored = getInterface(currentUser).createRole(role)
        summary = {
            "id": stored.id,
            "roleLabel": stored.roleLabel,
            "description": stored.description,
            "isSystemRole": stored.isSystemRole,
        }
        return summary
    except HTTPException:
        raise
    except ValueError as e:
        # A ValueError raised while creating the role is reported as a client error.
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Error creating role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to create role: {str(e)}"
        )
# NOTE: the static "/users" route is registered BEFORE the dynamic "/{roleId}"
# routes on purpose. FastAPI matches routes in registration order, so with the
# opposite order a GET on "/users" is captured by getRole with roleId="users"
# and the user listing can never be reached.
@router.get("/users", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def listUsersWithRoles(
    request: Request,
    roleLabel: Optional[str] = Query(None, description="Filter by role label"),
    mandateId: Optional[str] = Query(None, description="Filter by mandate ID"),
    currentUser: User = Depends(getCurrentUser)
) -> List[Dict[str, Any]]:
    """
    Get list of users with their role assignments.

    Query Parameters:
    - roleLabel: Optional filter by role label
    - mandateId: Optional filter by mandate ID

    Returns:
    - List of user dictionaries with role assignments

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (500): any unexpected failure while loading users
    """
    try:
        _ensureAdminAccess(currentUser)
        interface = getInterface(currentUser)

        # All users are loaded once; both filters are applied in memory.
        users = interface.getUsers()
        if mandateId:
            users = [u for u in users if u.mandateId == mandateId]
        if roleLabel:
            users = [u for u in users if roleLabel in (u.roleLabels or [])]

        return [
            {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "fullName": user.fullName,
                "mandateId": user.mandateId,
                "enabled": user.enabled,
                "roleLabels": user.roleLabels or [],
                "roleCount": len(user.roleLabels or [])
            }
            for user in users
        ]
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing users with roles: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list users with roles: {str(e)}"
        )


@router.get("/{roleId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def getRole(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Get a role by ID.

    Path Parameters:
    - roleId: Role ID

    Returns:
    - Role dictionary (id, roleLabel, description, isSystemRole)

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (404): the interface returned no role for roleId
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        interface = getInterface(currentUser)
        role = interface.getRole(roleId)
        if not role:
            raise HTTPException(
                status_code=404,
                detail=f"Role {roleId} not found"
            )
        return {
            "id": role.id,
            "roleLabel": role.roleLabel,
            "description": role.description,
            "isSystemRole": role.isSystemRole
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get role: {str(e)}"
        )


@router.put("/{roleId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def updateRole(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    role: Role = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Update an existing role.

    Path Parameters:
    - roleId: Role ID

    Request Body:
    - role: Updated Role object

    Returns:
    - Updated role dictionary (id, roleLabel, description, isSystemRole)

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (400): the interface rejected the update with a ValueError
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        interface = getInterface(currentUser)
        updatedRole = interface.updateRole(roleId, role)
        return {
            "id": updatedRole.id,
            "roleLabel": updatedRole.roleLabel,
            "description": updatedRole.description,
            "isSystemRole": updatedRole.isSystemRole
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.error(f"Error updating role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update role: {str(e)}"
        )


@router.delete("/{roleId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
async def deleteRole(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, str]:
    """
    Delete a role.

    Path Parameters:
    - roleId: Role ID

    Returns:
    - Success message

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (404): the interface reported that nothing was deleted
    - HTTPException (400): the interface rejected the deletion with a ValueError
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        interface = getInterface(currentUser)
        success = interface.deleteRole(roleId)
        if not success:
            raise HTTPException(
                status_code=404,
                detail=f"Role {roleId} not found"
            )
        return {"message": f"Role {roleId} deleted successfully"}
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.error(f"Error deleting role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete role: {str(e)}"
        )
@router.get("/users/{userId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def getUserRoles(
    request: Request,
    userId: str = Path(..., description="User ID"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Return one user's basic profile together with the assigned role labels.

    Path Parameters:
    - userId: User ID

    Returns:
    - User dictionary including roleLabels and roleCount

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (404): no user was found for userId
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        account = getInterface(currentUser).getUser(userId)
        if account:
            return {
                "id": account.id,
                "username": account.username,
                "email": account.email,
                "fullName": account.fullName,
                "mandateId": account.mandateId,
                "enabled": account.enabled,
                # A missing label list is reported as an empty list.
                "roleLabels": account.roleLabels or [],
                "roleCount": len(account.roleLabels or []),
            }
        raise HTTPException(
            status_code=404,
            detail=f"User {userId} not found"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting user roles: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get user roles: {str(e)}"
        )
@router.put("/users/{userId}/roles", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def updateUserRoles(
    request: Request,
    userId: str = Path(..., description="User ID"),
    roleLabels: List[str] = Body(..., description="List of role labels to assign"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Replace the role assignments of a specific user.

    The submitted list is de-duplicated (first occurrence wins, order kept).
    An empty list falls back to ["user"], so that a user always keeps at least
    one role, the same rule removeUserRole in this module applies.

    Path Parameters:
    - userId: User ID

    Request Body:
    - roleLabels: List of role labels to assign (e.g., ["admin", "user"])

    Returns:
    - Updated user dictionary with role assignments

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (404): no user was found for userId
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        interface = getInterface(currentUser)
        # Get user
        user = interface.getUser(userId)
        if not user:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )
        # Drop repeated labels while preserving order; duplicates would
        # otherwise inflate roleCount and the per-role user counts.
        roleLabels = list(dict.fromkeys(roleLabels))
        # Never leave a user without any role (default to "user").
        if not roleLabels:
            roleLabels = ["user"]
            logger.warning(f"User {userId} was given an empty role list, defaulting to 'user' role")
        # Basic validation only: unknown labels are logged, not rejected.
        # NOTE(review): any admin can assign "sysadmin" here - confirm that
        # this is the intended policy.
        standardRoles = {"sysadmin", "admin", "user", "viewer"}
        for roleLabel in roleLabels:
            if roleLabel not in standardRoles:
                logger.warning(f"Non-standard role label assigned: {roleLabel}")
        # Update user roles
        userData = {
            "roleLabels": roleLabels
        }
        updatedUser = interface.updateUser(userId, userData)
        logger.info(f"Updated roles for user {userId}: {roleLabels}")
        return {
            "id": updatedUser.id,
            "username": updatedUser.username,
            "email": updatedUser.email,
            "fullName": updatedUser.fullName,
            "mandateId": updatedUser.mandateId,
            "enabled": updatedUser.enabled,
            "roleLabels": updatedUser.roleLabels or [],
            "roleCount": len(updatedUser.roleLabels or [])
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating user roles: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update user roles: {str(e)}"
        )
@router.post("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def addUserRole(
    request: Request,
    userId: str = Path(..., description="User ID"),
    roleLabel: str = Path(..., description="Role label to add"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Grant a single role to a user; a role the user already holds is a no-op.

    Path Parameters:
    - userId: User ID
    - roleLabel: Role label to add

    Returns:
    - User dictionary with the resulting role assignments

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (404): no user was found for userId
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        db = getInterface(currentUser)

        target = db.getUser(userId)
        if not target:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # Work on a copy so the loaded user object is not mutated.
        assigned = list(target.roleLabels or [])
        if roleLabel in assigned:
            # Already assigned: report the user unchanged, no write needed.
            outcome = target
        else:
            assigned.append(roleLabel)
            outcome = db.updateUser(userId, {"roleLabels": assigned})
            logger.info(f"Added role {roleLabel} to user {userId}")

        return {
            "id": outcome.id,
            "username": outcome.username,
            "email": outcome.email,
            "fullName": outcome.fullName,
            "mandateId": outcome.mandateId,
            "enabled": outcome.enabled,
            "roleLabels": outcome.roleLabels or [],
            "roleCount": len(outcome.roleLabels or []),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error adding role to user: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to add role to user: {str(e)}"
        )
@router.delete("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def removeUserRole(
    request: Request,
    userId: str = Path(..., description="User ID"),
    roleLabel: str = Path(..., description="Role label to remove"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Revoke a single role from a user; a role the user does not hold is a no-op.

    If the removal would leave the user without any role, the user is given
    the "user" role instead.

    Path Parameters:
    - userId: User ID
    - roleLabel: Role label to remove

    Returns:
    - User dictionary with the resulting role assignments

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (404): no user was found for userId
    - HTTPException (500): any other unexpected failure
    """
    try:
        _ensureAdminAccess(currentUser)
        db = getInterface(currentUser)

        target = db.getUser(userId)
        if not target:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # Work on a copy so the loaded user object is not mutated.
        remaining = list(target.roleLabels or [])
        if roleLabel not in remaining:
            # Nothing to revoke: report the user unchanged, no write needed.
            outcome = target
        else:
            # Only the first occurrence of the label is removed.
            remaining.remove(roleLabel)
            if not remaining:
                # A user must keep at least one role.
                remaining = ["user"]
                logger.warning(f"User {userId} had all roles removed, defaulting to 'user' role")
            outcome = db.updateUser(userId, {"roleLabels": remaining})
            logger.info(f"Removed role {roleLabel} from user {userId}")

        return {
            "id": outcome.id,
            "username": outcome.username,
            "email": outcome.email,
            "fullName": outcome.fullName,
            "mandateId": outcome.mandateId,
            "enabled": outcome.enabled,
            "roleLabels": outcome.roleLabels or [],
            "roleCount": len(outcome.roleLabels or []),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error removing role from user: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to remove role from user: {str(e)}"
        )
@router.get("/roles/{roleLabel}/users", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def getUsersWithRole(
    request: Request,
    roleLabel: str = Path(..., description="Role label"),
    mandateId: Optional[str] = Query(None, description="Filter by mandate ID"),
    currentUser: User = Depends(getCurrentUser)
) -> List[Dict[str, Any]]:
    """
    List the users that hold a given role, optionally limited to one mandate.

    Path Parameters:
    - roleLabel: Role label

    Query Parameters:
    - mandateId: Optional filter by mandate ID

    Returns:
    - List of user dictionaries for the users holding the role

    Raises:
    - HTTPException (403): caller is not admin/sysadmin
    - HTTPException (500): any unexpected failure while loading users
    """
    try:
        _ensureAdminAccess(currentUser)
        accounts = getInterface(currentUser).getUsers()

        # Keep users that hold the role and, when a mandate filter is given,
        # belong to that mandate. Filtering happens in memory.
        matches = [
            account
            for account in accounts
            if roleLabel in (account.roleLabels or [])
            and (not mandateId or account.mandateId == mandateId)
        ]

        listing = []
        for account in matches:
            listing.append({
                "id": account.id,
                "username": account.username,
                "email": account.email,
                "fullName": account.fullName,
                "mandateId": account.mandateId,
                "enabled": account.enabled,
                "roleLabels": account.roleLabels or [],
                "roleCount": len(account.roleLabels or []),
            })
        return listing
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting users with role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get users with role: {str(e)}"
        )