gateway/modules/routes/routeRbac.py
2026-01-17 02:17:58 +01:00

885 lines
30 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC routes for the backend API.
Implements endpoints for role-based access control permissions.
MULTI-TENANT:
- Permission queries use RequestContext (mandateId from header)
- AccessRule management is SysAdmin-only (system resources)
- Role management is SysAdmin-only (system resources)
"""
from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request
from typing import Optional, List, Dict, Any
import logging
import json
import math
from modules.auth import limiter, getRequestContext, requireSysAdmin, RequestContext
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
from modules.datamodels.datamodelRbac import AccessRuleContext, AccessRule, Role
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.interfaces.interfaceDbAppObjects import getInterface, getRootInterface
# Configure logger
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/rbac",
tags=["RBAC"],
responses={404: {"description": "Not found"}}
)
@router.get("/permissions", response_model=UserPermissions)
@limiter.limit("300/minute") # Raised from 60 - sidebar checks many pages individually
async def getPermissions(
request: Request,
context: str = Query(..., description="Context type: DATA, UI, or RESOURCE"),
item: Optional[str] = Query(None, description="Item identifier (table name, UI path, or resource path)"),
reqContext: RequestContext = Depends(getRequestContext)
) -> UserPermissions:
"""
Get RBAC permissions for the current user for a specific context and item.
MULTI-TENANT: Uses RequestContext (mandateId/featureInstanceId from headers).
Query Parameters:
- context: Context type (DATA, UI, or RESOURCE)
- item: Optional item identifier. For DATA: table name (e.g., "UserInDB"),
For UI: cascading string (e.g., "playground.voice.settings"),
For RESOURCE: cascading string (e.g., "ai.model.anthropic")
Returns:
- UserPermissions object with view, read, create, update, delete permissions
Examples:
- GET /api/rbac/permissions?context=DATA&item=UserInDB
- GET /api/rbac/permissions?context=UI&item=playground.voice.settings
- GET /api/rbac/permissions?context=RESOURCE&item=ai.model.anthropic
"""
try:
# Validate context
try:
accessContext = AccessRuleContext(context.upper())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
)
# Get interface and RBAC permissions
interface = getInterface(reqContext.user)
if not interface.rbac:
raise HTTPException(
status_code=500,
detail="RBAC interface not available"
)
# MULTI-TENANT: Get permissions using context (mandateId/featureInstanceId)
# For now, pass user - RBAC will be extended to use context in later phases
permissions = interface.rbac.getUserPermissions(
reqContext.user,
accessContext,
item or ""
)
return permissions
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting RBAC permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get permissions: {str(e)}"
)
@router.get("/permissions/all", response_model=Dict[str, Any])
@limiter.limit("120/minute") # Raised from 30 - optimized endpoint for bulk permission fetch
async def getAllPermissions(
request: Request,
context: Optional[str] = Query(None, description="Context type: UI or RESOURCE (if not provided, returns both)"),
reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get all RBAC permissions for the current user for UI and/or RESOURCE contexts.
MULTI-TENANT: Uses RequestContext (mandateId/featureInstanceId from headers).
This endpoint is optimized for UI initialization to avoid multiple API calls.
Query Parameters:
- context: Optional context filter. If "UI", returns only UI permissions.
If "RESOURCE", returns only RESOURCE permissions.
If not provided, returns both UI and RESOURCE permissions.
Returns:
- Dictionary with structure:
{
"ui": {
"item1": UserPermissions,
"item2": UserPermissions,
...
},
"resource": {
"item1": UserPermissions,
"item2": UserPermissions,
...
}
}
If context is specified, only that context is returned.
Example:
- GET /api/rbac/permissions/all
- GET /api/rbac/permissions/all?context=UI
- GET /api/rbac/permissions/all?context=RESOURCE
"""
try:
# Get interface and RBAC permissions
interface = getInterface(reqContext.user)
if not interface.rbac:
raise HTTPException(
status_code=500,
detail="RBAC interface not available"
)
# Determine which contexts to fetch
contextsToFetch = []
if context:
try:
accessContext = AccessRuleContext(context.upper())
if accessContext in [AccessRuleContext.UI, AccessRuleContext.RESOURCE]:
contextsToFetch = [accessContext]
else:
raise HTTPException(
status_code=400,
detail=f"Context '{context}' must be UI or RESOURCE for this endpoint"
)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be UI or RESOURCE"
)
else:
# Return both UI and RESOURCE if no context specified
contextsToFetch = [AccessRuleContext.UI, AccessRuleContext.RESOURCE]
result: Dict[str, Any] = {}
# MULTI-TENANT: Get role IDs from context (computed from mandateId/featureInstanceId)
roleIds = reqContext.roleIds or []
if not roleIds and not reqContext.isSysAdmin:
# User has no roles, return empty permissions
for ctx in contextsToFetch:
result[ctx.value.lower()] = {}
return result
# Get all access rules for user's roles and requested contexts
allRules: Dict[AccessRuleContext, List[AccessRule]] = {}
for ctx in contextsToFetch:
allRules[ctx] = []
# Get all rules for user's roles in this context
for roleId in roleIds:
rules = interface.getAccessRules(
roleId=str(roleId),
context=ctx,
pagination=None
)
allRules[ctx].extend(rules)
# Build result: for each context, collect all unique items and calculate permissions
for ctx in contextsToFetch:
result[ctx.value.lower()] = {}
# Collect all unique items from rules
items = set()
for rule in allRules[ctx]:
if rule.item:
items.add(rule.item)
# For each item, calculate user permissions
for item in sorted(items):
permissions = interface.rbac.getUserPermissions(reqContext.user, ctx, item)
# Only include if user has view permission
if permissions.view:
result[ctx.value.lower()][item] = {
"view": permissions.view,
"read": permissions.read.value if permissions.read else None,
"create": permissions.create.value if permissions.create else None,
"update": permissions.update.value if permissions.update else None,
"delete": permissions.delete.value if permissions.delete else None
}
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting all RBAC permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get all permissions: {str(e)}"
)
@router.get("/rules", response_model=PaginatedResponse)
@limiter.limit("30/minute")
async def getAccessRules(
request: Request,
roleLabel: Optional[str] = Query(None, description="Filter by role label"),
context: Optional[str] = Query(None, description="Filter by context (DATA, UI, RESOURCE)"),
item: Optional[str] = Query(None, description="Filter by item identifier"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> PaginatedResponse:
"""
Get access rules with optional filters.
MULTI-TENANT: SysAdmin-only (AccessRules are system resources).
Query Parameters:
- roleLabel: Optional role label filter
- context: Optional context filter (DATA, UI, RESOURCE)
- item: Optional item filter
Returns:
- List of AccessRule objects
"""
try:
# Get interface - SysAdmin uses root interface
interface = getRootInterface()
# Parse context if provided
accessContext = None
if context:
try:
accessContext = AccessRuleContext(context.upper())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
)
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get rules with optional pagination
result = interface.getAccessRules(
roleLabel=roleLabel,
context=accessContext,
item=item,
pagination=paginationParams
)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[AccessRule]
if paginationParams:
return PaginatedResponse(
items=[rule.model_dump() for rule in result.items],
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=[rule.model_dump() for rule in result],
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting access rules: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get access rules: {str(e)}"
)
@router.get("/rules/{ruleId}", response_model=dict)
@limiter.limit("30/minute")
async def getAccessRule(
request: Request,
ruleId: str = Path(..., description="Access rule ID"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> dict:
"""
Get a specific access rule by ID.
MULTI-TENANT: SysAdmin-only.
Path Parameters:
- ruleId: Access rule ID
Returns:
- AccessRule object
"""
try:
# Get interface - SysAdmin uses root interface
interface = getRootInterface()
# Get rule
rule = interface.getAccessRule(ruleId)
if not rule:
raise HTTPException(
status_code=404,
detail=f"Access rule {ruleId} not found"
)
# Convert to dict for JSON serialization
return rule.model_dump()
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting access rule {ruleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get access rule: {str(e)}"
)
@router.post("/rules", response_model=dict)
@limiter.limit("30/minute")
async def createAccessRule(
request: Request,
accessRuleData: dict = Body(..., description="Access rule data"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> dict:
"""
Create a new access rule.
MULTI-TENANT: SysAdmin-only.
Request Body:
- AccessRule object data (roleLabel, context, item, view, read, create, update, delete)
Returns:
- Created AccessRule object
"""
try:
# Get interface - SysAdmin uses root interface
interface = getRootInterface()
# Validate and parse access rule data
try:
logger.debug(f"Creating access rule with data: {accessRuleData}")
# Parse context if provided as string
if "context" in accessRuleData and isinstance(accessRuleData["context"], str):
accessRuleData["context"] = AccessRuleContext(accessRuleData["context"].upper())
# Parse AccessLevel fields if provided as strings
# Handle empty strings by converting to None
for field in ["read", "create", "update", "delete"]:
if field in accessRuleData:
value = accessRuleData[field]
if value == "" or value is None:
accessRuleData[field] = None
elif isinstance(value, str):
accessRuleData[field] = AccessLevel(value)
# Create AccessRule object
accessRule = AccessRule(**accessRuleData)
except ValueError as e:
logger.error(f"Invalid access rule data: {accessRuleData} - Error: {str(e)}")
raise HTTPException(
status_code=400,
detail=f"Invalid access rule data: {str(e)}"
)
# Create rule
createdRule = interface.createAccessRule(accessRule)
logger.info(f"Created access rule {createdRule.id} by SysAdmin {reqContext.user.id}")
# Convert to dict for JSON serialization
return createdRule.model_dump()
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating access rule: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to create access rule: {str(e)}"
)
@router.put("/rules/{ruleId}", response_model=dict)
@limiter.limit("30/minute")
async def updateAccessRule(
request: Request,
ruleId: str = Path(..., description="Access rule ID"),
accessRuleData: dict = Body(..., description="Updated access rule data"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> dict:
"""
Update an existing access rule.
MULTI-TENANT: SysAdmin-only.
Path Parameters:
- ruleId: Access rule ID
Request Body:
- AccessRule object data (roleLabel, context, item, view, read, create, update, delete)
Returns:
- Updated AccessRule object
"""
try:
# Get interface - SysAdmin uses root interface
interface = getRootInterface()
# Get existing rule to ensure it exists
existingRule = interface.getAccessRule(ruleId)
if not existingRule:
raise HTTPException(
status_code=404,
detail=f"Access rule {ruleId} not found"
)
# Validate and parse access rule data
try:
# Merge with existing rule data
updateData = existingRule.model_dump()
updateData.update(accessRuleData)
# Parse context if provided as string
if "context" in updateData and isinstance(updateData["context"], str):
updateData["context"] = AccessRuleContext(updateData["context"].upper())
# Parse AccessLevel fields if provided as strings
# Handle empty strings by converting to None
for field in ["read", "create", "update", "delete"]:
if field in updateData:
value = updateData[field]
if value == "" or value is None:
updateData[field] = None
elif isinstance(value, str):
updateData[field] = AccessLevel(value)
# Ensure ID is set correctly
updateData["id"] = ruleId
# Create AccessRule object
accessRule = AccessRule(**updateData)
except ValueError as e:
logger.error(f"Invalid access rule update data: {updateData} - Error: {str(e)}")
raise HTTPException(
status_code=400,
detail=f"Invalid access rule data: {str(e)}"
)
# Update rule
updatedRule = interface.updateAccessRule(ruleId, accessRule)
logger.info(f"Updated access rule {ruleId} by SysAdmin {reqContext.user.id}")
# Convert to dict for JSON serialization
return updatedRule.model_dump()
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating access rule {ruleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to update access rule: {str(e)}"
)
@router.delete("/rules/{ruleId}")
@limiter.limit("30/minute")
async def deleteAccessRule(
request: Request,
ruleId: str = Path(..., description="Access rule ID"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> dict:
"""
Delete an access rule.
MULTI-TENANT: SysAdmin-only.
Path Parameters:
- ruleId: Access rule ID
Returns:
- Success message
"""
try:
# Get interface - SysAdmin uses root interface
interface = getRootInterface()
# Get existing rule to ensure it exists
existingRule = interface.getAccessRule(ruleId)
if not existingRule:
raise HTTPException(
status_code=404,
detail=f"Access rule {ruleId} not found"
)
# Delete rule
success = interface.deleteAccessRule(ruleId)
if not success:
raise HTTPException(
status_code=500,
detail=f"Failed to delete access rule {ruleId}"
)
logger.info(f"Deleted access rule {ruleId} by SysAdmin {reqContext.user.id}")
return {"success": True, "message": f"Access rule {ruleId} deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting access rule {ruleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to delete access rule: {str(e)}"
)
# ============================================================================
# Role Management Endpoints
# MULTI-TENANT: All role management is SysAdmin-only (roles are system resources)
# ============================================================================
@router.get("/roles", response_model=PaginatedResponse)
@limiter.limit("60/minute")
async def listRoles(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> PaginatedResponse:
"""
Get list of all available roles with metadata.
MULTI-TENANT: SysAdmin-only (roles are system resources).
Returns:
- List of role dictionaries with role label, description, and user count
"""
try:
interface = getRootInterface()
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get all roles from database
dbRoles = interface.getAllRoles(pagination=None)
# Count role assignments from UserMandateRole table
roleCounts = interface.countRoleAssignments()
# Convert Role objects to dictionaries and add user counts
result = []
for role in dbRoles:
result.append({
"id": role.id,
"roleLabel": role.roleLabel,
"description": role.description,
"userCount": roleCounts.get(str(role.id), 0),
"isSystemRole": role.isSystemRole
})
# Apply filtering and sorting if pagination requested
if paginationParams:
# Apply filtering (if filters provided)
if paginationParams.filters:
# Use the interface's filter method
filteredResult = interface._applyFilters(result, paginationParams.filters)
else:
filteredResult = result
# Apply sorting (in order of sortFields)
if paginationParams.sort:
sortedResult = interface._applySorting(filteredResult, paginationParams.sort)
else:
sortedResult = filteredResult
# Apply pagination
totalItems = len(sortedResult)
totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
startIdx = (paginationParams.page - 1) * paginationParams.pageSize
endIdx = startIdx + paginationParams.pageSize
paginatedResult = sortedResult[startIdx:endIdx]
return PaginatedResponse(
items=paginatedResult,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=totalItems,
totalPages=totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
# No pagination - return all roles
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error listing roles: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to list roles: {str(e)}"
)
@router.get("/roles/options", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
async def getRoleOptions(
request: Request,
reqContext: RequestContext = Depends(requireSysAdmin)
) -> List[Dict[str, Any]]:
"""
Get role options for select dropdowns.
MULTI-TENANT: SysAdmin-only.
Returns:
- List of role option dictionaries with value and label
"""
try:
interface = getRootInterface()
# Get all roles from database
dbRoles = interface.getAllRoles()
# Convert to options format
options = []
for role in dbRoles:
# Use English description as label, fallback to roleLabel
label = role.description.get("en", role.roleLabel) if isinstance(role.description, dict) else role.roleLabel
options.append({
"value": role.roleLabel,
"label": label
})
return options
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting role options: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get role options: {str(e)}"
)
@router.post("/roles", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def createRole(
request: Request,
role: Role = Body(...),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> Dict[str, Any]:
"""
Create a new role.
MULTI-TENANT: SysAdmin-only.
Request Body:
- role: Role object to create
Returns:
- Created role dictionary
"""
try:
interface = getRootInterface()
createdRole = interface.createRole(role)
logger.info(f"Created role {createdRole.roleLabel} by SysAdmin {reqContext.user.id}")
return {
"id": createdRole.id,
"roleLabel": createdRole.roleLabel,
"description": createdRole.description,
"isSystemRole": createdRole.isSystemRole
}
except HTTPException:
raise
except ValueError as e:
raise HTTPException(
status_code=400,
detail=str(e)
)
except Exception as e:
logger.error(f"Error creating role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to create role: {str(e)}"
)
@router.get("/roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def getRole(
request: Request,
roleId: str = Path(..., description="Role ID"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> Dict[str, Any]:
"""
Get a role by ID.
MULTI-TENANT: SysAdmin-only.
Path Parameters:
- roleId: Role ID
Returns:
- Role dictionary
"""
try:
interface = getRootInterface()
role = interface.getRole(roleId)
if not role:
raise HTTPException(
status_code=404,
detail=f"Role {roleId} not found"
)
return {
"id": role.id,
"roleLabel": role.roleLabel,
"description": role.description,
"isSystemRole": role.isSystemRole
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get role: {str(e)}"
)
@router.put("/roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def updateRole(
request: Request,
roleId: str = Path(..., description="Role ID"),
role: Role = Body(...),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> Dict[str, Any]:
"""
Update an existing role.
MULTI-TENANT: SysAdmin-only.
Path Parameters:
- roleId: Role ID
Request Body:
- role: Updated Role object
Returns:
- Updated role dictionary
"""
try:
interface = getRootInterface()
updatedRole = interface.updateRole(roleId, role)
logger.info(f"Updated role {roleId} by SysAdmin {reqContext.user.id}")
return {
"id": updatedRole.id,
"roleLabel": updatedRole.roleLabel,
"description": updatedRole.description,
"isSystemRole": updatedRole.isSystemRole
}
except HTTPException:
raise
except ValueError as e:
raise HTTPException(
status_code=400,
detail=str(e)
)
except Exception as e:
logger.error(f"Error updating role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to update role: {str(e)}"
)
@router.delete("/roles/{roleId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
async def deleteRole(
request: Request,
roleId: str = Path(..., description="Role ID"),
reqContext: RequestContext = Depends(requireSysAdmin)
) -> Dict[str, str]:
"""
Delete a role.
MULTI-TENANT: SysAdmin-only.
Path Parameters:
- roleId: Role ID
Returns:
- Success message
"""
try:
interface = getRootInterface()
success = interface.deleteRole(roleId)
if not success:
raise HTTPException(
status_code=404,
detail=f"Role {roleId} not found"
)
logger.info(f"Deleted role {roleId} by SysAdmin {reqContext.user.id}")
return {"message": f"Role {roleId} deleted successfully"}
except HTTPException:
raise
except ValueError as e:
raise HTTPException(
status_code=400,
detail=str(e)
)
except Exception as e:
logger.error(f"Error deleting role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to delete role: {str(e)}"
)