716 lines
21 KiB
Python
716 lines
21 KiB
Python
"""
|
|
Admin RBAC Roles Management routes.
|
|
Provides endpoints for managing roles and role assignments to users.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request
|
|
from typing import List, Dict, Any, Optional
|
|
import logging
|
|
|
|
from modules.security.auth import getCurrentUser, limiter
|
|
from modules.datamodels.datamodelUam import User, UserInDB
|
|
from modules.datamodels.datamodelRbac import Role
|
|
from modules.interfaces.interfaceDbAppObjects import getInterface
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(
|
|
prefix="/api/admin/rbac/roles",
|
|
tags=["Admin RBAC Roles"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
|
|
def _ensureAdminAccess(currentUser: User) -> None:
|
|
"""Ensure current user has admin access to RBAC roles management."""
|
|
interface = getInterface(currentUser)
|
|
|
|
# Check if user has admin or sysadmin role
|
|
roleLabels = currentUser.roleLabels or []
|
|
if "sysadmin" not in roleLabels and "admin" not in roleLabels:
|
|
raise HTTPException(
|
|
status_code=403,
|
|
detail="Admin or sysadmin role required to manage RBAC roles"
|
|
)
|
|
|
|
# Additional RBAC check: verify user has permission to update UserInDB
|
|
# This is already covered by admin/sysadmin role check above, but we can add explicit RBAC check if needed
|
|
# For now, admin/sysadmin role check is sufficient
|
|
|
|
|
|
@router.get("/", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def listRoles(
|
|
request: Request,
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get list of all available roles with metadata.
|
|
|
|
Returns:
|
|
- List of role dictionaries with role label, description, and user count
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get all roles from database
|
|
dbRoles = interface.getAllRoles()
|
|
|
|
# Get all users to count role assignments
|
|
allUsers = interface.getUsers()
|
|
|
|
# Count users per role
|
|
roleCounts: Dict[str, int] = {}
|
|
for user in allUsers:
|
|
for roleLabel in (user.roleLabels or []):
|
|
roleCounts[roleLabel] = roleCounts.get(roleLabel, 0) + 1
|
|
|
|
# Convert Role objects to dictionaries and add user counts
|
|
result = []
|
|
for role in dbRoles:
|
|
result.append({
|
|
"id": role.id,
|
|
"roleLabel": role.roleLabel,
|
|
"description": role.description,
|
|
"userCount": roleCounts.get(role.roleLabel, 0),
|
|
"isSystemRole": role.isSystemRole
|
|
})
|
|
|
|
# Add any roles found in user assignments that don't exist in database
|
|
dbRoleLabels = {role.roleLabel for role in dbRoles}
|
|
for roleLabel, count in roleCounts.items():
|
|
if roleLabel not in dbRoleLabels:
|
|
result.append({
|
|
"id": None,
|
|
"roleLabel": roleLabel,
|
|
"description": {"en": f"Custom role: {roleLabel}", "fr": f"Rôle personnalisé : {roleLabel}"},
|
|
"userCount": count,
|
|
"isSystemRole": False
|
|
})
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error listing roles: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to list roles: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/options", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def getRoleOptions(
|
|
request: Request,
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get role options for select dropdowns.
|
|
Returns roles in format suitable for frontend select components.
|
|
|
|
Returns:
|
|
- List of role option dictionaries with value and label
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get all roles from database
|
|
dbRoles = interface.getAllRoles()
|
|
|
|
# Convert to options format
|
|
options = []
|
|
for role in dbRoles:
|
|
# Use English description as label, fallback to roleLabel
|
|
label = role.description.get("en", role.roleLabel) if isinstance(role.description, dict) else role.roleLabel
|
|
options.append({
|
|
"value": role.roleLabel,
|
|
"label": label
|
|
})
|
|
|
|
return options
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting role options: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to get role options: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/", response_model=Dict[str, Any])
|
|
@limiter.limit("30/minute")
|
|
async def createRole(
|
|
request: Request,
|
|
role: Role = Body(...),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a new role.
|
|
|
|
Request Body:
|
|
- role: Role object to create
|
|
|
|
Returns:
|
|
- Created role dictionary
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
createdRole = interface.createRole(role)
|
|
|
|
return {
|
|
"id": createdRole.id,
|
|
"roleLabel": createdRole.roleLabel,
|
|
"description": createdRole.description,
|
|
"isSystemRole": createdRole.isSystemRole
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=str(e)
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error creating role: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to create role: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/{roleId}", response_model=Dict[str, Any])
|
|
@limiter.limit("60/minute")
|
|
async def getRole(
|
|
request: Request,
|
|
roleId: str = Path(..., description="Role ID"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get a role by ID.
|
|
|
|
Path Parameters:
|
|
- roleId: Role ID
|
|
|
|
Returns:
|
|
- Role dictionary
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
role = interface.getRole(roleId)
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Role {roleId} not found"
|
|
)
|
|
|
|
return {
|
|
"id": role.id,
|
|
"roleLabel": role.roleLabel,
|
|
"description": role.description,
|
|
"isSystemRole": role.isSystemRole
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting role: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to get role: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.put("/{roleId}", response_model=Dict[str, Any])
|
|
@limiter.limit("30/minute")
|
|
async def updateRole(
|
|
request: Request,
|
|
roleId: str = Path(..., description="Role ID"),
|
|
role: Role = Body(...),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Update an existing role.
|
|
|
|
Path Parameters:
|
|
- roleId: Role ID
|
|
|
|
Request Body:
|
|
- role: Updated Role object
|
|
|
|
Returns:
|
|
- Updated role dictionary
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
updatedRole = interface.updateRole(roleId, role)
|
|
|
|
return {
|
|
"id": updatedRole.id,
|
|
"roleLabel": updatedRole.roleLabel,
|
|
"description": updatedRole.description,
|
|
"isSystemRole": updatedRole.isSystemRole
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=str(e)
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error updating role: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to update role: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.delete("/{roleId}", response_model=Dict[str, str])
|
|
@limiter.limit("30/minute")
|
|
async def deleteRole(
|
|
request: Request,
|
|
roleId: str = Path(..., description="Role ID"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Delete a role.
|
|
|
|
Path Parameters:
|
|
- roleId: Role ID
|
|
|
|
Returns:
|
|
- Success message
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
success = interface.deleteRole(roleId)
|
|
if not success:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"Role {roleId} not found"
|
|
)
|
|
|
|
return {"message": f"Role {roleId} deleted successfully"}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=str(e)
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error deleting role: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to delete role: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/users", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def listUsersWithRoles(
|
|
request: Request,
|
|
roleLabel: Optional[str] = Query(None, description="Filter by role label"),
|
|
mandateId: Optional[str] = Query(None, description="Filter by mandate ID"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get list of users with their role assignments.
|
|
|
|
Query Parameters:
|
|
- roleLabel: Optional filter by role label
|
|
- mandateId: Optional filter by mandate ID
|
|
|
|
Returns:
|
|
- List of user dictionaries with role assignments
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get users based on filters
|
|
if mandateId:
|
|
# Filter by mandate (if user has permission)
|
|
users = interface.getUsers()
|
|
users = [u for u in users if u.mandateId == mandateId]
|
|
else:
|
|
users = interface.getUsers()
|
|
|
|
# Filter by role if specified
|
|
if roleLabel:
|
|
users = [u for u in users if roleLabel in (u.roleLabels or [])]
|
|
|
|
# Format response
|
|
result = []
|
|
for user in users:
|
|
result.append({
|
|
"id": user.id,
|
|
"username": user.username,
|
|
"email": user.email,
|
|
"fullName": user.fullName,
|
|
"mandateId": user.mandateId,
|
|
"enabled": user.enabled,
|
|
"roleLabels": user.roleLabels or [],
|
|
"roleCount": len(user.roleLabels or [])
|
|
})
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error listing users with roles: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to list users with roles: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/users/{userId}", response_model=Dict[str, Any])
|
|
@limiter.limit("60/minute")
|
|
async def getUserRoles(
|
|
request: Request,
|
|
userId: str = Path(..., description="User ID"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get role assignments for a specific user.
|
|
|
|
Path Parameters:
|
|
- userId: User ID
|
|
|
|
Returns:
|
|
- User dictionary with role assignments
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get user
|
|
user = interface.getUser(userId)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"User {userId} not found"
|
|
)
|
|
|
|
return {
|
|
"id": user.id,
|
|
"username": user.username,
|
|
"email": user.email,
|
|
"fullName": user.fullName,
|
|
"mandateId": user.mandateId,
|
|
"enabled": user.enabled,
|
|
"roleLabels": user.roleLabels or [],
|
|
"roleCount": len(user.roleLabels or [])
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting user roles: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to get user roles: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.put("/users/{userId}/roles", response_model=Dict[str, Any])
|
|
@limiter.limit("30/minute")
|
|
async def updateUserRoles(
|
|
request: Request,
|
|
userId: str = Path(..., description="User ID"),
|
|
roleLabels: List[str] = Body(..., description="List of role labels to assign"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Update role assignments for a specific user.
|
|
|
|
Path Parameters:
|
|
- userId: User ID
|
|
|
|
Request Body:
|
|
- roleLabels: List of role labels to assign (e.g., ["admin", "user"])
|
|
|
|
Returns:
|
|
- Updated user dictionary with role assignments
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get user
|
|
user = interface.getUser(userId)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"User {userId} not found"
|
|
)
|
|
|
|
# Validate role labels (basic validation - check against standard roles)
|
|
standardRoles = ["sysadmin", "admin", "user", "viewer"]
|
|
for roleLabel in roleLabels:
|
|
if roleLabel not in standardRoles:
|
|
logger.warning(f"Non-standard role label assigned: {roleLabel}")
|
|
|
|
# Update user roles
|
|
userData = {
|
|
"roleLabels": roleLabels
|
|
}
|
|
|
|
updatedUser = interface.updateUser(userId, userData)
|
|
|
|
logger.info(f"Updated roles for user {userId}: {roleLabels}")
|
|
|
|
return {
|
|
"id": updatedUser.id,
|
|
"username": updatedUser.username,
|
|
"email": updatedUser.email,
|
|
"fullName": updatedUser.fullName,
|
|
"mandateId": updatedUser.mandateId,
|
|
"enabled": updatedUser.enabled,
|
|
"roleLabels": updatedUser.roleLabels or [],
|
|
"roleCount": len(updatedUser.roleLabels or [])
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating user roles: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to update user roles: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any])
|
|
@limiter.limit("30/minute")
|
|
async def addUserRole(
|
|
request: Request,
|
|
userId: str = Path(..., description="User ID"),
|
|
roleLabel: str = Path(..., description="Role label to add"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Add a role to a user (if not already assigned).
|
|
|
|
Path Parameters:
|
|
- userId: User ID
|
|
- roleLabel: Role label to add
|
|
|
|
Returns:
|
|
- Updated user dictionary with role assignments
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get user
|
|
user = interface.getUser(userId)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"User {userId} not found"
|
|
)
|
|
|
|
# Get current roles
|
|
currentRoles = list(user.roleLabels or [])
|
|
|
|
# Add role if not already present
|
|
if roleLabel not in currentRoles:
|
|
currentRoles.append(roleLabel)
|
|
|
|
# Update user roles
|
|
userData = {
|
|
"roleLabels": currentRoles
|
|
}
|
|
|
|
updatedUser = interface.updateUser(userId, userData)
|
|
|
|
logger.info(f"Added role {roleLabel} to user {userId}")
|
|
else:
|
|
updatedUser = user
|
|
|
|
return {
|
|
"id": updatedUser.id,
|
|
"username": updatedUser.username,
|
|
"email": updatedUser.email,
|
|
"fullName": updatedUser.fullName,
|
|
"mandateId": updatedUser.mandateId,
|
|
"enabled": updatedUser.enabled,
|
|
"roleLabels": updatedUser.roleLabels or [],
|
|
"roleCount": len(updatedUser.roleLabels or [])
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error adding role to user: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to add role to user: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.delete("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any])
|
|
@limiter.limit("30/minute")
|
|
async def removeUserRole(
|
|
request: Request,
|
|
userId: str = Path(..., description="User ID"),
|
|
roleLabel: str = Path(..., description="Role label to remove"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Remove a role from a user.
|
|
|
|
Path Parameters:
|
|
- userId: User ID
|
|
- roleLabel: Role label to remove
|
|
|
|
Returns:
|
|
- Updated user dictionary with role assignments
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get user
|
|
user = interface.getUser(userId)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"User {userId} not found"
|
|
)
|
|
|
|
# Get current roles
|
|
currentRoles = list(user.roleLabels or [])
|
|
|
|
# Remove role if present
|
|
if roleLabel in currentRoles:
|
|
currentRoles.remove(roleLabel)
|
|
|
|
# Ensure user has at least one role (default to "user")
|
|
if not currentRoles:
|
|
currentRoles = ["user"]
|
|
logger.warning(f"User {userId} had all roles removed, defaulting to 'user' role")
|
|
|
|
# Update user roles
|
|
userData = {
|
|
"roleLabels": currentRoles
|
|
}
|
|
|
|
updatedUser = interface.updateUser(userId, userData)
|
|
|
|
logger.info(f"Removed role {roleLabel} from user {userId}")
|
|
else:
|
|
updatedUser = user
|
|
|
|
return {
|
|
"id": updatedUser.id,
|
|
"username": updatedUser.username,
|
|
"email": updatedUser.email,
|
|
"fullName": updatedUser.fullName,
|
|
"mandateId": updatedUser.mandateId,
|
|
"enabled": updatedUser.enabled,
|
|
"roleLabels": updatedUser.roleLabels or [],
|
|
"roleCount": len(updatedUser.roleLabels or [])
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error removing role from user: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to remove role from user: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/roles/{roleLabel}/users", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def getUsersWithRole(
|
|
request: Request,
|
|
roleLabel: str = Path(..., description="Role label"),
|
|
mandateId: Optional[str] = Query(None, description="Filter by mandate ID"),
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all users with a specific role.
|
|
|
|
Path Parameters:
|
|
- roleLabel: Role label
|
|
|
|
Query Parameters:
|
|
- mandateId: Optional filter by mandate ID
|
|
|
|
Returns:
|
|
- List of users with the specified role
|
|
"""
|
|
try:
|
|
_ensureAdminAccess(currentUser)
|
|
|
|
interface = getInterface(currentUser)
|
|
|
|
# Get all users
|
|
users = interface.getUsers()
|
|
|
|
# Filter by role
|
|
users = [u for u in users if roleLabel in (u.roleLabels or [])]
|
|
|
|
# Filter by mandate if specified
|
|
if mandateId:
|
|
users = [u for u in users if u.mandateId == mandateId]
|
|
|
|
# Format response
|
|
result = []
|
|
for user in users:
|
|
result.append({
|
|
"id": user.id,
|
|
"username": user.username,
|
|
"email": user.email,
|
|
"fullName": user.fullName,
|
|
"mandateId": user.mandateId,
|
|
"enabled": user.enabled,
|
|
"roleLabels": user.roleLabels or [],
|
|
"roleCount": len(user.roleLabels or [])
|
|
})
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting users with role: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to get users with role: {str(e)}"
|
|
)
|