gateway/modules/routes/routeAdminRbacRules.py
2026-04-12 14:04:49 +02:00

1614 lines
65 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC routes for the backend API.
Implements endpoints for role-based access control permissions.
MULTI-TENANT:
- Permission queries use RequestContext (mandateId from header)
- AccessRule management is Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's rules)
- Role management is Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's roles)
- Catalog stats and cleanup remain SysAdmin-only
"""
from fastapi import APIRouter, HTTPException, Depends, Query, Body, Path, Request
from typing import Optional, List, Dict, Any
import logging
import json
import math
from modules.auth import limiter, getRequestContext, requireSysAdminRole, RequestContext
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
from modules.datamodels.datamodelRbac import AccessRuleContext, AccessRule, Role
from modules.datamodels.datamodelMembership import UserMandate
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
from modules.shared.i18nRegistry import apiRouteContext, t, resolveText
routeApiMsg = apiRouteContext("routeAdminRbacRules")
# Configure logger
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/rbac",
tags=["RBAC"],
responses={404: {"description": "Not found"}}
)
def _getAdminMandateIds(context: RequestContext) -> List[str]:
"""Get mandate IDs where the user has an admin role."""
mandateIds = []
try:
from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
userMandates = rootInterface.getUserMandates(str(context.user.id))
for um in userMandates:
if not getattr(um, 'enabled', True):
continue
umId = getattr(um, 'id', None)
mandateId = getattr(um, 'mandateId', None)
if not umId or not mandateId:
continue
roleIds = rootInterface.getRoleIdsForUserMandate(str(umId))
for roleId in roleIds:
role = rootInterface.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
mandateIds.append(str(mandateId))
break
except Exception as e:
logger.error(f"Error getting admin mandate IDs: {e}")
return mandateIds
def _isRoleInAdminMandates(roleId: str, adminMandateIds: List[str]) -> bool:
"""Check if a role belongs to one of the admin's mandates."""
try:
from modules.interfaces.interfaceDbApp import getRootInterface
rootInterface = getRootInterface()
role = rootInterface.getRole(roleId)
if not role:
return False
return str(role.mandateId) in adminMandateIds if role.mandateId else False
except Exception:
return False
@router.get("/permissions", response_model=UserPermissions)
@limiter.limit("300/minute") # Raised from 60 - sidebar checks many pages individually
def get_permissions(
request: Request,
context: str = Query(..., description="Context type: DATA, UI, or RESOURCE"),
item: Optional[str] = Query(None, description="Item identifier (table name, UI path, or resource path)"),
reqContext: RequestContext = Depends(getRequestContext)
) -> UserPermissions:
"""
Get RBAC permissions for the current user for a specific context and item.
MULTI-TENANT: Uses RequestContext (mandateId/featureInstanceId from headers).
Query Parameters:
- context: Context type (DATA, UI, or RESOURCE)
- item: Optional item identifier. For DATA: table name (e.g., "UserInDB"),
For UI: cascading string (e.g., "playground.voice.settings"),
For RESOURCE: cascading string (e.g., "ai.model.anthropic")
Returns:
- UserPermissions object with view, read, create, update, delete permissions
Examples:
- GET /api/rbac/permissions?context=DATA&item=UserInDB
- GET /api/rbac/permissions?context=UI&item=playground.voice.settings
- GET /api/rbac/permissions?context=RESOURCE&item=ai.model.anthropic
"""
try:
# Validate context
try:
accessContext = AccessRuleContext(context.upper())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
)
# Get interface and RBAC permissions
interface = getInterface(reqContext.user)
if not interface.rbac:
raise HTTPException(
status_code=500,
detail=routeApiMsg("RBAC interface not available")
)
# MULTI-TENANT: Get permissions using context (mandateId/featureInstanceId)
# For DATA context, resolve short model names to full objectKeys
# e.g., "ChatWorkflow" → "data.chat.ChatWorkflow"
resolvedItem = item or ""
if accessContext == AccessRuleContext.DATA and resolvedItem and "." not in resolvedItem:
from modules.interfaces.interfaceRbac import buildDataObjectKey
resolvedItem = buildDataObjectKey(resolvedItem)
# Pass mandateId and featureInstanceId to load Feature-Instance roles
permissions = interface.rbac.getUserPermissions(
reqContext.user,
accessContext,
resolvedItem,
mandateId=reqContext.mandateId,
featureInstanceId=reqContext.featureInstanceId
)
return permissions
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting RBAC permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get permissions: {str(e)}"
)
@router.get("/permissions/all", response_model=Dict[str, Any])
@limiter.limit("120/minute") # Raised from 30 - optimized endpoint for bulk permission fetch
def get_all_permissions(
request: Request,
context: Optional[str] = Query(None, description="Context type: UI or RESOURCE (if not provided, returns both)"),
reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get all RBAC permissions for the current user for UI and/or RESOURCE contexts.
MULTI-TENANT: Uses RequestContext (mandateId/featureInstanceId from headers).
This endpoint is optimized for UI initialization to avoid multiple API calls.
Query Parameters:
- context: Optional context filter. If "UI", returns only UI permissions.
If "RESOURCE", returns only RESOURCE permissions.
If not provided, returns both UI and RESOURCE permissions.
Returns:
- Dictionary with structure:
{
"ui": {
"item1": UserPermissions,
"item2": UserPermissions,
...
},
"resource": {
"item1": UserPermissions,
"item2": UserPermissions,
...
}
}
If context is specified, only that context is returned.
Example:
- GET /api/rbac/permissions/all
- GET /api/rbac/permissions/all?context=UI
- GET /api/rbac/permissions/all?context=RESOURCE
"""
try:
# Get interface and RBAC permissions
interface = getInterface(reqContext.user)
if not interface.rbac:
raise HTTPException(
status_code=500,
detail=routeApiMsg("RBAC interface not available")
)
# Determine which contexts to fetch
contextsToFetch = []
if context:
try:
accessContext = AccessRuleContext(context.upper())
if accessContext in [AccessRuleContext.UI, AccessRuleContext.RESOURCE]:
contextsToFetch = [accessContext]
else:
raise HTTPException(
status_code=400,
detail=f"Context '{context}' must be UI or RESOURCE for this endpoint"
)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be UI or RESOURCE"
)
else:
# Return both UI and RESOURCE if no context specified
contextsToFetch = [AccessRuleContext.UI, AccessRuleContext.RESOURCE]
result: Dict[str, Any] = {}
# For UI/RESOURCE permissions: These are GLOBAL (not mandate-specific)
# System roles (admin, user, viewer) have global UI rules that apply without mandate context
rootInterface = getRootInterface()
# Start with roleIds from current mandate context (if any)
roleIds = list(reqContext.roleIds or [])
# For UI/RESOURCE: Load system roles the user has across ALL their mandates
# This allows users to access system UI elements without needing a specific mandate header
allUserMandates = rootInterface.getUserMandates(str(reqContext.user.id))
userMandates = [um for um in allUserMandates if um.enabled]
logger.debug(f"UI/RESOURCE permissions: Found {len(userMandates)} UserMandates for user {reqContext.user.id}")
# Collect all role IDs the user has across all mandates
for userMandate in userMandates:
mandateRoleIds = rootInterface.getRoleIdsForUserMandate(userMandate.id)
logger.debug(f"UI/RESOURCE permissions: UserMandate {userMandate.id} (mandate {userMandate.mandateId}) has {len(mandateRoleIds)} roles: {mandateRoleIds}")
for rid in mandateRoleIds:
if rid not in roleIds:
roleIds.append(rid)
logger.debug(f"UI/RESOURCE permissions: User has {len(roleIds)} roles across all mandates")
if not roleIds and not reqContext.hasSysAdminRole:
# No roles at all, return empty permissions
for ctx in contextsToFetch:
result[ctx.value.lower()] = {}
return result
# Get all access rules for user's roles and requested contexts
# IMPORTANT: Use direct DB access without RBAC filtering!
# Otherwise we have a chicken-and-egg problem: need AccessRule read permission to calculate permissions
allRules: Dict[AccessRuleContext, List[AccessRule]] = {}
for ctx in contextsToFetch:
allRules[ctx] = []
# Get all rules for user's roles - bypass RBAC filtering
for roleId in roleIds:
# Use interface method and filter by context
rules = rootInterface.getAccessRulesByRole(str(roleId))
for rule in rules:
if rule.context == ctx.value:
allRules[ctx].append(rule)
# Build result: for each context, collect all unique items and calculate permissions
for ctx in contextsToFetch:
result[ctx.value.lower()] = {}
# Check for global rule (item=None) first - this grants access to ALL UI/RESOURCE items
# Calculate permissions directly from loaded rules (don't call getUserPermissions - it requires mandateId)
hasGlobalRule = False
globalView = False
globalRead = None
globalCreate = None
globalUpdate = None
globalDelete = None
for rule in allRules[ctx]:
if rule.item is None:
hasGlobalRule = True
if rule.view:
globalView = True
if rule.read:
globalRead = rule.read.value if hasattr(rule.read, 'value') else rule.read
if rule.create:
globalCreate = rule.create.value if hasattr(rule.create, 'value') else rule.create
if rule.update:
globalUpdate = rule.update.value if hasattr(rule.update, 'value') else rule.update
if rule.delete:
globalDelete = rule.delete.value if hasattr(rule.delete, 'value') else rule.delete
# If there's a global rule with view permission, add "_global" key
if hasGlobalRule and globalView:
logger.debug(f"Adding _global key for context {ctx.value} with view={globalView}")
result[ctx.value.lower()]["_global"] = {
"view": globalView,
"read": globalRead,
"create": globalCreate,
"update": globalUpdate,
"delete": globalDelete
}
# Collect all unique items from rules (specific rules)
items = set()
for rule in allRules[ctx]:
if rule.item:
items.add(rule.item)
# For each item, calculate user permissions
# For UI/RESOURCE context: Calculate permissions directly from the collected rules
# (Don't use getUserPermissions with mandateId - that would limit to one mandate's roles)
for item in sorted(items):
# Find matching rules for this item from the already-collected rules
itemView = False
for rule in allRules[ctx]:
if rule.item == item and rule.view:
itemView = True
break
# Only include if user has view permission
if itemView:
result[ctx.value.lower()][item] = {
"view": True,
"read": None, # UI context doesn't use CRUD permissions
"create": None,
"update": None,
"delete": None
}
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting all RBAC permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get all permissions: {str(e)}"
)
@router.get("/rules", response_model=PaginatedResponse)
@limiter.limit("30/minute")
def get_access_rules(
request: Request,
roleLabel: Optional[str] = Query(None, description="Filter by role label"),
context: Optional[str] = Query(None, description="Filter by context (DATA, UI, RESOURCE)"),
item: Optional[str] = Query(None, description="Filter by item identifier"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
reqContext: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse:
"""
Get access rules with optional filters.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's rules).
Query Parameters:
- roleLabel: Optional role label filter
- context: Optional context filter (DATA, UI, RESOURCE)
- item: Optional item filter
Returns:
- List of AccessRule objects
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
# Get interface - uses root interface for admin access
interface = getRootInterface()
# Parse context if provided
accessContext = None
if context:
try:
accessContext = AccessRuleContext(context.upper())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
)
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get rules with optional pagination
# MandateAdmin: fetch all then filter by admin's mandates.
# NOTE: Cannot use DB-level pagination for MandateAdmin because
# _isRoleInAdminMandates requires joining Role → mandateId which
# isn't expressible via getRecordsetPaginated's recordFilter.
if not isSysAdmin:
allRules = interface.getAccessRules(
roleLabel=roleLabel,
context=accessContext,
item=item,
pagination=None
)
filteredRules = [rule for rule in allRules if _isRoleInAdminMandates(str(rule.roleId), adminMandateIds)]
if paginationParams:
totalItems = len(filteredRules)
totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
startIdx = (paginationParams.page - 1) * paginationParams.pageSize
endIdx = startIdx + paginationParams.pageSize
return PaginatedResponse(
items=[rule.model_dump() for rule in filteredRules[startIdx:endIdx]],
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=totalItems,
totalPages=totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=[rule.model_dump() for rule in filteredRules],
pagination=None
)
# SysAdmin: use server-side pagination
result = interface.getAccessRules(
roleLabel=roleLabel,
context=accessContext,
item=item,
pagination=paginationParams
)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[AccessRule]
if paginationParams:
return PaginatedResponse(
items=[rule.model_dump() for rule in result.items],
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
return PaginatedResponse(
items=[rule.model_dump() for rule in result],
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting access rules: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get access rules: {str(e)}"
)
@router.get("/rules/by-role/{roleId}", response_model=PaginatedResponse)
@limiter.limit("30/minute")
def get_access_rules_by_role(
request: Request,
roleId: str = Path(..., description="Role ID to get rules for"),
reqContext: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse:
"""
Get all access rules for a specific role.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's roles).
Path Parameters:
- roleId: The role ID to get rules for
Returns:
- List of AccessRule objects for the specified role
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
# MandateAdmin: verify role belongs to their mandates
if not isSysAdmin and not _isRoleInAdminMandates(roleId, adminMandateIds):
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: role not in your mandates"))
interface = getRootInterface()
# Get rules from database using interface method
ruleObjects = interface.getAccessRulesByRole(roleId)
return PaginatedResponse(
items=[rule.model_dump() for rule in ruleObjects],
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting access rules for role {roleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get access rules for role: {str(e)}"
)
@router.get("/rules/{ruleId}", response_model=dict)
@limiter.limit("30/minute")
def get_access_rule(
request: Request,
ruleId: str = Path(..., description="Access rule ID"),
reqContext: RequestContext = Depends(getRequestContext)
) -> dict:
"""
Get a specific access rule by ID.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's rules).
Path Parameters:
- ruleId: Access rule ID
Returns:
- AccessRule object
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
# Get interface - uses root interface for admin access
interface = getRootInterface()
# Get rule
rule = interface.getAccessRule(ruleId)
if not rule:
raise HTTPException(
status_code=404,
detail=f"Access rule {ruleId} not found"
)
# MandateAdmin: verify rule's role belongs to their mandates
if not isSysAdmin and not _isRoleInAdminMandates(str(rule.roleId), adminMandateIds):
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: rule's role not in your mandates"))
# Convert to dict for JSON serialization
return rule.model_dump()
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting access rule {ruleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get access rule: {str(e)}"
)
@router.post("/rules", response_model=dict)
@limiter.limit("30/minute")
def create_access_rule(
request: Request,
accessRuleData: dict = Body(..., description="Access rule data"),
reqContext: RequestContext = Depends(getRequestContext)
) -> dict:
"""
Create a new access rule.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin creates for own mandate's roles).
Request Body:
- AccessRule object data (roleLabel, context, item, view, read, create, update, delete)
Returns:
- Created AccessRule object
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
# Get interface - uses root interface for admin access
interface = getRootInterface()
# Validate and parse access rule data
try:
logger.debug(f"Creating access rule with data: {accessRuleData}")
# Parse context if provided as string
if "context" in accessRuleData and isinstance(accessRuleData["context"], str):
accessRuleData["context"] = AccessRuleContext(accessRuleData["context"].upper())
# Parse AccessLevel fields if provided as strings
# Handle empty strings by converting to None
for field in ["read", "create", "update", "delete"]:
if field in accessRuleData:
value = accessRuleData[field]
if value == "" or value is None:
accessRuleData[field] = None
elif isinstance(value, str):
accessRuleData[field] = AccessLevel(value)
# Create AccessRule object
accessRule = AccessRule(**accessRuleData)
except ValueError as e:
logger.error(f"Invalid access rule data: {accessRuleData} - Error: {str(e)}")
raise HTTPException(
status_code=400,
detail=f"Invalid access rule data: {str(e)}"
)
# MandateAdmin: verify the rule's role belongs to their mandates
if not isSysAdmin and accessRule.roleId:
if not _isRoleInAdminMandates(str(accessRule.roleId), adminMandateIds):
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: role not in your mandates"))
# Create rule
createdRule = interface.createAccessRule(accessRule)
logger.info(f"Created access rule {createdRule.id} by admin {reqContext.user.id}")
# Convert to dict for JSON serialization
return createdRule.model_dump()
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating access rule: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to create access rule: {str(e)}"
)
@router.put("/rules/{ruleId}", response_model=dict)
@limiter.limit("30/minute")
def update_access_rule(
request: Request,
ruleId: str = Path(..., description="Access rule ID"),
accessRuleData: dict = Body(..., description="Updated access rule data"),
reqContext: RequestContext = Depends(getRequestContext)
) -> dict:
"""
Update an existing access rule.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin updates own mandate's rules).
Path Parameters:
- ruleId: Access rule ID
Request Body:
- AccessRule object data (roleLabel, context, item, view, read, create, update, delete)
Returns:
- Updated AccessRule object
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
# Get interface - uses root interface for admin access
interface = getRootInterface()
# Get existing rule to ensure it exists
existingRule = interface.getAccessRule(ruleId)
if not existingRule:
raise HTTPException(
status_code=404,
detail=f"Access rule {ruleId} not found"
)
# MandateAdmin: verify existing rule's role belongs to their mandates
if not isSysAdmin and not _isRoleInAdminMandates(str(existingRule.roleId), adminMandateIds):
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: rule's role not in your mandates"))
# Validate and parse access rule data
try:
# Merge with existing rule data
updateData = existingRule.model_dump()
updateData.update(accessRuleData)
# Parse context if provided as string
if "context" in updateData and isinstance(updateData["context"], str):
updateData["context"] = AccessRuleContext(updateData["context"].upper())
# Parse AccessLevel fields if provided as strings
# Handle empty strings by converting to None
for field in ["read", "create", "update", "delete"]:
if field in updateData:
value = updateData[field]
if value == "" or value is None:
updateData[field] = None
elif isinstance(value, str):
updateData[field] = AccessLevel(value)
# Ensure ID is set correctly
updateData["id"] = ruleId
# Create AccessRule object
accessRule = AccessRule(**updateData)
except ValueError as e:
logger.error(f"Invalid access rule update data: {updateData} - Error: {str(e)}")
raise HTTPException(
status_code=400,
detail=f"Invalid access rule data: {str(e)}"
)
# Update rule
updatedRule = interface.updateAccessRule(ruleId, accessRule)
logger.info(f"Updated access rule {ruleId} by admin {reqContext.user.id}")
# Convert to dict for JSON serialization
return updatedRule.model_dump()
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating access rule {ruleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to update access rule: {str(e)}"
)
@router.delete("/rules/{ruleId}")
@limiter.limit("30/minute")
def delete_access_rule(
request: Request,
ruleId: str = Path(..., description="Access rule ID"),
reqContext: RequestContext = Depends(getRequestContext)
) -> dict:
"""
Delete an access rule.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin deletes own mandate's rules).
Path Parameters:
- ruleId: Access rule ID
Returns:
- Success message
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
# Get interface - uses root interface for admin access
interface = getRootInterface()
# Get existing rule to ensure it exists
existingRule = interface.getAccessRule(ruleId)
if not existingRule:
raise HTTPException(
status_code=404,
detail=f"Access rule {ruleId} not found"
)
# MandateAdmin: verify rule's role belongs to their mandates
if not isSysAdmin and not _isRoleInAdminMandates(str(existingRule.roleId), adminMandateIds):
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: rule's role not in your mandates"))
# Delete rule
success = interface.deleteAccessRule(ruleId)
if not success:
raise HTTPException(
status_code=500,
detail=f"Failed to delete access rule {ruleId}"
)
logger.info(f"Deleted access rule {ruleId} by admin {reqContext.user.id}")
return {"success": True, "message": f"Access rule {ruleId} deleted successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting access rule {ruleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to delete access rule: {str(e)}"
)
# ============================================================================
# Role Management Endpoints
# MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's roles)
# ============================================================================
@router.get("/roles", response_model=PaginatedResponse)
@limiter.limit("60/minute")
def list_roles(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
includeTemplates: bool = Query(False, description="Include feature template roles"),
mandateId: Optional[str] = Query(None, description="Include mandate-specific roles for this mandate"),
scopeFilter: Optional[str] = Query(None, description="Filter by scope: 'all', 'mandate', 'global', 'system'"),
reqContext: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse:
"""
Get list of roles with metadata.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's roles).
By default, only returns true global roles (mandateId=None, featureInstanceId=None, featureCode=None).
Feature template roles are managed via /api/features/templates/roles.
NOTE: Base query (getAllRoles) already uses db.getRecordsetPaginated() internally.
However pagination=None is passed here because post-processing adds computed fields
(userCount, scopeType) and applies scope/mandate/template filtering that cannot run
at the DB level. In-memory pagination is applied after all transformations.
Args:
pagination: Optional pagination parameters (includes search, filters, sort)
includeTemplates: If True, also include feature template roles (featureCode != None)
mandateId: If provided, also include mandate-specific roles for this mandate
scopeFilter: Filter by scope type: 'all', 'mandate', 'global', 'system'
Returns:
- List of role dictionaries with role label, description, user count, and computed scopeType
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
interface = getRootInterface()
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
paginationDict = normalize_pagination_dict(paginationDict)
paginationParams = PaginationParams(**paginationDict)
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get all roles from database
dbRoles = interface.getAllRoles(pagination=None)
# Count role assignments from UserMandateRole table
roleCounts = interface.countRoleAssignments()
# Helper function to compute scopeType
# Note: mandateId takes precedence — a role with mandateId is always "mandate" scope,
# even if isSystemRole=True (which just means it was copied from a system template)
def _computeScopeType(role) -> str:
if role.mandateId:
return "mandate"
if role.isSystemRole:
return "system"
return "global"
# Convert Role objects to dictionaries and add user counts
# Filter logic:
# - Always include global roles (mandateId=None, featureInstanceId=None)
# - If mandateId provided, also include roles for that specific mandate
# - Unless includeTemplates=True, exclude feature template roles (featureCode != None)
result = []
for role in dbRoles:
# Always exclude feature-instance level roles
if role.featureInstanceId is not None:
continue
# When mandateId requested: only mandate-scoped roles
# When no mandateId: only global roles (mandateId=None)
if mandateId:
if role.mandateId != mandateId:
continue
else:
if role.mandateId is not None:
continue
# Filter: Exclude feature template roles unless includeTemplates=True
if not includeTemplates and role.featureCode is not None:
continue
# Compute scopeType (system, global, mandate)
scopeType = _computeScopeType(role)
# Apply scopeFilter if provided
if scopeFilter and scopeFilter != 'all':
if scopeFilter == 'mandate' and scopeType != 'mandate':
continue
if scopeFilter == 'global' and scopeType not in ('global', 'system'):
continue
if scopeFilter == 'system' and scopeType != 'system':
continue
result.append({
"id": role.id,
"roleLabel": role.roleLabel,
"description": role.description.model_dump() if hasattr(role.description, 'model_dump') else role.description,
"mandateId": role.mandateId,
"featureInstanceId": role.featureInstanceId,
"featureCode": role.featureCode,
"userCount": roleCounts.get(str(role.id), 0),
"isSystemRole": role.isSystemRole,
"scopeType": scopeType
})
# MandateAdmin: filter to only roles in admin's mandates
if not isSysAdmin:
result = [r for r in result if r.get("mandateId") and str(r["mandateId"]) in adminMandateIds]
# Apply search, filtering and sorting if pagination requested
if paginationParams:
# Apply search (if search term provided in filters)
searchTerm = paginationParams.filters.get("search", "").lower() if paginationParams.filters else ""
if searchTerm:
searchedResult = []
for item in result:
roleLabel = (item.get("roleLabel") or "").lower()
descText = (item.get("description") or "").lower()
scopeType = (item.get("scopeType") or "").lower()
if searchTerm in roleLabel or searchTerm in descText or searchTerm in scopeType:
searchedResult.append(item)
result = searchedResult
# Apply filtering (if filters provided)
if paginationParams.filters:
# Use the interface's filter method
filteredResult = interface._applyFilters(result, paginationParams.filters)
else:
filteredResult = result
# Apply sorting (in order of sortFields)
if paginationParams.sort:
sortedResult = interface._applySorting(filteredResult, paginationParams.sort)
else:
sortedResult = filteredResult
# Apply pagination
totalItems = len(sortedResult)
totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
startIdx = (paginationParams.page - 1) * paginationParams.pageSize
endIdx = startIdx + paginationParams.pageSize
paginatedResult = sortedResult[startIdx:endIdx]
return PaginatedResponse(
items=paginatedResult,
pagination=PaginationMetadata(
currentPage=paginationParams.page,
pageSize=paginationParams.pageSize,
totalItems=totalItems,
totalPages=totalPages,
sort=paginationParams.sort,
filters=paginationParams.filters
)
)
else:
# No pagination - return all roles
return PaginatedResponse(
items=result,
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error listing roles: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to list roles: {str(e)}"
)
@router.get("/roles/filter-values")
@limiter.limit("60/minute")
def get_roles_filter_values(
request: Request,
column: str = Query(..., description="Column key"),
pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
includeTemplates: bool = Query(False, description="Include feature template roles"),
mandateId: Optional[str] = Query(None, description="Include mandate-specific roles for this mandate"),
scopeFilter: Optional[str] = Query(None, description="Filter by scope: 'all', 'mandate', 'global', 'system'"),
reqContext: RequestContext = Depends(getRequestContext)
) -> list:
"""Return distinct filter values for a column in roles."""
try:
from modules.routes.routeDataUsers import _handleFilterValuesRequest
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
interface = getRootInterface()
dbRoles = interface.getAllRoles(pagination=None)
roleCounts = interface.countRoleAssignments()
def _computeScopeType(role) -> str:
if role.mandateId:
return "mandate"
if role.isSystemRole:
return "system"
return "global"
result = []
for role in dbRoles:
if role.featureInstanceId is not None:
continue
if mandateId:
if role.mandateId != mandateId:
continue
else:
if role.mandateId is not None:
continue
if not includeTemplates and role.featureCode is not None:
continue
scopeType = _computeScopeType(role)
if scopeFilter and scopeFilter != 'all':
if scopeFilter == 'mandate' and scopeType != 'mandate':
continue
if scopeFilter == 'global' and scopeType not in ('global', 'system'):
continue
if scopeFilter == 'system' and scopeType != 'system':
continue
result.append({
"id": role.id,
"roleLabel": role.roleLabel,
"description": role.description.model_dump() if hasattr(role.description, 'model_dump') else role.description,
"mandateId": role.mandateId,
"featureInstanceId": role.featureInstanceId,
"featureCode": role.featureCode,
"userCount": roleCounts.get(str(role.id), 0),
"isSystemRole": role.isSystemRole,
"scopeType": scopeType
})
if not isSysAdmin:
result = [r for r in result if r.get("mandateId") and str(r["mandateId"]) in adminMandateIds]
return _handleFilterValuesRequest(result, column, pagination)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting filter values for roles: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/roles", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def create_role(
request: Request,
role: Role = Body(...),
reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Create a new role.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin creates in own mandate).
Request Body:
- role: Role object to create
Returns:
- Created role dictionary
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
# MandateAdmin: can only create roles in their own mandates
if not isSysAdmin:
if not role.mandateId or str(role.mandateId) not in adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: can only create roles in your own mandates"))
interface = getRootInterface()
createdRole = interface.createRole(role)
logger.info(f"Created role {createdRole.roleLabel} by admin {reqContext.user.id}")
return {
"id": createdRole.id,
"roleLabel": createdRole.roleLabel,
"description": createdRole.description.model_dump() if hasattr(createdRole.description, 'model_dump') else createdRole.description,
"mandateId": createdRole.mandateId,
"featureInstanceId": createdRole.featureInstanceId,
"featureCode": createdRole.featureCode,
"isSystemRole": createdRole.isSystemRole
}
except HTTPException:
raise
except ValueError as e:
raise HTTPException(
status_code=400,
detail=str(e)
)
except Exception as e:
logger.error(f"Error creating role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to create role: {str(e)}"
)
@router.get("/roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def get_role(
request: Request,
roleId: str = Path(..., description="Role ID"),
reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get a role by ID.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin sees own mandate's roles).
Path Parameters:
- roleId: Role ID
Returns:
- Role dictionary
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
interface = getRootInterface()
role = interface.getRole(roleId)
if not role:
raise HTTPException(
status_code=404,
detail=f"Role {roleId} not found"
)
# MandateAdmin: verify role belongs to their mandates
if not isSysAdmin:
if not role.mandateId or str(role.mandateId) not in adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: role not in your mandates"))
return {
"id": role.id,
"roleLabel": role.roleLabel,
"description": role.description.model_dump() if hasattr(role.description, 'model_dump') else role.description,
"mandateId": role.mandateId,
"featureInstanceId": role.featureInstanceId,
"featureCode": role.featureCode,
"isSystemRole": role.isSystemRole
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get role: {str(e)}"
)
@router.put("/roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def update_role(
request: Request,
roleId: str = Path(..., description="Role ID"),
role: Role = Body(...),
reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Update an existing role.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin updates own mandate's roles, not template/system).
Path Parameters:
- roleId: Role ID
Request Body:
- role: Updated Role object
Returns:
- Updated role dictionary
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
interface = getRootInterface()
# MandateAdmin: verify role belongs to their mandates and is not a template/system role
if not isSysAdmin:
existingRole = interface.getRole(roleId)
if not existingRole:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found")
if existingRole.isSystemRole and not existingRole.mandateId:
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: cannot modify template/system roles"))
if not existingRole.mandateId or str(existingRole.mandateId) not in adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: role not in your mandates"))
updatedRole = interface.updateRole(roleId, role)
logger.info(f"Updated role {roleId} by admin {reqContext.user.id}")
return {
"id": updatedRole.id,
"roleLabel": updatedRole.roleLabel,
"description": updatedRole.description.model_dump() if hasattr(updatedRole.description, 'model_dump') else updatedRole.description,
"mandateId": updatedRole.mandateId,
"featureInstanceId": updatedRole.featureInstanceId,
"featureCode": updatedRole.featureCode,
"isSystemRole": updatedRole.isSystemRole
}
except HTTPException:
raise
except ValueError as e:
raise HTTPException(
status_code=400,
detail=str(e)
)
except Exception as e:
logger.error(f"Error updating role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to update role: {str(e)}"
)
@router.delete("/roles/{roleId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
def delete_role(
request: Request,
roleId: str = Path(..., description="Role ID"),
reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, str]:
"""
Delete a role.
MULTI-TENANT: Admin-aware (SysAdmin sees all, MandateAdmin deletes own mandate's roles, not template/system).
Path Parameters:
- roleId: Role ID
Returns:
- Success message
"""
try:
isSysAdmin = reqContext.hasSysAdminRole
adminMandateIds = [] if isSysAdmin else _getAdminMandateIds(reqContext)
if not isSysAdmin and not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Admin role required"))
interface = getRootInterface()
# MandateAdmin: verify role belongs to their mandates and is not a template/system role
if not isSysAdmin:
existingRole = interface.getRole(roleId)
if not existingRole:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found")
if existingRole.isSystemRole and not existingRole.mandateId:
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: cannot delete template/system roles"))
if not existingRole.mandateId or str(existingRole.mandateId) not in adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Access denied: role not in your mandates"))
success = interface.deleteRole(roleId)
if not success:
raise HTTPException(
status_code=404,
detail=f"Role {roleId} not found"
)
logger.info(f"Deleted role {roleId} by admin {reqContext.user.id}")
return {"message": f"Role {roleId} deleted successfully"}
except HTTPException:
raise
except ValueError as e:
raise HTTPException(
status_code=400,
detail=str(e)
)
except Exception as e:
logger.error(f"Error deleting role: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to delete role: {str(e)}"
)
# ============================================================================
# RBAC Catalog Endpoints
# ============================================================================
@router.get("/catalog/objects", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def getCatalogObjects(
request: Request,
context: Optional[str] = Query(None, description="Filter by context (DATA, UI, RESOURCE)"),
featureCode: Optional[str] = Query(None, description="Filter by feature code"),
mandateId: Optional[str] = Query(None, description="Filter by mandate's active features"),
reqContext: RequestContext = Depends(getRequestContext) # Available to all authenticated users
) -> Dict[str, Any]:
"""
Get available RBAC catalog objects.
Returns all registered DATA, UI and RESOURCE objects that can be used in AccessRules.
Query Parameters:
- context: Optional filter by context type (DATA, UI, RESOURCE)
- featureCode: Optional filter by feature (e.g., "trustee")
- mandateId: Optional filter to only include objects from features active in this mandate
Returns:
- Dictionary with objects grouped by context type, each with:
- objectKey: Dot-notation identifier (e.g., "data.feature.trustee.TrusteeContract")
- label: Multilingual label
- featureCode: Owning feature
- meta: Additional metadata (table name, fields, etc.)
Examples:
- GET /api/rbac/catalog/objects → all objects
- GET /api/rbac/catalog/objects?context=DATA → only DATA objects
- GET /api/rbac/catalog/objects?featureCode=trustee → only trustee objects
- GET /api/rbac/catalog/objects?mandateId=xxx → objects from mandate's active features
"""
try:
from modules.security.rbacCatalog import getCatalogService
catalog = getCatalogService()
# If mandateId is provided, get active features for that mandate
activeFeatures = None
if mandateId:
try:
interface = getRootInterface()
# Get all feature instances for this mandate using interface method
instances = interface.getFeatureInstancesByMandate(mandateId, enabledOnly=True)
activeFeatures = set(inst.featureCode for inst in instances)
# Always include "system" feature
activeFeatures.add("system")
except Exception as e:
logger.warning(f"Could not get active features for mandate {mandateId}: {e}")
def _resolveLabels(objects: list) -> list:
for obj in objects:
resolved = resolveText(obj.get("label"))
obj["label"] = resolved if resolved else f"[{obj.get('objectKey', '?')}]"
return objects
if context:
# Single context filter
try:
accessContext = AccessRuleContext(context.upper())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
)
if accessContext == AccessRuleContext.DATA:
objects = catalog.getDataObjects(featureCode)
elif accessContext == AccessRuleContext.UI:
objects = catalog.getUiObjects(featureCode)
else:
objects = catalog.getResourceObjects(featureCode)
# Filter by active features if mandateId was provided
if activeFeatures:
objects = [obj for obj in objects if obj.get("featureCode") in activeFeatures]
return {context.upper(): _resolveLabels(objects)}
else:
# All contexts
result = catalog.getAllCatalogObjects(featureCode)
# Filter by active features if mandateId was provided
if activeFeatures:
for ctxKey in result:
result[ctxKey] = [obj for obj in result[ctxKey] if obj.get("featureCode") in activeFeatures]
for ctxKey in result:
_resolveLabels(result[ctxKey])
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting catalog objects: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get catalog objects: {str(e)}"
)
@router.get("/catalog/stats", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def getCatalogStats(
request: Request,
currentUser: User = Depends(requireSysAdminRole)
) -> Dict[str, Any]:
"""
Get statistics about the RBAC catalog.
Returns:
- Statistics about registered features, objects, and roles
"""
try:
from modules.security.rbacCatalog import getCatalogService
catalog = getCatalogService()
return catalog.getCatalogStats()
except Exception as e:
logger.error(f"Error getting catalog stats: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get catalog stats: {str(e)}"
)
# =============================================================================
# CLEANUP: Remove duplicate AccessRules
# =============================================================================
@router.post("/cleanup/duplicate-rules", response_model=dict)
@limiter.limit("5/minute")
def cleanup_duplicate_access_rules(
request: Request,
dryRun: bool = Query(True, description="If true, only report duplicates without deleting"),
currentUser: User = Depends(requireSysAdminRole)
) -> dict:
"""
Find and remove duplicate AccessRules.
Duplicates are rules with the same (roleId, context, item) signature.
Only the first rule (oldest) is kept, all others are deleted.
Query Parameters:
- dryRun: If true (default), only report what would be deleted. Set to false to actually delete.
Returns:
- Summary with counts and details of duplicates found/removed
"""
try:
rootInterface = getRootInterface()
# Get ALL AccessRules from DB
allRules = rootInterface.db.getRecordset(AccessRule)
# Group by signature (roleId, context, item)
rulesBySignature: Dict[tuple, list] = {}
for rule in allRules:
context = rule.get("context", "")
# Normalize context enum value
if hasattr(context, 'value'):
context = context.value
sig = (rule.get("roleId"), str(context), rule.get("item"))
if sig not in rulesBySignature:
rulesBySignature[sig] = []
rulesBySignature[sig].append(rule)
# Find duplicates and collect IDs to delete
duplicateGroups = []
idsToDelete = []
for sig, rules in rulesBySignature.items():
if len(rules) > 1:
# Sort by creation time (keep oldest)
rules.sort(key=lambda r: r.get("sysCreatedAt", 0))
keepRule = rules[0]
deleteRules = rules[1:]
duplicateGroups.append({
"roleId": sig[0],
"context": sig[1],
"item": sig[2] or "(global)",
"totalCount": len(rules),
"keepId": keepRule.get("id"),
"deleteCount": len(deleteRules),
"deleteIds": [r.get("id") for r in deleteRules]
})
idsToDelete.extend([r.get("id") for r in deleteRules])
# Perform deletion if not dry run
deletedCount = 0
if not dryRun and idsToDelete:
for ruleId in idsToDelete:
try:
rootInterface.db.recordDelete(AccessRule, ruleId)
deletedCount += 1
except Exception as e:
logger.warning(f"Failed to delete rule {ruleId}: {e}")
# =====================================================================
# Phase 2: Fix template role assignments
# UserMandateRole should reference mandate-instance roles, not templates
# =====================================================================
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
allUserMandateRoles = rootInterface.db.getRecordset(UserMandateRole)
templateFixDetails = []
templateFixedCount = 0
for umr in allUserMandateRoles:
roleId = umr.get("roleId")
userMandateId = umr.get("userMandateId")
umrId = umr.get("id")
if not roleId or not userMandateId:
continue
# Check if assigned role is a template
role = rootInterface.getRole(roleId)
if not role or role.mandateId is not None:
continue # Not a template role, OK
if not role.isSystemRole:
continue # Not a system template, skip
# Template role assigned! Find the UserMandate to get the mandateId
userMandateRecords = rootInterface.db.getRecordset(
UserMandate, recordFilter={"id": userMandateId}
)
if not userMandateRecords:
continue
mandateId = userMandateRecords[0].get("mandateId")
if not mandateId:
continue
# Find the correct mandate-instance role
mandateRoles = rootInterface.db.getRecordset(
Role, recordFilter={"roleLabel": role.roleLabel, "mandateId": mandateId, "featureInstanceId": None}
)
detail = {
"userMandateRoleId": umrId,
"userMandateId": userMandateId,
"mandateId": mandateId,
"templateRoleId": roleId,
"templateRoleLabel": role.roleLabel,
"action": "none"
}
if mandateRoles:
instanceRoleId = mandateRoles[0].get("id")
detail["instanceRoleId"] = instanceRoleId
detail["action"] = "replace" if not dryRun else "would_replace"
if not dryRun:
try:
rootInterface.db.recordModify(UserMandateRole, umrId, {"roleId": instanceRoleId})
templateFixedCount += 1
logger.info(f"Fixed template role assignment: {umrId}{role.roleLabel} template → instance {instanceRoleId}")
except Exception as e:
detail["action"] = f"error: {e}"
logger.warning(f"Failed to fix role assignment {umrId}: {e}")
else:
detail["action"] = "delete_inconsistent" if not dryRun else "would_delete_inconsistent"
if not dryRun:
try:
rootInterface.db.recordDelete(UserMandateRole, umrId)
templateFixedCount += 1
logger.info(f"Deleted inconsistent template role assignment: {umrId} (template '{role.roleLabel}' in mandate {mandateId}, no instance role found)")
except Exception as e:
detail["action"] = f"error: {e}"
logger.warning(f"Failed to delete inconsistent assignment {umrId}: {e}")
templateFixDetails.append(detail)
result = {
"dryRun": dryRun,
"duplicateRules": {
"totalRules": len(allRules),
"uniqueSignatures": len(rulesBySignature),
"duplicateGroups": len(duplicateGroups),
"duplicateRulesToDelete": len(idsToDelete),
"deletedCount": deletedCount,
"details": duplicateGroups[:50]
},
"templateRoleAssignments": {
"totalUserMandateRoles": len(allUserMandateRoles),
"invalidAssignments": len(templateFixDetails),
"fixedCount": templateFixedCount,
"details": templateFixDetails[:50]
}
}
logger.info(f"RBAC cleanup: dryRun={dryRun}, "
f"duplicates={len(duplicateGroups)}/{deletedCount} deleted, "
f"templateFixes={len(templateFixDetails)}/{templateFixedCount} fixed")
return result
except Exception as e:
logger.error(f"Error during AccessRule cleanup: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to cleanup duplicate rules: {str(e)}"
)