# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Admin RBAC Roles Management routes. Provides endpoints for managing roles and role assignments to users. MULTI-TENANT: Context-aware access control. - SysAdmin: Full access to all roles and assignments across all mandates. - MandateAdmin: Can manage roles and assignments within their own mandates. Template roles (mandateId=None, isSystemRole=True) are read-only. The sysadmin role (roleLabel="sysadmin") is not manageable by MandateAdmins. Role assignments are managed via UserMandateRole (not User.roleLabels). """ from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request, status from typing import List, Dict, Any, Optional, Set import logging from modules.auth import limiter, requireSysAdminRole, getRequestContext, RequestContext from modules.datamodels.datamodelUam import User, UserInDB from modules.datamodels.datamodelRbac import Role from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole from modules.interfaces.interfaceDbApp import getInterface, getRootInterface # Configure logger logger = logging.getLogger(__name__) def _getUserRoleLabels(interface, userId: str) -> List[str]: """ Get role labels for a user from UserMandateRole (across all mandates). Args: interface: Database interface userId: User ID Returns: List of role labels """ roleLabels: Set[str] = set() # Get all UserMandate records for this user (Pydantic models) userMandates = interface.getUserMandates(userId) for um in userMandates: # Get all UserMandateRole records for this membership (Pydantic models) userMandateRoles = interface.getUserMandateRoles(str(um.id)) for umr in userMandateRoles: if umr.roleId: # Get role by ID to get roleLabel role = interface.getRole(str(umr.roleId)) if role: roleLabels.add(role.roleLabel) return list(roleLabels) def _hasRoleLabel(interface, userId: str, roleLabel: str) -> bool: """ Check if user has a specific role label (across all mandates). """ return roleLabel in _getUserRoleLabels(interface, userId) def _getAdminMandateIds(context: RequestContext) -> List[str]: """Get mandate IDs where the user has admin role.""" mandateIds = [] try: rootInterface = getRootInterface() userMandates = rootInterface.getUserMandates(str(context.user.id)) for um in userMandates: if not getattr(um, 'enabled', True): continue umId = getattr(um, 'id', None) mandateId = getattr(um, 'mandateId', None) if not umId or not mandateId: continue roleIds = rootInterface.getRoleIdsForUserMandate(str(umId)) for roleId in roleIds: role = rootInterface.getRole(roleId) if role and role.roleLabel == "admin" and not role.featureInstanceId: mandateIds.append(str(mandateId)) break except Exception as e: logger.error(f"Error getting admin mandate IDs: {e}") return mandateIds router = APIRouter( prefix="/api/admin/rbac/roles", tags=["Admin RBAC Roles"], responses={404: {"description": "Not found"}} ) @router.get("/", response_model=List[Dict[str, Any]]) @limiter.limit("60/minute") def list_roles( request: Request, mandateId: Optional[str] = Query(None, description="Filter roles by mandate ID"), context: RequestContext = Depends(getRequestContext) ) -> List[Dict[str, Any]]: """ Get list of roles with metadata. Context-aware: SysAdmin sees all roles. MandateAdmin sees roles from own mandates plus template roles (read-only). Without mandateId: returns system template roles (mandateId=NULL). With mandateId: returns mandate-level roles for that mandate (featureInstanceId=NULL). """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get roles filtered by scope print(f"[DEBUG list_roles] mandateId={mandateId}") if mandateId: # MandateAdmin can only query mandates they admin if not isSysAdmin and mandateId not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") # Mandate-specific roles (mandate-level only, no feature-instance roles) dbRoles = interface.getRolesForMandate(mandateId) print(f"[DEBUG list_roles] getRolesForMandate returned {len(dbRoles)} roles") else: # System template roles only dbRoles = interface.getAllRoles() print(f"[DEBUG list_roles] getAllRoles returned {len(dbRoles)} roles") # MandateAdmin: filter to template roles + roles from own mandates if not isSysAdmin: dbRoles = [r for r in dbRoles if r.mandateId is None or str(r.mandateId) in adminMandateIds] # Count role assignments from UserMandateRole table roleCounts = interface.countRoleAssignments() # Convert Role objects to dictionaries and add user counts result = [] for role in dbRoles: result.append({ "id": role.id, "roleLabel": role.roleLabel, "description": role.description, "mandateId": role.mandateId, "featureInstanceId": role.featureInstanceId, "userCount": roleCounts.get(str(role.id), 0), "isSystemRole": role.isSystemRole }) return result except HTTPException: raise except Exception as e: logger.error(f"Error listing roles: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to list roles: {str(e)}" ) @router.get("/options", response_model=List[Dict[str, Any]]) @limiter.limit("60/minute") def get_role_options( request: Request, context: RequestContext = Depends(getRequestContext) ) -> List[Dict[str, Any]]: """ Get role options for select dropdowns. Context-aware: SysAdmin sees all roles. MandateAdmin sees roles from own mandates plus template roles. Returns: - List of role option dictionaries with value and label """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get all roles from database dbRoles = interface.getAllRoles() # MandateAdmin: filter to template roles + roles from own mandates if not isSysAdmin: dbRoles = [r for r in dbRoles if r.mandateId is None or str(r.mandateId) in adminMandateIds] # Convert to options format options = [] for role in dbRoles: # Use English description as label, fallback to roleLabel label = role.description.get("en", role.roleLabel) if isinstance(role.description, dict) else role.roleLabel options.append({ "value": role.roleLabel, "label": label }) return options except HTTPException: raise except Exception as e: logger.error(f"Error getting role options: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to get role options: {str(e)}" ) @router.post("/", response_model=Dict[str, Any]) @limiter.limit("30/minute") def create_role( request: Request, role: Role = Body(...), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Create a new role. Context-aware: SysAdmin can create any role. MandateAdmin can create roles within own mandates only (not template or sysadmin roles). Request Body: - role: Role object to create Returns: - Created role dictionary """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # MandateAdmin restrictions if not isSysAdmin: if role.roleLabel == "sysadmin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create sysadmin role") if role.mandateId is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot create template roles") if str(role.mandateId) not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") createdRole = interface.createRole(role) return { "id": createdRole.id, "roleLabel": createdRole.roleLabel, "description": createdRole.description, "isSystemRole": createdRole.isSystemRole } except HTTPException: raise except ValueError as e: raise HTTPException( status_code=400, detail=str(e) ) except Exception as e: logger.error(f"Error creating role: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to create role: {str(e)}" ) @router.get("/{roleId}", response_model=Dict[str, Any]) @limiter.limit("60/minute") def get_role( request: Request, roleId: str = Path(..., description="Role ID"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Get a role by ID. Context-aware: SysAdmin sees all. MandateAdmin sees roles from own mandates plus template roles (read-only). Path Parameters: - roleId: Role ID Returns: - Role dictionary """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() role = interface.getRole(roleId) if not role: raise HTTPException( status_code=404, detail=f"Role {roleId} not found" ) # MandateAdmin: can view template roles (read-only) or own mandate roles if not isSysAdmin: if role.mandateId is not None and str(role.mandateId) not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role") return { "id": role.id, "roleLabel": role.roleLabel, "description": role.description, "isSystemRole": role.isSystemRole } except HTTPException: raise except Exception as e: logger.error(f"Error getting role: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to get role: {str(e)}" ) @router.put("/{roleId}", response_model=Dict[str, Any]) @limiter.limit("30/minute") def update_role( request: Request, roleId: str = Path(..., description="Role ID"), role: Role = Body(...), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Update an existing role. Context-aware: SysAdmin can update any role. MandateAdmin can update roles within own mandates only. Template roles and sysadmin role are blocked. Path Parameters: - roleId: Role ID Request Body: - role: Updated Role object Returns: - Updated role dictionary """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # MandateAdmin restrictions: check existing role before updating if not isSysAdmin: existingRole = interface.getRole(roleId) if not existingRole: raise HTTPException(status_code=404, detail=f"Role {roleId} not found") if existingRole.roleLabel == "sysadmin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify sysadmin role") if existingRole.mandateId is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot modify template roles") if str(existingRole.mandateId) not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role") updatedRole = interface.updateRole(roleId, role) return { "id": updatedRole.id, "roleLabel": updatedRole.roleLabel, "description": updatedRole.description, "isSystemRole": updatedRole.isSystemRole } except HTTPException: raise except ValueError as e: raise HTTPException( status_code=400, detail=str(e) ) except Exception as e: logger.error(f"Error updating role: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to update role: {str(e)}" ) @router.delete("/{roleId}", response_model=Dict[str, str]) @limiter.limit("30/minute") def delete_role( request: Request, roleId: str = Path(..., description="Role ID"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, str]: """ Delete a role. Context-aware: SysAdmin can delete any role. MandateAdmin can delete roles within own mandates only. Template roles and sysadmin role are blocked. Path Parameters: - roleId: Role ID Returns: - Success message """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # MandateAdmin restrictions: check existing role before deleting if not isSysAdmin: existingRole = interface.getRole(roleId) if not existingRole: raise HTTPException(status_code=404, detail=f"Role {roleId} not found") if existingRole.roleLabel == "sysadmin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete sysadmin role") if existingRole.mandateId is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete template roles") if str(existingRole.mandateId) not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this role") success = interface.deleteRole(roleId) if not success: raise HTTPException( status_code=404, detail=f"Role {roleId} not found" ) return {"message": f"Role {roleId} deleted successfully"} except HTTPException: raise except ValueError as e: raise HTTPException( status_code=400, detail=str(e) ) except Exception as e: logger.error(f"Error deleting role: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to delete role: {str(e)}" ) @router.get("/users", response_model=List[Dict[str, Any]]) @limiter.limit("60/minute") def list_users_with_roles( request: Request, roleLabel: Optional[str] = Query(None, description="Filter by role label"), mandateId: Optional[str] = Query(None, description="Filter by mandate ID (via UserMandate)"), context: RequestContext = Depends(getRequestContext) ) -> List[Dict[str, Any]]: """ Get list of users with their role assignments. Context-aware: SysAdmin sees all users. MandateAdmin sees users from own mandates only. Query Parameters: - roleLabel: Optional filter by role label - mandateId: Optional filter by mandate ID (via UserMandate table) Returns: - List of user dictionaries with role assignments """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get all users via interface method (Pydantic models) users = interface.getAllUsers() # Filter by mandate if specified (via UserMandate table) if mandateId: # MandateAdmin can only query mandates they admin if not isSysAdmin and mandateId not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") userMandates = interface.getUserMandatesByMandate(mandateId) mandateUserIds = {str(um.userId) for um in userMandates} users = [u for u in users if str(u.id) in mandateUserIds] elif not isSysAdmin: # MandateAdmin without mandateId filter: restrict to users in admin's mandates allowedUserIds: Set[str] = set() for mId in adminMandateIds: userMandates = interface.getUserMandatesByMandate(mId) for um in userMandates: allowedUserIds.add(str(um.userId)) users = [u for u in users if str(u.id) in allowedUserIds] # Filter by role if specified (via UserMandateRole) if roleLabel: users = [u for u in users if _hasRoleLabel(interface, str(u.id), roleLabel)] # Format response result = [] for user in users: userRoleLabels = _getUserRoleLabels(interface, str(user.id)) result.append({ "id": user.id, "username": user.username, "email": user.email, "fullName": user.fullName, "isSysAdmin": user.isSysAdmin, "enabled": user.enabled, "roleLabels": userRoleLabels, "roleCount": len(userRoleLabels) }) return result except HTTPException: raise except Exception as e: logger.error(f"Error listing users with roles: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to list users with roles: {str(e)}" ) @router.get("/users/{userId}", response_model=Dict[str, Any]) @limiter.limit("60/minute") def get_user_roles( request: Request, userId: str = Path(..., description="User ID"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Get role assignments for a specific user. Context-aware: SysAdmin sees all. MandateAdmin can view users in own mandates only. Path Parameters: - userId: User ID Returns: - User dictionary with role assignments """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get user user = interface.getUser(userId) if not user: raise HTTPException( status_code=404, detail=f"User {userId} not found" ) # MandateAdmin: check user is in one of admin's mandates if not isSysAdmin: userMandates = interface.getUserMandates(userId) userMandateMandateIds = {str(um.mandateId) for um in userMandates} if not userMandateMandateIds.intersection(adminMandateIds): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user") userRoleLabels = _getUserRoleLabels(interface, str(user.id)) return { "id": user.id, "username": user.username, "email": user.email, "fullName": user.fullName, "isSysAdmin": user.isSysAdmin, "enabled": user.enabled, "roleLabels": userRoleLabels, "roleCount": len(userRoleLabels) } except HTTPException: raise except Exception as e: logger.error(f"Error getting user roles: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to get user roles: {str(e)}" ) @router.put("/users/{userId}/roles", response_model=Dict[str, Any]) @limiter.limit("30/minute") def update_user_roles( request: Request, userId: str = Path(..., description="User ID"), newRoleLabels: List[str] = Body(..., description="List of role labels to assign"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Update role assignments for a specific user. Context-aware: SysAdmin can update any user's roles. MandateAdmin can update roles for users in own mandates only. Cannot assign sysadmin role. Path Parameters: - userId: User ID Request Body: - newRoleLabels: List of role labels to assign (e.g., ["admin", "user"]) Returns: - Updated user dictionary with role assignments """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get user user = interface.getUser(userId) if not user: raise HTTPException( status_code=404, detail=f"User {userId} not found" ) # MandateAdmin restrictions if not isSysAdmin: if "sysadmin" in newRoleLabels: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot assign sysadmin role") # Validate role labels (basic validation - check against standard roles) standardRoles = ["sysadmin", "admin", "user", "viewer"] for roleLabel in newRoleLabels: if roleLabel not in standardRoles: logger.warning(f"Non-standard role label assigned: {roleLabel}") # Get user's first mandate (for role assignment) userMandates = interface.getUserMandates(userId) if not userMandates: raise HTTPException( status_code=400, detail=f"User {userId} has no mandate memberships. Add to mandate first." ) userMandateId = str(userMandates[0].id) targetMandateId = str(userMandates[0].mandateId) # MandateAdmin: check target mandate belongs to admin's mandates if not isSysAdmin: if targetMandateId not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") # Get current roles for this mandate (Pydantic models) existingRoles = interface.getUserMandateRoles(userMandateId) existingRoleIds = {str(r.roleId) for r in existingRoles} # Convert roleLabels to roleIds - use mandate-scoped lookup to get instance roles # (prevents assigning template roles instead of mandate-instance roles) newRoleIds = set() for roleLabel in newRoleLabels: role = interface.getRoleByLabelAndScope(roleLabel, mandateId=targetMandateId) if not role: logger.warning(f"Role '{roleLabel}' not found for mandate {targetMandateId}, skipping") continue newRoleIds.add(str(role.id)) # Remove roles that are no longer needed for existingRole in existingRoles: if str(existingRole.roleId) not in newRoleIds: interface.removeRoleFromUserMandate(userMandateId, str(existingRole.roleId)) # Add new roles for roleId in newRoleIds: if roleId not in existingRoleIds: newRole = UserMandateRole(userMandateId=userMandateId, roleId=roleId) interface.db.recordCreate(UserMandateRole, newRole.model_dump()) logger.info(f"Updated roles for user {userId}: {newRoleLabels} by admin {currentUser.id}") userRoleLabels = _getUserRoleLabels(interface, userId) return { "id": user.id, "username": user.username, "email": user.email, "fullName": user.fullName, "isSysAdmin": user.isSysAdmin, "enabled": user.enabled, "roleLabels": userRoleLabels, "roleCount": len(userRoleLabels) } except HTTPException: raise except Exception as e: logger.error(f"Error updating user roles: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to update user roles: {str(e)}" ) @router.post("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any]) @limiter.limit("30/minute") def add_user_role( request: Request, userId: str = Path(..., description="User ID"), roleLabel: str = Path(..., description="Role label to add"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Add a role to a user (if not already assigned). Context-aware: SysAdmin can add any role. MandateAdmin can add roles to users in own mandates only. Cannot assign sysadmin role. Path Parameters: - userId: User ID - roleLabel: Role label to add Returns: - Updated user dictionary with role assignments """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get user user = interface.getUser(userId) if not user: raise HTTPException( status_code=404, detail=f"User {userId} not found" ) # MandateAdmin restrictions if not isSysAdmin: if roleLabel == "sysadmin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot assign sysadmin role") # Get user's first mandate userMandates = interface.getUserMandates(userId) if not userMandates: raise HTTPException( status_code=400, detail=f"User {userId} has no mandate memberships. Add to mandate first." ) userMandateId = str(userMandates[0].id) targetMandateId = str(userMandates[0].mandateId) # MandateAdmin: check target mandate belongs to admin's mandates if not isSysAdmin: if targetMandateId not in adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this mandate") # Get role by label - use mandate-scoped lookup to get instance role # (prevents assigning template roles instead of mandate-instance roles) role = interface.getRoleByLabelAndScope(roleLabel, mandateId=targetMandateId) if not role: raise HTTPException( status_code=404, detail=f"Role '{roleLabel}' not found for mandate {targetMandateId}" ) # Check if role is already assigned - use interface method existingRoles = interface.getUserMandateRoles(userMandateId) roleAlreadyAssigned = any(str(r.roleId) == str(role.id) for r in existingRoles) if not roleAlreadyAssigned: # Add the role via interface method interface.addRoleToUserMandate(userMandateId, str(role.id)) logger.info(f"Added role {roleLabel} to user {userId} by admin {currentUser.id}") userRoleLabels = _getUserRoleLabels(interface, userId) return { "id": user.id, "username": user.username, "email": user.email, "fullName": user.fullName, "isSysAdmin": user.isSysAdmin, "enabled": user.enabled, "roleLabels": userRoleLabels, "roleCount": len(userRoleLabels) } except HTTPException: raise except Exception as e: logger.error(f"Error adding role to user: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to add role to user: {str(e)}" ) @router.delete("/users/{userId}/roles/{roleLabel}", response_model=Dict[str, Any]) @limiter.limit("30/minute") def remove_user_role( request: Request, userId: str = Path(..., description="User ID"), roleLabel: str = Path(..., description="Role label to remove"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Remove a role from a user. Context-aware: SysAdmin can remove any role. MandateAdmin can remove roles from users in own mandates only. Cannot remove sysadmin role. Path Parameters: - userId: User ID - roleLabel: Role label to remove Returns: - Updated user dictionary with role assignments """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get user user = interface.getUser(userId) if not user: raise HTTPException( status_code=404, detail=f"User {userId} not found" ) # MandateAdmin restrictions if not isSysAdmin: if roleLabel == "sysadmin": raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot remove sysadmin role") # Get role by label role = interface.getRoleByLabel(roleLabel) if not role: raise HTTPException( status_code=404, detail=f"Role '{roleLabel}' not found" ) # Remove role from user's mandates userMandates = interface.getUserMandates(userId) # MandateAdmin: check user's mandates overlap with admin's mandates if not isSysAdmin: userMandateMandateIds = {str(um.mandateId) for um in userMandates} if not userMandateMandateIds.intersection(set(adminMandateIds)): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this user") roleRemoved = False for um in userMandates: userMandateId = str(um.id) # MandateAdmin: only remove from mandates they admin if not isSysAdmin and str(um.mandateId) not in adminMandateIds: continue # Remove role via interface method if interface.removeRoleFromUserMandate(userMandateId, str(role.id)): roleRemoved = True if roleRemoved: logger.info(f"Removed role {roleLabel} from user {userId} by admin {currentUser.id}") userRoleLabels = _getUserRoleLabels(interface, userId) return { "id": user.id, "username": user.username, "email": user.email, "fullName": user.fullName, "isSysAdmin": user.isSysAdmin, "enabled": user.enabled, "roleLabels": userRoleLabels, "roleCount": len(userRoleLabels) } except HTTPException: raise except Exception as e: logger.error(f"Error removing role from user: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to remove role from user: {str(e)}" ) @router.get("/roles/{roleLabel}/users", response_model=List[Dict[str, Any]]) @limiter.limit("60/minute") def get_users_with_role( request: Request, roleLabel: str = Path(..., description="Role label"), mandateId: Optional[str] = Query(None, description="Filter by mandate ID (via UserMandate)"), context: RequestContext = Depends(getRequestContext) ) -> List[Dict[str, Any]]: """ Get all users with a specific role. Context-aware: SysAdmin sees all. MandateAdmin sees users from own mandates only. Path Parameters: - roleLabel: Role label Query Parameters: - mandateId: Optional filter by mandate ID (via UserMandate table) Returns: - List of users with the specified role """ isSysAdmin = context.hasSysAdminRole adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(context) if not isSysAdmin and not adminMandateIds: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin role required") currentUser = context.user # backward compat for existing code try: interface = getRootInterface() # Get role by label role = interface.getRoleByLabel(roleLabel) if not role: raise HTTPException( status_code=404, detail=f"Role '{roleLabel}' not found" ) # Get all UserMandateRole assignments for this role (Pydantic models) roleAssignments = interface.getUserMandateRolesByRole(str(role.id)) # Get unique userMandateIds userMandateIds = {str(ra.userMandateId) for ra in roleAssignments} # Get userIds from UserMandate records userIds: Set[str] = set() for userMandateId in userMandateIds: um = interface.getUserMandateById(userMandateId) if um: # Filter by mandate if specified if mandateId and str(um.mandateId) != mandateId: continue # MandateAdmin: filter to own mandates if not isSysAdmin and str(um.mandateId) not in adminMandateIds: continue userIds.add(str(um.userId)) # Get users and format response result = [] for userId in userIds: user = interface.getUser(userId) if user: userRoleLabels = _getUserRoleLabels(interface, userId) result.append({ "id": user.id, "username": user.username, "email": user.email, "fullName": user.fullName, "isSysAdmin": user.isSysAdmin, "enabled": user.enabled, "roleLabels": userRoleLabels, "roleCount": len(userRoleLabels) }) return result except HTTPException: raise except Exception as e: logger.error(f"Error getting users with role: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to get users with role: {str(e)}" )