1000 lines
37 KiB
Python
1000 lines
37 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Admin RBAC Roles Management routes.
Provides endpoints for managing roles and role assignments to users.

MULTI-TENANT: Context-aware access control.
- SysAdmin: Full access to all roles and assignments across all mandates.
- MandateAdmin: Can manage roles and assignments within their own mandates.
  Template roles (mandateId=None, isSystemRole=True) are read-only.
  The sysadmin role (roleLabel="sysadmin") is not manageable by MandateAdmins.
Role assignments are managed via UserMandateRole (not User.roleLabels).
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request, status
|
|
from typing import List, Dict, Any, Optional, Set
|
|
import logging
|
|
|
|
from modules.auth import limiter, requireSysAdminRole, getRequestContext, RequestContext
|
|
from modules.datamodels.datamodelUam import User, UserInDB
|
|
from modules.datamodels.datamodelRbac import Role
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
|
|
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _getUserRoleLabels(interface, userId: str) -> List[str]:
|
|
"""
|
|
Get role labels for a user from UserMandateRole (across all mandates).
|
|
|
|
Args:
|
|
interface: Database interface
|
|
userId: User ID
|
|
|
|
Returns:
|
|
List of role labels
|
|
"""
|
|
roleLabels: Set[str] = set()
|
|
|
|
# Get all UserMandate records for this user (Pydantic models)
|
|
userMandates = interface.getUserMandates(userId)
|
|
|
|
for um in userMandates:
|
|
# Get all UserMandateRole records for this membership (Pydantic models)
|
|
userMandateRoles = interface.getUserMandateRoles(str(um.id))
|
|
|
|
for umr in userMandateRoles:
|
|
if umr.roleId:
|
|
# Get role by ID to get roleLabel
|
|
role = interface.getRole(str(umr.roleId))
|
|
if role:
|
|
roleLabels.add(role.roleLabel)
|
|
|
|
return list(roleLabels)
|
|
|
|
|
|
def _hasRoleLabel(interface, userId: str, roleLabel: str) -> bool:
    """
    Tell whether the given role label is among the user's role labels,
    as collected by _getUserRoleLabels (i.e. across all mandates).
    """
    assignedLabels = _getUserRoleLabels(interface, userId)
    return roleLabel in assignedLabels
|
|
|
|
|
|
def _getAdminMandateIds(context: RequestContext) -> List[str]:
    """
    Collect the IDs of the mandates in which the requesting user is admin.

    A membership counts when it is enabled and one of its roles has
    roleLabel "admin" without a featureInstanceId. Any exception is logged
    and whatever was collected up to that point is returned.
    """
    adminMandateIds = []
    try:
        db = getRootInterface()
        for membership in db.getUserMandates(str(context.user.id)):
            # Memberships without an 'enabled' attribute are treated as enabled
            if not getattr(membership, 'enabled', True):
                continue

            membershipId = getattr(membership, 'id', None)
            mandateId = getattr(membership, 'mandateId', None)
            if not (membershipId and mandateId):
                continue

            # Lazy lookup: stops resolving roles at the first matching admin role
            assignedRoles = map(db.getRole, db.getRoleIdsForUserMandate(str(membershipId)))
            isMandateAdmin = any(
                candidate and candidate.roleLabel == "admin" and not candidate.featureInstanceId
                for candidate in assignedRoles
            )
            if isMandateAdmin:
                adminMandateIds.append(str(mandateId))
    except Exception as e:
        logger.error(f"Error getting admin mandate IDs: {e}")
    return adminMandateIds
|
|
|
|
|
|
# Router carrying every endpoint of this module under /api/admin/rbac/roles
router = APIRouter(
    prefix="/api/admin/rbac/roles",
    tags=["Admin RBAC Roles"],
    responses={404: {"description": "Not found"}},
)
|
|
|
|
|
|
@router.get("/", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def list_roles(
    request: Request,
    mandateId: Optional[str] = Query(None, description="Filter roles by mandate ID"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Get list of roles with metadata.

    Context-aware: SysAdmin sees all roles. MandateAdmin sees roles from own mandates
    plus template roles (read-only).

    With mandateId: returns the result of getRolesForMandate for that mandate
    (MandateAdmins may only query mandates they administer).
    Without mandateId: returns the result of getAllRoles; for MandateAdmins this
    is narrowed to roles with mandateId=None plus roles of their own mandates.
    NOTE(review): earlier documentation described the no-mandateId case as
    "system template roles only" - confirm what getAllRoles() returns.

    Returns:
    - List of role dictionaries including the assignment count ("userCount")

    Raises:
    - HTTPException 403 if the caller is neither SysAdmin nor admin of any
      mandate, or asks for a mandate they do not administer
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get roles filtered by scope
        logger.debug("list_roles: mandateId=%s", mandateId)
        if mandateId:
            # MandateAdmin can only query mandates they admin
            if not isSysAdmin and mandateId not in adminMandateIds:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate")
            dbRoles = interface.getRolesForMandate(mandateId)
            logger.debug("list_roles: getRolesForMandate returned %d roles", len(dbRoles))
        else:
            dbRoles = interface.getAllRoles()
            logger.debug("list_roles: getAllRoles returned %d roles", len(dbRoles))
            # MandateAdmin: filter to template roles + roles from own mandates
            if not isSysAdmin:
                dbRoles = [r for r in dbRoles if r.mandateId is None or str(r.mandateId) in adminMandateIds]

        # Count role assignments from UserMandateRole table (keyed by role ID)
        roleCounts = interface.countRoleAssignments()

        # Convert Role objects to dictionaries and add user counts
        return [
            {
                "id": role.id,
                "roleLabel": role.roleLabel,
                "description": role.description,
                "mandateId": role.mandateId,
                "featureInstanceId": role.featureInstanceId,
                "userCount": roleCounts.get(str(role.id), 0),
                "isSystemRole": role.isSystemRole
            }
            for role in dbRoles
        ]

    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback for the 500 case
        logger.exception("Error listing roles: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list roles: {str(e)}"
        )
|
|
|
|
|
|
@router.get("/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_role_options(
    request: Request,
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Get role options for select dropdowns.
    Context-aware: SysAdmin sees all roles. MandateAdmin sees roles from own mandates
    plus template roles.

    Returns:
    - List of role option dictionaries with value (the roleLabel) and label
      (the English description when available, otherwise the roleLabel)

    Raises:
    - HTTPException 403 if the caller is neither SysAdmin nor admin of any mandate
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get all roles from database
        dbRoles = interface.getAllRoles()

        # MandateAdmin: filter to template roles + roles from own mandates
        if not isSysAdmin:
            dbRoles = [r for r in dbRoles if r.mandateId is None or str(r.mandateId) in adminMandateIds]

        # Convert to options format
        options = []
        for role in dbRoles:
            label = role.roleLabel
            if isinstance(role.description, dict):
                # A missing, None or empty "en" entry falls back to the roleLabel
                label = role.description.get("en") or role.roleLabel
            options.append({
                "value": role.roleLabel,
                "label": label
            })

        return options

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error getting role options: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get role options: {str(e)}"
        )
|
|
|
|
|
|
@router.post("/", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def create_role(
    request: Request,
    role: Role = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Create a new role.
    Context-aware: SysAdmin can create any role. MandateAdmin can create roles
    within own mandates only (not template or sysadmin roles).

    Request Body:
    - role: Role object to create

    Returns:
    - Created role dictionary

    Raises:
    - HTTPException 403 if the caller lacks admin rights, or a MandateAdmin tries
      to create a "sysadmin" role, a role without mandateId, or a role in a
      mandate they do not administer
    - HTTPException 400 if the interface rejects the role with a ValueError
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # MandateAdmin restrictions
        if not isSysAdmin:
            if role.roleLabel == "sysadmin":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create sysadmin role")
            if role.mandateId is None:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create template roles")
            if str(role.mandateId) not in adminMandateIds:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate")

        createdRole = interface.createRole(role)

        return {
            "id": createdRole.id,
            "roleLabel": createdRole.roleLabel,
            "description": createdRole.description,
            "isSystemRole": createdRole.isSystemRole
        }

    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.exception("Error creating role: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to create role: {str(e)}"
        )
|
|
|
|
|
|
# NOTE: the static GET "/users" route must be registered BEFORE the dynamic
# GET "/{roleId}" route. Routes are matched in registration order, so with the
# opposite order a request to ".../users" is captured as roleId="users" and
# list_users_with_roles is never reached.
@router.get("/users", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def list_users_with_roles(
    request: Request,
    roleLabel: Optional[str] = Query(None, description="Filter by role label"),
    mandateId: Optional[str] = Query(None, description="Filter by mandate ID (via UserMandate)"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Get list of users with their role assignments.
    Context-aware: SysAdmin sees all users. MandateAdmin sees users from own mandates only.

    Query Parameters:
    - roleLabel: Optional filter by role label
    - mandateId: Optional filter by mandate ID (via UserMandate table)

    Returns:
    - List of user dictionaries with role assignments

    Raises:
    - HTTPException 403 if the caller lacks admin rights or filters by a mandate
      they do not administer
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get all users via interface method (Pydantic models)
        users = interface.getAllUsers()

        # Filter by mandate if specified (via UserMandate table)
        if mandateId:
            # MandateAdmin can only query mandates they admin
            if not isSysAdmin and mandateId not in adminMandateIds:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate")
            mandateUserIds = {str(um.userId) for um in interface.getUserMandatesByMandate(mandateId)}
            users = [u for u in users if str(u.id) in mandateUserIds]
        elif not isSysAdmin:
            # MandateAdmin without mandateId filter: restrict to users in admin's mandates
            allowedUserIds: Set[str] = {
                str(um.userId)
                for mId in adminMandateIds
                for um in interface.getUserMandatesByMandate(mId)
            }
            users = [u for u in users if str(u.id) in allowedUserIds]

        # Format response; labels are resolved once per user and reused for
        # both the optional roleLabel filter and the response payload
        result = []
        for user in users:
            userRoleLabels = _getUserRoleLabels(interface, str(user.id))
            if roleLabel and roleLabel not in userRoleLabels:
                continue
            result.append({
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "fullName": user.fullName,
                "isSysAdmin": user.isSysAdmin,
                "enabled": user.enabled,
                "roleLabels": userRoleLabels,
                "roleCount": len(userRoleLabels)
            })

        return result

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error listing users with roles: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list users with roles: {str(e)}"
        )


@router.get("/{roleId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def get_role(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Get a role by ID.
    Context-aware: SysAdmin sees all. MandateAdmin sees roles from own mandates
    plus template roles (read-only).

    Path Parameters:
    - roleId: Role ID

    Returns:
    - Role dictionary

    Raises:
    - HTTPException 403 if the caller lacks admin rights or the role belongs to
      a mandate they do not administer
    - HTTPException 404 if the role does not exist
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        role = interface.getRole(roleId)
        if not role:
            raise HTTPException(
                status_code=404,
                detail=f"Role {roleId} not found"
            )

        # MandateAdmin: can view template roles (read-only) or own mandate roles
        if not isSysAdmin:
            if role.mandateId is not None and str(role.mandateId) not in adminMandateIds:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role")

        return {
            "id": role.id,
            "roleLabel": role.roleLabel,
            "description": role.description,
            "isSystemRole": role.isSystemRole
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error getting role: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get role: {str(e)}"
        )


@router.put("/{roleId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def update_role(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    role: Role = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Update an existing role.
    Context-aware: SysAdmin can update any role. MandateAdmin can update roles
    within own mandates only. Template roles and sysadmin role are blocked.

    Path Parameters:
    - roleId: Role ID

    Request Body:
    - role: Updated Role object

    Returns:
    - Updated role dictionary

    Raises:
    - HTTPException 403 if the caller lacks admin rights or a MandateAdmin
      targets the sysadmin role, a template role, or a foreign mandate's role
    - HTTPException 404 if the role does not exist
    - HTTPException 400 if the interface rejects the update with a ValueError
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # MandateAdmin restrictions: check existing role before updating
        if not isSysAdmin:
            existingRole = interface.getRole(roleId)
            if not existingRole:
                raise HTTPException(status_code=404, detail=f"Role {roleId} not found")
            if existingRole.roleLabel == "sysadmin":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify sysadmin role")
            if existingRole.mandateId is None:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify template roles")
            if str(existingRole.mandateId) not in adminMandateIds:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role")

        updatedRole = interface.updateRole(roleId, role)
        # The SysAdmin path performs no existence check above; report a missing
        # result as 404 instead of failing on attribute access below.
        if not updatedRole:
            raise HTTPException(status_code=404, detail=f"Role {roleId} not found")

        return {
            "id": updatedRole.id,
            "roleLabel": updatedRole.roleLabel,
            "description": updatedRole.description,
            "isSystemRole": updatedRole.isSystemRole
        }

    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.exception("Error updating role: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update role: {str(e)}"
        )


@router.delete("/{roleId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
def delete_role(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, str]:
    """
    Delete a role.
    Context-aware: SysAdmin can delete any role. MandateAdmin can delete roles
    within own mandates only. Template roles and sysadmin role are blocked.

    Path Parameters:
    - roleId: Role ID

    Returns:
    - Success message

    Raises:
    - HTTPException 403 if the caller lacks admin rights or a MandateAdmin
      targets the sysadmin role, a template role, or a foreign mandate's role
    - HTTPException 404 if the role does not exist or deleteRole reports failure
    - HTTPException 400 if the interface rejects the deletion with a ValueError
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # MandateAdmin restrictions: check existing role before deleting
        if not isSysAdmin:
            existingRole = interface.getRole(roleId)
            if not existingRole:
                raise HTTPException(status_code=404, detail=f"Role {roleId} not found")
            if existingRole.roleLabel == "sysadmin":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete sysadmin role")
            if existingRole.mandateId is None:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete template roles")
            if str(existingRole.mandateId) not in adminMandateIds:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role")

        success = interface.deleteRole(roleId)
        if not success:
            raise HTTPException(
                status_code=404,
                detail=f"Role {roleId} not found"
            )

        return {"message": f"Role {roleId} deleted successfully"}

    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.exception("Error deleting role: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete role: {str(e)}"
        )
|
|
|
|
|
|
@router.get("/users/{userId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def get_user_roles(
    request: Request,
    userId: str = Path(..., description="User ID"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Get role assignments for a specific user.
    Context-aware: SysAdmin sees all. MandateAdmin can view users in own mandates only.

    Path Parameters:
    - userId: User ID

    Returns:
    - User dictionary with role assignments

    Raises:
    - HTTPException 403 if the caller lacks admin rights or shares no mandate
      (as admin) with the user
    - HTTPException 404 if the user does not exist
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get user
        user = interface.getUser(userId)
        if not user:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # MandateAdmin: check user is in one of admin's mandates
        if not isSysAdmin:
            userMandateMandateIds = {str(um.mandateId) for um in interface.getUserMandates(userId)}
            if userMandateMandateIds.isdisjoint(adminMandateIds):
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user")

        userRoleLabels = _getUserRoleLabels(interface, str(user.id))
        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "fullName": user.fullName,
            "isSysAdmin": user.isSysAdmin,
            "enabled": user.enabled,
            "roleLabels": userRoleLabels,
            "roleCount": len(userRoleLabels)
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error getting user roles: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get user roles: {str(e)}"
        )
|
|
|
|
|
|
@router.put("/users/{userId}/roles", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def update_user_roles(
    request: Request,
    userId: str = Path(..., description="User ID"),
    newRoleLabels: List[str] = Body(..., description="List of role labels to assign"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Update role assignments for a specific user.
    Context-aware: SysAdmin can update any user's roles. MandateAdmin can update roles
    for users in own mandates only. Cannot assign sysadmin role.

    The roles of exactly one mandate membership are replaced: for a SysAdmin the
    user's first membership, for a MandateAdmin the user's first membership that
    lies in a mandate the caller administers. Labels that cannot be resolved to a
    role of that mandate are skipped with a warning.

    Path Parameters:
    - userId: User ID

    Request Body:
    - newRoleLabels: List of role labels to assign (e.g., ["admin", "user"])

    Returns:
    - Updated user dictionary with role assignments

    Raises:
    - HTTPException 403 if the caller lacks admin rights, a MandateAdmin tries to
      assign "sysadmin", or the user is in none of the caller's mandates
    - HTTPException 404 if the user does not exist
    - HTTPException 400 if the user has no mandate memberships
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get user
        user = interface.getUser(userId)
        if not user:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # MandateAdmin restrictions
        if not isSysAdmin:
            if "sysadmin" in newRoleLabels:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot assign sysadmin role")

        # Validate role labels (basic validation - check against standard roles);
        # non-standard labels are only logged, not rejected
        standardRoles = {"sysadmin", "admin", "user", "viewer"}
        for roleLabel in newRoleLabels:
            if roleLabel not in standardRoles:
                logger.warning(f"Non-standard role label assigned: {roleLabel}")

        userMandates = interface.getUserMandates(userId)
        if not userMandates:
            raise HTTPException(
                status_code=400,
                detail=f"User {userId} has no mandate memberships. Add to mandate first."
            )

        # Pick the membership to operate on. A MandateAdmin must land on a
        # mandate they administer; looking only at the first membership would
        # wrongly reject users whose first membership is in another mandate.
        if isSysAdmin:
            targetUserMandate = userMandates[0]
        else:
            targetUserMandate = next(
                (um for um in userMandates if str(um.mandateId) in adminMandateIds),
                None
            )
            if targetUserMandate is None:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate")

        userMandateId = str(targetUserMandate.id)
        targetMandateId = str(targetUserMandate.mandateId)

        # Get current roles for this mandate (Pydantic models)
        existingRoles = interface.getUserMandateRoles(userMandateId)
        existingRoleIds = {str(r.roleId) for r in existingRoles}

        # Convert roleLabels to roleIds - use mandate-scoped lookup to get instance roles
        # (prevents assigning template roles instead of mandate-instance roles)
        newRoleIds = set()
        for roleLabel in newRoleLabels:
            role = interface.getRoleByLabelAndScope(roleLabel, mandateId=targetMandateId)
            if not role:
                logger.warning(f"Role '{roleLabel}' not found for mandate {targetMandateId}, skipping")
                continue
            newRoleIds.add(str(role.id))

        # Remove roles that are no longer needed
        for existingRole in existingRoles:
            if str(existingRole.roleId) not in newRoleIds:
                interface.removeRoleFromUserMandate(userMandateId, str(existingRole.roleId))

        # Add new roles through the same interface method add_user_role uses
        for roleId in newRoleIds:
            if roleId not in existingRoleIds:
                interface.addRoleToUserMandate(userMandateId, roleId)

        logger.info("Updated roles for user %s: %s by admin %s", userId, newRoleLabels, context.user.id)

        userRoleLabels = _getUserRoleLabels(interface, userId)
        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "fullName": user.fullName,
            "isSysAdmin": user.isSysAdmin,
            "enabled": user.enabled,
            "roleLabels": userRoleLabels,
            "roleCount": len(userRoleLabels)
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error updating user roles: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update user roles: {str(e)}"
        )
|
|
|
|
|
|
@router.post("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def add_user_role(
    request: Request,
    userId: str = Path(..., description="User ID"),
    roleLabel: str = Path(..., description="Role label to add"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Add a role to a user (if not already assigned).
    Context-aware: SysAdmin can add any role. MandateAdmin can add roles to users
    in own mandates only. Cannot assign sysadmin role.

    The role is added to exactly one mandate membership: for a SysAdmin the
    user's first membership, for a MandateAdmin the user's first membership that
    lies in a mandate the caller administers.

    Path Parameters:
    - userId: User ID
    - roleLabel: Role label to add

    Returns:
    - Updated user dictionary with role assignments

    Raises:
    - HTTPException 403 if the caller lacks admin rights, a MandateAdmin tries to
      assign "sysadmin", or the user is in none of the caller's mandates
    - HTTPException 404 if the user or the mandate-scoped role does not exist
    - HTTPException 400 if the user has no mandate memberships
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get user
        user = interface.getUser(userId)
        if not user:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # MandateAdmin restrictions
        if not isSysAdmin:
            if roleLabel == "sysadmin":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot assign sysadmin role")

        userMandates = interface.getUserMandates(userId)
        if not userMandates:
            raise HTTPException(
                status_code=400,
                detail=f"User {userId} has no mandate memberships. Add to mandate first."
            )

        # Pick the membership to operate on. A MandateAdmin must land on a
        # mandate they administer; looking only at the first membership would
        # wrongly reject users whose first membership is in another mandate.
        if isSysAdmin:
            targetUserMandate = userMandates[0]
        else:
            targetUserMandate = next(
                (um for um in userMandates if str(um.mandateId) in adminMandateIds),
                None
            )
            if targetUserMandate is None:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate")

        userMandateId = str(targetUserMandate.id)
        targetMandateId = str(targetUserMandate.mandateId)

        # Get role by label - use mandate-scoped lookup to get instance role
        # (prevents assigning template roles instead of mandate-instance roles)
        role = interface.getRoleByLabelAndScope(roleLabel, mandateId=targetMandateId)
        if not role:
            raise HTTPException(
                status_code=404,
                detail=f"Role '{roleLabel}' not found for mandate {targetMandateId}"
            )

        # Check if role is already assigned - use interface method
        existingRoles = interface.getUserMandateRoles(userMandateId)
        roleAlreadyAssigned = any(str(r.roleId) == str(role.id) for r in existingRoles)

        if not roleAlreadyAssigned:
            # Add the role via interface method
            interface.addRoleToUserMandate(userMandateId, str(role.id))
            logger.info("Added role %s to user %s by admin %s", roleLabel, userId, context.user.id)

        userRoleLabels = _getUserRoleLabels(interface, userId)
        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "fullName": user.fullName,
            "isSysAdmin": user.isSysAdmin,
            "enabled": user.enabled,
            "roleLabels": userRoleLabels,
            "roleCount": len(userRoleLabels)
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error adding role to user: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to add role to user: {str(e)}"
        )
|
|
|
|
|
|
@router.delete("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def remove_user_role(
    request: Request,
    userId: str = Path(..., description="User ID"),
    roleLabel: str = Path(..., description="Role label to remove"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
    """
    Remove a role from a user.
    Context-aware: SysAdmin can remove any role. MandateAdmin can remove roles from
    users in own mandates only. Cannot remove sysadmin role.

    For every membership in scope, both the role returned by the unscoped
    getRoleByLabel lookup and the mandate-scoped role with the same label are
    removed. The scoped removal mirrors add_user_role, which assigns the
    mandate-scoped role.

    Path Parameters:
    - userId: User ID
    - roleLabel: Role label to remove

    Returns:
    - Updated user dictionary with role assignments

    Raises:
    - HTTPException 403 if the caller lacks admin rights, a MandateAdmin targets
      "sysadmin", or the user is in none of the caller's mandates
    - HTTPException 404 if the user does not exist or getRoleByLabel finds no role
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get user
        user = interface.getUser(userId)
        if not user:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # MandateAdmin restrictions
        if not isSysAdmin:
            if roleLabel == "sysadmin":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot remove sysadmin role")

        # Get role by label
        role = interface.getRoleByLabel(roleLabel)
        if not role:
            raise HTTPException(
                status_code=404,
                detail=f"Role '{roleLabel}' not found"
            )

        userMandates = interface.getUserMandates(userId)

        # MandateAdmin: check user's mandates overlap with admin's mandates
        if not isSysAdmin:
            userMandateMandateIds = {str(um.mandateId) for um in userMandates}
            if userMandateMandateIds.isdisjoint(adminMandateIds):
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user")

        roleRemoved = False

        for um in userMandates:
            # MandateAdmin: only remove from mandates they admin
            if not isSysAdmin and str(um.mandateId) not in adminMandateIds:
                continue

            userMandateId = str(um.id)

            # Role IDs to remove from this membership: the unscoped match plus
            # the mandate-scoped role carrying the same label (if different)
            candidateRoleIds = [str(role.id)]
            scopedRole = interface.getRoleByLabelAndScope(roleLabel, mandateId=str(um.mandateId))
            if scopedRole and str(scopedRole.id) not in candidateRoleIds:
                candidateRoleIds.append(str(scopedRole.id))

            for candidateRoleId in candidateRoleIds:
                if interface.removeRoleFromUserMandate(userMandateId, candidateRoleId):
                    roleRemoved = True

        if roleRemoved:
            logger.info("Removed role %s from user %s by admin %s", roleLabel, userId, context.user.id)

        userRoleLabels = _getUserRoleLabels(interface, userId)
        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "fullName": user.fullName,
            "isSysAdmin": user.isSysAdmin,
            "enabled": user.enabled,
            "roleLabels": userRoleLabels,
            "roleCount": len(userRoleLabels)
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error removing role from user: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to remove role from user: {str(e)}"
        )
|
|
|
|
|
|
@router.get("/roles/{roleLabel}/users", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def get_users_with_role(
    request: Request,
    roleLabel: str = Path(..., description="Role label"),
    mandateId: Optional[str] = Query(None, description="Filter by mandate ID (via UserMandate)"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Get all users with a specific role.
    Context-aware: SysAdmin sees all. MandateAdmin sees users from own mandates only.

    Path Parameters:
    - roleLabel: Role label

    Query Parameters:
    - mandateId: Optional filter by mandate ID (via UserMandate table)

    Returns:
    - List of users with the specified role, ordered by user ID

    Raises:
    - HTTPException 403 if the caller lacks admin rights
    - HTTPException 404 if getRoleByLabel finds no role for the label
    - HTTPException 500 on unexpected errors
    """
    isSysAdmin = context.hasSysAdminRole
    adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context)
    if not isSysAdmin and not adminMandateIds:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required")

    try:
        interface = getRootInterface()

        # Get role by label
        # NOTE(review): this lookup is not mandate-scoped, so only assignments of
        # the single role it returns are considered - confirm whether
        # mandate-instance roles with the same label should be included too.
        role = interface.getRoleByLabel(roleLabel)
        if not role:
            raise HTTPException(
                status_code=404,
                detail=f"Role '{roleLabel}' not found"
            )

        # Get all UserMandateRole assignments for this role (Pydantic models)
        roleAssignments = interface.getUserMandateRolesByRole(str(role.id))

        # Get unique userMandateIds
        userMandateIds = {str(ra.userMandateId) for ra in roleAssignments}

        # Get userIds from UserMandate records
        userIds: Set[str] = set()
        for userMandateId in userMandateIds:
            um = interface.getUserMandateById(userMandateId)
            if not um:
                continue
            # Filter by mandate if specified
            if mandateId and str(um.mandateId) != mandateId:
                continue
            # MandateAdmin: filter to own mandates
            if not isSysAdmin and str(um.mandateId) not in adminMandateIds:
                continue
            userIds.add(str(um.userId))

        # Get users and format response; sorted so the response order is stable
        result = []
        for userId in sorted(userIds):
            user = interface.getUser(userId)
            if user:
                userRoleLabels = _getUserRoleLabels(interface, userId)
                result.append({
                    "id": user.id,
                    "username": user.username,
                    "email": user.email,
                    "fullName": user.fullName,
                    "isSysAdmin": user.isSysAdmin,
                    "enabled": user.enabled,
                    "roleLabels": userRoleLabels,
                    "roleCount": len(userRoleLabels)
                })

        return result

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error getting users with role: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get users with role: {str(e)}"
        )
|