gateway/modules/routes/routeRbac.py
2025-12-07 23:51:05 +01:00

781 lines
24 KiB
Python

"""
RBAC routes for the backend API.
Implements endpoints for role-based access control permissions.
"""
from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request
from typing import Optional, List, Dict, Any
import logging
from modules.security.auth import getCurrentUser, limiter
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
from modules.datamodels.datamodelRbac import AccessRuleContext, AccessRule, Role
from modules.interfaces.interfaceDbAppObjects import getInterface
# Module-level logger, named after this module via __name__; used by every
# endpoint below to report failures and rule changes.
logger = logging.getLogger(__name__)
# Router object that the endpoints in this file register on through the
# @router.<method> decorators. It is configured with the "/api/rbac" prefix,
# the "RBAC" tag and a documented 404 ("Not found") response entry.
router = APIRouter(
    prefix="/api/rbac",
    tags=["RBAC"],
    responses={404: {"description": "Not found"}}
)
@router.get("/permissions", response_model=UserPermissions)
@limiter.limit("60/minute")
async def getPermissions(
    request: Request,
    context: str = Query(..., description="Context type: DATA, UI, or RESOURCE"),
    item: Optional[str] = Query(None, description="Item identifier (table name, UI path, or resource path)"),
    currentUser: User = Depends(getCurrentUser)
) -> UserPermissions:
    """
    Resolve the calling user's RBAC permissions for one context/item pair.

    Query Parameters:
    - context: Context type (DATA, UI, or RESOURCE); matched case-insensitively
    - item: Optional item identifier. For DATA: table name (e.g., "UserInDB"),
      for UI: cascading string (e.g., "playground.voice.settings"),
      for RESOURCE: cascading string (e.g., "ai.model.anthropic").
      A missing item is passed on as an empty string.

    Returns:
    - Whatever the RBAC interface's getUserPermissions returns for the user
      (declared as UserPermissions)

    Raises:
    - HTTPException 400 if the context is not a valid AccessRuleContext value
    - HTTPException 500 if the RBAC interface is unavailable or the lookup fails

    Examples:
    - GET /api/rbac/permissions?context=DATA&item=UserInDB
    - GET /api/rbac/permissions?context=UI&item=playground.voice.settings
    - GET /api/rbac/permissions?context=RESOURCE&item=ai.model.anthropic
    """
    try:
        # Translate the raw query value into the context enum before touching
        # the interface, so a bad context is rejected first.
        try:
            resolvedContext = AccessRuleContext(context.upper())
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
            )

        rbac = getInterface(currentUser).rbac
        if not rbac:
            raise HTTPException(
                status_code=500,
                detail="RBAC interface not available"
            )

        # An absent item is forwarded as the empty string
        itemKey = item or ""
        return rbac.getUserPermissions(currentUser, resolvedContext, itemKey)
    except HTTPException:
        # Deliberate HTTP errors pass through untouched
        raise
    except Exception as e:
        logger.error(f"Error getting RBAC permissions: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get permissions: {str(e)}"
        )
@router.get("/rules", response_model=list)
@limiter.limit("30/minute")
async def getAccessRules(
    request: Request,
    roleLabel: Optional[str] = Query(None, description="Filter by role label"),
    context: Optional[str] = Query(None, description="Filter by context (DATA, UI, RESOURCE)"),
    item: Optional[str] = Query(None, description="Filter by item identifier"),
    currentUser: User = Depends(getCurrentUser)
) -> list:
    """
    List access rules, optionally narrowed by role label, context and item.

    The caller needs view permission and a read level other than NONE on the
    "AccessRule" item in the DATA context.

    Query Parameters:
    - roleLabel: Optional role label filter
    - context: Optional context filter (DATA, UI, RESOURCE), case-insensitive
    - item: Optional item filter

    Returns:
    - List of access rules, each serialized with model_dump()

    Raises:
    - HTTPException 400 if the context filter is not a valid context
    - HTTPException 403 if the caller may not view access rules
    - HTTPException 500 if the RBAC interface is unavailable or the query fails
    """
    try:
        interface = getInterface(currentUser)
        rbac = interface.rbac
        if not rbac:
            raise HTTPException(
                status_code=500,
                detail="RBAC interface not available"
            )

        # Authorization: the caller's own permissions on the AccessRule item
        rulePermissions = rbac.getUserPermissions(
            currentUser,
            AccessRuleContext.DATA,
            "AccessRule"
        )
        mayView = rulePermissions.view and rulePermissions.read != AccessLevel.NONE
        if not mayView:
            raise HTTPException(
                status_code=403,
                detail="No permission to view access rules"
            )

        # An empty or missing context means "no context filter"
        contextFilter = None
        if context:
            try:
                contextFilter = AccessRuleContext(context.upper())
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
                )

        matchingRules = interface.getAccessRules(
            roleLabel=roleLabel,
            context=contextFilter,
            item=item
        )

        # Plain dicts so the response can be serialized as JSON
        serializedRules = []
        for rule in matchingRules:
            serializedRules.append(rule.model_dump())
        return serializedRules
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting access rules: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get access rules: {str(e)}"
        )
@router.get("/rules/{ruleId}", response_model=dict)
@limiter.limit("30/minute")
async def getAccessRule(
    request: Request,
    ruleId: str = Path(..., description="Access rule ID"),
    currentUser: User = Depends(getCurrentUser)
) -> dict:
    """
    Fetch a single access rule by its ID.

    The caller needs view permission and a read level other than NONE on the
    "AccessRule" item in the DATA context.

    Path Parameters:
    - ruleId: Access rule ID

    Returns:
    - The access rule serialized with model_dump()

    Raises:
    - HTTPException 403 if the caller may not view access rules
    - HTTPException 404 if no rule with this ID is found
    - HTTPException 500 if the RBAC interface is unavailable or the lookup fails
    """
    try:
        interface = getInterface(currentUser)
        rbac = interface.rbac
        if not rbac:
            raise HTTPException(
                status_code=500,
                detail="RBAC interface not available"
            )

        # Authorization: the caller's own permissions on the AccessRule item
        rulePermissions = rbac.getUserPermissions(
            currentUser,
            AccessRuleContext.DATA,
            "AccessRule"
        )
        mayView = rulePermissions.view and rulePermissions.read != AccessLevel.NONE
        if not mayView:
            raise HTTPException(
                status_code=403,
                detail="No permission to view access rules"
            )

        foundRule = interface.getAccessRule(ruleId)
        if foundRule:
            # Plain dict so the response can be serialized as JSON
            return foundRule.model_dump()

        raise HTTPException(
            status_code=404,
            detail=f"Access rule {ruleId} not found"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting access rule {ruleId}: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get access rule: {str(e)}"
        )
@router.post("/rules", response_model=dict)
@limiter.limit("30/minute")
async def createAccessRule(
    request: Request,
    accessRuleData: dict = Body(..., description="Access rule data"),
    currentUser: User = Depends(getCurrentUser)
) -> dict:
    """
    Create a new access rule from the request body.

    The caller needs a create level that is truthy and not NONE on the
    "AccessRule" item in the DATA context.

    Request Body:
    - AccessRule object data (roleLabel, context, item, view, read, create, update, delete).
      A string "context" is upper-cased and converted to AccessRuleContext;
      string values of read/create/update/delete are converted to AccessLevel.
      The conversions are written back into the received dict.

    Returns:
    - The created access rule serialized with model_dump()

    Raises:
    - HTTPException 400 if the data raises ValueError while being parsed
    - HTTPException 403 if the caller may not create access rules
    - HTTPException 500 if the RBAC interface is unavailable or creation fails
    """
    try:
        interface = getInterface(currentUser)
        rbac = interface.rbac
        if not rbac:
            raise HTTPException(
                status_code=500,
                detail="RBAC interface not available"
            )

        # Authorization: the caller's own permissions on the AccessRule item
        rulePermissions = rbac.getUserPermissions(
            currentUser,
            AccessRuleContext.DATA,
            "AccessRule"
        )
        createLevel = rulePermissions.create
        if not createLevel or createLevel == AccessLevel.NONE:
            raise HTTPException(
                status_code=403,
                detail="No permission to create access rules"
            )

        try:
            # Strings are converted to their enum counterparts; other values
            # are left as they are for the model to validate.
            rawContext = accessRuleData.get("context")
            if isinstance(rawContext, str):
                accessRuleData["context"] = AccessRuleContext(rawContext.upper())

            for levelField in ("read", "create", "update", "delete"):
                rawLevel = accessRuleData.get(levelField)
                if isinstance(rawLevel, str):
                    accessRuleData[levelField] = AccessLevel(rawLevel)

            newRule = AccessRule(**accessRuleData)
        except ValueError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid access rule data: {str(e)}"
            )

        createdRule = interface.createAccessRule(newRule)
        logger.info(f"Created access rule {createdRule.id} by user {currentUser.id}")

        # Plain dict so the response can be serialized as JSON
        return createdRule.model_dump()
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating access rule: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to create access rule: {str(e)}"
        )
@router.put("/rules/{ruleId}", response_model=dict)
@limiter.limit("30/minute")
async def updateAccessRule(
    request: Request,
    ruleId: str = Path(..., description="Access rule ID"),
    accessRuleData: dict = Body(..., description="Updated access rule data"),
    currentUser: User = Depends(getCurrentUser)
) -> dict:
    """
    Update an existing access rule.

    The stored rule is dumped to a dict, the request body is layered on top of
    it, and the "id" is forced to the path parameter before the merged data is
    turned back into an AccessRule. The caller needs an update level that is
    truthy and not NONE on the "AccessRule" item in the DATA context.

    Path Parameters:
    - ruleId: Access rule ID

    Request Body:
    - AccessRule object data (roleLabel, context, item, view, read, create, update, delete);
      fields that are left out keep their stored values

    Returns:
    - The updated access rule serialized with model_dump()

    Raises:
    - HTTPException 400 if the merged data raises ValueError while being parsed
    - HTTPException 403 if the caller may not update access rules
    - HTTPException 404 if no rule with this ID is found
    - HTTPException 500 if the RBAC interface is unavailable or the update fails
    """
    try:
        interface = getInterface(currentUser)
        rbac = interface.rbac
        if not rbac:
            raise HTTPException(
                status_code=500,
                detail="RBAC interface not available"
            )

        # Authorization: the caller's own permissions on the AccessRule item
        rulePermissions = rbac.getUserPermissions(
            currentUser,
            AccessRuleContext.DATA,
            "AccessRule"
        )
        updateLevel = rulePermissions.update
        if not updateLevel or updateLevel == AccessLevel.NONE:
            raise HTTPException(
                status_code=403,
                detail="No permission to update access rules"
            )

        # The rule must exist before it can be updated
        storedRule = interface.getAccessRule(ruleId)
        if not storedRule:
            raise HTTPException(
                status_code=404,
                detail=f"Access rule {ruleId} not found"
            )

        try:
            # Stored values first, request values on top, path ID always wins
            mergedData = {**storedRule.model_dump(), **accessRuleData, "id": ruleId}

            rawContext = mergedData.get("context")
            if isinstance(rawContext, str):
                mergedData["context"] = AccessRuleContext(rawContext.upper())

            for levelField in ("read", "create", "update", "delete"):
                rawLevel = mergedData.get(levelField)
                if isinstance(rawLevel, str):
                    mergedData[levelField] = AccessLevel(rawLevel)

            replacementRule = AccessRule(**mergedData)
        except ValueError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid access rule data: {str(e)}"
            )

        savedRule = interface.updateAccessRule(ruleId, replacementRule)
        logger.info(f"Updated access rule {ruleId} by user {currentUser.id}")

        # Plain dict so the response can be serialized as JSON
        return savedRule.model_dump()
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating access rule {ruleId}: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update access rule: {str(e)}"
        )
@router.delete("/rules/{ruleId}")
@limiter.limit("30/minute")
async def deleteAccessRule(
    request: Request,
    ruleId: str = Path(..., description="Access rule ID"),
    currentUser: User = Depends(getCurrentUser)
) -> dict:
    """
    Delete an access rule by its ID.

    The caller needs a delete level that is truthy and not NONE on the
    "AccessRule" item in the DATA context.

    Path Parameters:
    - ruleId: Access rule ID

    Returns:
    - Dict with "success" set to True and a confirmation "message"

    Raises:
    - HTTPException 403 if the caller may not delete access rules
    - HTTPException 404 if no rule with this ID is found
    - HTTPException 500 if the RBAC interface is unavailable, the interface
      reports the deletion as unsuccessful, or an unexpected error occurs
    """
    try:
        interface = getInterface(currentUser)
        rbac = interface.rbac
        if not rbac:
            raise HTTPException(
                status_code=500,
                detail="RBAC interface not available"
            )

        # Authorization: the caller's own permissions on the AccessRule item
        rulePermissions = rbac.getUserPermissions(
            currentUser,
            AccessRuleContext.DATA,
            "AccessRule"
        )
        deleteLevel = rulePermissions.delete
        if not deleteLevel or deleteLevel == AccessLevel.NONE:
            raise HTTPException(
                status_code=403,
                detail="No permission to delete access rules"
            )

        # The rule must exist before it can be deleted
        if not interface.getAccessRule(ruleId):
            raise HTTPException(
                status_code=404,
                detail=f"Access rule {ruleId} not found"
            )

        # A falsy result from the interface is reported as a server error
        if not interface.deleteAccessRule(ruleId):
            raise HTTPException(
                status_code=500,
                detail=f"Failed to delete access rule {ruleId}"
            )

        logger.info(f"Deleted access rule {ruleId} by user {currentUser.id}")
        return {"success": True, "message": f"Access rule {ruleId} deleted successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting access rule {ruleId}: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete access rule: {str(e)}"
        )
# ============================================================================
# Role Management Endpoints
# ============================================================================
def _ensureAdminAccess(currentUser: User) -> None:
    """
    Ensure the current user may manage RBAC roles.

    Access is granted when the user's roleLabels contain "sysadmin" or "admin".
    A missing or empty roleLabels value is treated as "no roles".

    The check reads only the user object. The interface is intentionally not
    created here: its result was never used, and every caller obtains its own
    interface right after this check succeeds.

    Parameters:
    - currentUser: The authenticated user whose role labels are inspected

    Raises:
    - HTTPException 403 if the user has neither the sysadmin nor the admin role
    """
    roleLabels = currentUser.roleLabels or []
    if "sysadmin" not in roleLabels and "admin" not in roleLabels:
        raise HTTPException(
            status_code=403,
            detail="Admin or sysadmin role required to manage RBAC roles"
        )
@router.get("/roles", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def listRoles(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> List[Dict[str, Any]]:
    """
    List all roles together with the number of users assigned to each.

    Roles stored in the database come first. Role labels that appear on users
    but have no database entry are appended afterwards as synthetic entries
    with id None, a generated description and isSystemRole False.

    Returns:
    - List of dicts with id, roleLabel, description, userCount and isSystemRole

    Raises:
    - HTTPException 403 if the caller is neither admin nor sysadmin
    - HTTPException 500 if reading roles or users fails
    """
    try:
        _ensureAdminAccess(currentUser)
        interface = getInterface(currentUser)

        dbRoles = interface.getAllRoles()

        # Users are fetched for the caller's mandate (empty string if unset).
        # The original author's note says sysadmin receives all users
        # regardless of mandateId; that is decided inside the interface.
        mandateUsers = interface.getUsersByMandate(currentUser.mandateId or "")

        # Tally how many times each role label is assigned across the users
        roleCounts: Dict[str, int] = {}
        for user in mandateUsers:
            assignedLabels = user.roleLabels or []
            for assignedLabel in assignedLabels:
                roleCounts[assignedLabel] = roleCounts.get(assignedLabel, 0) + 1

        # Database roles, each enriched with its user count
        result = [
            {
                "id": role.id,
                "roleLabel": role.roleLabel,
                "description": role.description,
                "userCount": roleCounts.get(role.roleLabel, 0),
                "isSystemRole": role.isSystemRole
            }
            for role in dbRoles
        ]

        # Labels seen on users that have no database role behind them
        dbRoleLabels = {role.roleLabel for role in dbRoles}
        result.extend(
            {
                "id": None,
                "roleLabel": roleLabel,
                "description": {"en": f"Custom role: {roleLabel}", "fr": f"Rôle personnalisé : {roleLabel}"},
                "userCount": count,
                "isSystemRole": False
            }
            for roleLabel, count in roleCounts.items()
            if roleLabel not in dbRoleLabels
        )
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing roles: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list roles: {str(e)}"
        )
@router.get("/roles/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def getRoleOptions(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> List[Dict[str, Any]]:
    """
    Get role options for select dropdowns.

    Each database role becomes one option whose value is the role label and
    whose label is the English description. The role label is used as the
    display label whenever no usable English description exists: the
    description is not a dict, has no "en" entry, or the "en" entry is empty
    or None.

    Returns:
    - List of dicts with "value" and "label"

    Raises:
    - HTTPException 403 if the caller is neither admin nor sysadmin
    - HTTPException 500 if reading the roles fails
    """
    try:
        _ensureAdminAccess(currentUser)
        interface = getInterface(currentUser)

        dbRoles = interface.getAllRoles()

        options = []
        for role in dbRoles:
            description = role.description
            englishLabel = description.get("en") if isinstance(description, dict) else None
            options.append({
                "value": role.roleLabel,
                # `or` also covers an "en" entry that exists but is blank/None,
                # which would otherwise produce an option without visible text
                "label": englishLabel or role.roleLabel
            })
        return options
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting role options: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get role options: {str(e)}"
        )
@router.post("/roles", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def createRole(
    request: Request,
    role: Role = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Create a new role.

    Request Body:
    - role: Role object to create

    Returns:
    - Dict with id, roleLabel, description and isSystemRole of the created role

    Raises:
    - HTTPException 400 if creation raises ValueError (its message is the detail)
    - HTTPException 403 if the caller is neither admin nor sysadmin
    - HTTPException 500 on any other failure
    """
    try:
        _ensureAdminAccess(currentUser)
        newRole = getInterface(currentUser).createRole(role)

        response: Dict[str, Any] = {
            "id": newRole.id,
            "roleLabel": newRole.roleLabel,
            "description": newRole.description,
            "isSystemRole": newRole.isSystemRole
        }
        return response
    except HTTPException:
        raise
    except ValueError as e:
        # ValueError is reported to the client as a bad request
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.error(f"Error creating role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to create role: {str(e)}"
        )
@router.get("/roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def getRole(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Get a role by ID.

    Path Parameters:
    - roleId: Role ID

    Returns:
    - Dict with id, roleLabel, description and isSystemRole of the role

    Raises:
    - HTTPException 403 if the caller is neither admin nor sysadmin
    - HTTPException 404 if no role with this ID is found
    - HTTPException 500 on any other failure
    """
    try:
        _ensureAdminAccess(currentUser)
        foundRole = getInterface(currentUser).getRole(roleId)

        if foundRole:
            return {
                "id": foundRole.id,
                "roleLabel": foundRole.roleLabel,
                "description": foundRole.description,
                "isSystemRole": foundRole.isSystemRole
            }

        raise HTTPException(
            status_code=404,
            detail=f"Role {roleId} not found"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get role: {str(e)}"
        )
@router.put("/roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def updateRole(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    role: Role = Body(...),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Update an existing role.

    Path Parameters:
    - roleId: Role ID

    Request Body:
    - role: Updated Role object

    Returns:
    - Dict with id, roleLabel, description and isSystemRole of the updated role

    Raises:
    - HTTPException 400 if the update raises ValueError (its message is the detail)
    - HTTPException 403 if the caller is neither admin nor sysadmin
    - HTTPException 500 on any other failure
    """
    try:
        _ensureAdminAccess(currentUser)
        savedRole = getInterface(currentUser).updateRole(roleId, role)

        response: Dict[str, Any] = {
            "id": savedRole.id,
            "roleLabel": savedRole.roleLabel,
            "description": savedRole.description,
            "isSystemRole": savedRole.isSystemRole
        }
        return response
    except HTTPException:
        raise
    except ValueError as e:
        # ValueError is reported to the client as a bad request
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.error(f"Error updating role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to update role: {str(e)}"
        )
@router.delete("/roles/{roleId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
async def deleteRole(
    request: Request,
    roleId: str = Path(..., description="Role ID"),
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, str]:
    """
    Delete a role.

    Path Parameters:
    - roleId: Role ID

    Returns:
    - Dict with a confirmation "message"

    Raises:
    - HTTPException 400 if the deletion raises ValueError (its message is the detail)
    - HTTPException 403 if the caller is neither admin nor sysadmin
    - HTTPException 404 if the interface reports the deletion as unsuccessful
    - HTTPException 500 on any other failure
    """
    try:
        _ensureAdminAccess(currentUser)
        wasDeleted = getInterface(currentUser).deleteRole(roleId)

        if wasDeleted:
            return {"message": f"Role {roleId} deleted successfully"}

        # A falsy result is reported to the client as "not found"
        raise HTTPException(
            status_code=404,
            detail=f"Role {roleId} not found"
        )
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=str(e)
        )
    except Exception as e:
        logger.error(f"Error deleting role: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete role: {str(e)}"
        )