# gateway/modules/routes/routeAdminRbacExport.py
# 2026-02-08 14:26:01 +01:00
#
# 599 lines
# 21 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
RBAC export/import routes for the backend API.
Implements endpoints for exporting and importing RBAC configurations.
Multi-Tenant Design:
- Global templates: SysAdmin can export/import
- Mandate-scoped RBAC: Mandate Admin can export/import
- Feature instance roles: Included in mandate export
"""
from fastapi import APIRouter, HTTPException, Depends, Request, UploadFile, File
from fastapi.responses import JSONResponse
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
from pydantic import BaseModel, Field
from modules.auth import limiter, getRequestContext, RequestContext, requireSysAdmin
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelRbac import Role, AccessRule
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.shared.timeUtils import getUtcTimestamp
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router holding all RBAC export/import endpoints defined in this file;
# every route path below is relative to the "/api/rbac" prefix.
router = APIRouter(
    prefix="/api/rbac",
    tags=["RBAC Export/Import"],
    responses={404: {"description": "Not found"}}
)
# =============================================================================
# Request/Response Models
# =============================================================================
class RoleExport(BaseModel):
    """Export model for a single role together with its access rules."""
    # Label of the role; the import routes use it to look up an existing role.
    roleLabel: str
    # Role description as a string-to-string mapping
    # (presumably language code -> text; TODO confirm against the Role model).
    description: Dict[str, str]
    # Feature code of the role, or None when the role has none.
    featureCode: Optional[str]
    # Whether the role is flagged as a system role.
    isSystemRole: bool
    # One dict per access rule. The export routes fill the keys
    # context, item, view, read, create, update and delete.
    accessRules: List[Dict[str, Any]]
class RbacExportData(BaseModel):
    """Complete RBAC export payload returned by the export endpoints."""
    # Version tag of the export format.
    exportVersion: str = "1.0"
    # Export time; the export routes fill it from getUtcTimestamp().
    exportedAt: float
    # ID of the exporting user, stored as a string.
    exportedBy: str
    scope: str  # "global" or "mandate"
    # Mandate the export was taken from; None for global exports.
    mandateId: Optional[str]
    # Exported roles, each carrying its own access rules.
    roles: List[RoleExport]
class RbacImportResult(BaseModel):
    """Result of an RBAC import operation (counters plus collected error messages)."""
    # Number of roles newly created by the import.
    rolesCreated: int
    # Number of existing roles that were updated (only when updateExisting is set).
    rolesUpdated: int
    # Number of roles that were skipped (already existing, missing label, ...).
    rolesSkipped: int
    # Number of access rules created for newly created roles.
    rulesCreated: int
    # Sum of the values returned by _updateAccessRules for updated roles.
    rulesUpdated: int
    # Human-readable messages for roles that were skipped or failed to import.
    errors: List[str]
# =============================================================================
# Global RBAC Export/Import (SysAdmin only)
# =============================================================================
@router.get("/export/global", response_model=RbacExportData)
@limiter.limit("10/minute")
def export_global_rbac(
    request: Request,
    sysAdmin: User = Depends(requireSysAdmin)
) -> RbacExportData:
    """
    Export the global (template) RBAC configuration.

    SysAdmin only. Every role stored with mandateId=None is exported together
    with the access rules attached to it.

    Args:
        request: Incoming HTTP request (not read inside the body).
        sysAdmin: User resolved by the requireSysAdmin dependency.

    Returns:
        RbacExportData with scope "global" and mandateId None.

    Raises:
        HTTPException: 500 if anything fails while collecting the export.
    """
    def _ruleToDict(rule):
        # A rule without a view flag is exported with view=False.
        return {
            "context": rule.context,
            "item": rule.item,
            "view": False if rule.view is None else rule.view,
            "read": rule.read,
            "create": rule.create,
            "update": rule.update,
            "delete": rule.delete
        }

    try:
        db = getRootInterface()
        # Global templates are the roles that are not bound to any mandate.
        templateRoles = [
            candidate for candidate in db.getAllRoles()
            if candidate.mandateId is None
        ]

        exported = []
        for template in templateRoles:
            ruleDicts = [
                _ruleToDict(rule)
                for rule in db.getAccessRulesByRole(template.id)
            ]
            exported.append(
                RoleExport(
                    roleLabel=template.roleLabel,
                    description=template.description or {},
                    featureCode=template.featureCode,
                    isSystemRole=template.isSystemRole,
                    accessRules=ruleDicts
                )
            )

        logger.info(f"SysAdmin {sysAdmin.id} exported global RBAC ({len(exported)} roles)")
        return RbacExportData(
            exportedAt=getUtcTimestamp(),
            exportedBy=str(sysAdmin.id),
            scope="global",
            mandateId=None,
            roles=exported
        )
    except Exception as e:
        logger.error(f"Error exporting global RBAC: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to export RBAC: {str(e)}"
        )
@router.post("/import/global", response_model=RbacImportResult)
@limiter.limit("5/minute")
async def import_global_rbac(
    request: Request,
    file: UploadFile = File(..., description="JSON file with RBAC export data"),
    updateExisting: bool = False,
    sysAdmin: User = Depends(requireSysAdmin)
) -> RbacImportResult:
    """
    Import global (template) RBAC rules.

    SysAdmin only - imports template roles (mandateId=None) and their access rules.
    An uploaded role is matched against existing global roles by roleLabel and
    featureCode. Problems with individual roles are collected in the result's
    ``errors`` list and do not abort the import.

    Args:
        request: Incoming HTTP request (not read inside the body).
        file: JSON file containing RbacExportData
        updateExisting: If True, update existing roles. If False, skip them.
        sysAdmin: User resolved by the requireSysAdmin dependency.

    Returns:
        RbacImportResult with created/updated/skipped counters and error messages.

    Raises:
        HTTPException: 400 if the upload is not UTF-8 encoded JSON, is not a JSON
            object, lacks the 'roles' field or 'roles' is not a list;
            500 on any unexpected failure.
    """
    try:
        # Read and parse file
        content = await file.read()
        try:
            data = json.loads(content.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # A wrongly encoded upload is a client error just like malformed JSON.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid JSON: {str(e)}"
            )

        # Validate structure (the payload may be any JSON value, not only an object)
        if not isinstance(data, dict):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Import data must be a JSON object"
            )
        if "roles" not in data:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Missing 'roles' field in import data"
            )
        roles = data["roles"]
        if not isinstance(roles, list):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="'roles' field must be a list"
            )

        rootInterface = getRootInterface()
        result = RbacImportResult(
            rolesCreated=0,
            rolesUpdated=0,
            rolesSkipped=0,
            rulesCreated=0,
            rulesUpdated=0,
            errors=[]
        )

        for roleData in roles:
            # Reject non-object entries up front; the error handler below relies
            # on roleData being a dict.
            if not isinstance(roleData, dict):
                result.errors.append("Role entry is not an object, skipped")
                result.rolesSkipped += 1
                continue
            try:
                roleLabel = roleData.get("roleLabel")
                featureCode = roleData.get("featureCode")
                if not roleLabel:
                    result.errors.append("Role without label skipped")
                    result.rolesSkipped += 1
                    continue

                # Validate the rule list before any write, so a malformed entry
                # cannot leave a half-imported role behind.
                accessRules = roleData.get("accessRules")
                if accessRules is None:
                    accessRules = []
                if not isinstance(accessRules, list) or not all(
                    isinstance(ruleData, dict) for ruleData in accessRules
                ):
                    result.errors.append(
                        f"Role '{roleLabel}' skipped: 'accessRules' must be a list of objects"
                    )
                    result.rolesSkipped += 1
                    continue

                # Check if role exists (global role with same label and featureCode).
                # Queried per entry, presumably so roles created earlier in this
                # import are seen as existing.
                allRoles = rootInterface.getAllRoles()
                existingRoles = [
                    r for r in allRoles
                    if r.roleLabel == roleLabel
                    and r.mandateId is None
                    and r.featureCode == featureCode
                ]

                if existingRoles:
                    if not updateExisting:
                        result.rolesSkipped += 1
                        continue
                    # Update existing role
                    roleId = existingRoles[0].id
                    rootInterface.db.recordModify(
                        Role,
                        roleId,
                        {
                            "description": roleData.get("description", {}),
                            "isSystemRole": roleData.get("isSystemRole", False)
                        }
                    )
                    # Replace access rules
                    result.rulesUpdated += _updateAccessRules(
                        rootInterface,
                        roleId,
                        accessRules
                    )
                    result.rolesUpdated += 1
                else:
                    # Create new role
                    newRole = Role(
                        roleLabel=roleLabel,
                        description=roleData.get("description", {}),
                        featureCode=featureCode,
                        mandateId=None,
                        featureInstanceId=None,
                        isSystemRole=roleData.get("isSystemRole", False)
                    )
                    createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump())
                    roleId = createdRole.get("id")
                    # Create access rules
                    for ruleData in accessRules:
                        newRule = AccessRule(
                            roleId=roleId,
                            context=ruleData.get("context"),
                            item=ruleData.get("item"),
                            view=ruleData.get("view", False),
                            read=ruleData.get("read"),
                            create=ruleData.get("create"),
                            update=ruleData.get("update"),
                            delete=ruleData.get("delete")
                        )
                        rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
                        result.rulesCreated += 1
                    result.rolesCreated += 1
            except Exception as e:
                # Per-role failures are reported to the caller, the import continues.
                result.errors.append(
                    f"Error processing role '{roleData.get('roleLabel', 'unknown')}': {str(e)}"
                )

        logger.info(
            f"SysAdmin {sysAdmin.id} imported global RBAC: "
            f"{result.rolesCreated} created, {result.rolesUpdated} updated, "
            f"{result.rolesSkipped} skipped"
        )
        return result
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis.
        logger.exception(f"Error importing global RBAC: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to import RBAC: {str(e)}"
        ) from e
# =============================================================================
# Mandate RBAC Export/Import (Mandate Admin)
# =============================================================================
@router.get("/export/mandate", response_model=RbacExportData)
@limiter.limit("10/minute")
def export_mandate_rbac(
    request: Request,
    includeFeatureInstances: bool = True,
    context: RequestContext = Depends(getRequestContext)
) -> RbacExportData:
    """
    Export the RBAC configuration of the current mandate.

    The caller must pass the _hasMandateAdminRole check. All roles whose
    mandateId matches the request context are exported; roles bound to a
    feature instance can be left out.

    Args:
        request: Incoming HTTP request (not read inside the body).
        includeFeatureInstances: Include feature instance roles in export
        context: Request context resolved by getRequestContext.

    Returns:
        RbacExportData with scope "mandate" and the mandate ID as string.

    Raises:
        HTTPException: 400 if the context carries no mandate ID, 403 if the
            admin check fails, 500 on any failure while collecting the export.
    """
    if not context.mandateId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-Mandate-Id header is required"
        )
    # Check mandate admin permission
    if not _hasMandateAdminRole(context):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Mandate-Admin role required to export RBAC"
        )

    try:
        iface = getRootInterface()
        targetMandate = str(context.mandateId)

        def _buildRoleExport(mandateRole):
            # Serialise one role with its rules; a missing view flag becomes False.
            serialisedRules = []
            for rule in iface.getAccessRulesByRole(mandateRole.id):
                serialisedRules.append({
                    "context": rule.context,
                    "item": rule.item,
                    "view": False if rule.view is None else rule.view,
                    "read": rule.read,
                    "create": rule.create,
                    "update": rule.update,
                    "delete": rule.delete
                })
            return RoleExport(
                roleLabel=mandateRole.roleLabel,
                description=mandateRole.description or {},
                featureCode=mandateRole.featureCode,
                isSystemRole=mandateRole.isSystemRole,
                accessRules=serialisedRules
            )

        # Keep the mandate's roles; drop feature instance roles unless requested.
        selectedRoles = [
            candidate for candidate in iface.getAllRoles()
            if str(candidate.mandateId) == targetMandate
            and (includeFeatureInstances or not candidate.featureInstanceId)
        ]
        exported = [_buildRoleExport(selected) for selected in selectedRoles]

        logger.info(
            f"User {context.user.id} exported mandate {context.mandateId} RBAC "
            f"({len(exported)} roles)"
        )
        return RbacExportData(
            exportedAt=getUtcTimestamp(),
            exportedBy=str(context.user.id),
            scope="mandate",
            mandateId=targetMandate,
            roles=exported
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error exporting mandate RBAC: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to export RBAC: {str(e)}"
        )
@router.post("/import/mandate", response_model=RbacImportResult)
@limiter.limit("5/minute")
async def import_mandate_rbac(
    request: Request,
    file: UploadFile = File(..., description="JSON file with RBAC export data"),
    updateExisting: bool = False,
    context: RequestContext = Depends(getRequestContext)
) -> RbacImportResult:
    """
    Import RBAC rules for the current mandate.

    Requires Mandate-Admin role. Imports roles as mandate-level roles
    (featureInstanceId=None). Roles flagged as system roles in the upload are
    skipped, and newly created roles are never system roles. An uploaded role is
    matched against existing mandate-level roles by roleLabel only. Problems with
    individual roles are collected in the result's ``errors`` list and do not
    abort the import.

    Args:
        request: Incoming HTTP request (not read inside the body).
        file: JSON file containing RbacExportData
        updateExisting: If True, update existing roles. If False, skip them.
        context: Request context resolved by getRequestContext.

    Returns:
        RbacImportResult with created/updated/skipped counters and error messages.

    Raises:
        HTTPException: 400 if the context carries no mandate ID, the upload is not
            UTF-8 encoded JSON, is not a JSON object, lacks the 'roles' field or
            'roles' is not a list; 403 if the admin check fails; 500 on any
            unexpected failure.
    """
    if not context.mandateId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-Mandate-Id header is required"
        )
    # Check mandate admin permission
    if not _hasMandateAdminRole(context):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Mandate-Admin role required to import RBAC"
        )
    try:
        # Read and parse file
        content = await file.read()
        try:
            data = json.loads(content.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # A wrongly encoded upload is a client error just like malformed JSON.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid JSON: {str(e)}"
            )

        # Validate structure (the payload may be any JSON value, not only an object)
        if not isinstance(data, dict):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Import data must be a JSON object"
            )
        if "roles" not in data:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Missing 'roles' field in import data"
            )
        roles = data["roles"]
        if not isinstance(roles, list):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="'roles' field must be a list"
            )

        rootInterface = getRootInterface()
        result = RbacImportResult(
            rolesCreated=0,
            rolesUpdated=0,
            rolesSkipped=0,
            rulesCreated=0,
            rulesUpdated=0,
            errors=[]
        )

        for roleData in roles:
            # Reject non-object entries up front; the error handler below relies
            # on roleData being a dict.
            if not isinstance(roleData, dict):
                result.errors.append("Role entry is not an object, skipped")
                result.rolesSkipped += 1
                continue
            try:
                roleLabel = roleData.get("roleLabel")
                featureCode = roleData.get("featureCode")
                if not roleLabel:
                    result.errors.append("Role without label skipped")
                    result.rolesSkipped += 1
                    continue
                # System roles cannot be imported at mandate level
                if roleData.get("isSystemRole", False):
                    result.errors.append(f"System role '{roleLabel}' skipped (SysAdmin only)")
                    result.rolesSkipped += 1
                    continue

                # Validate the rule list before any write, so a malformed entry
                # cannot leave a half-imported role behind.
                accessRules = roleData.get("accessRules")
                if accessRules is None:
                    accessRules = []
                if not isinstance(accessRules, list) or not all(
                    isinstance(ruleData, dict) for ruleData in accessRules
                ):
                    result.errors.append(
                        f"Role '{roleLabel}' skipped: 'accessRules' must be a list of objects"
                    )
                    result.rolesSkipped += 1
                    continue

                # Check if role exists (mandate-level role with same label).
                # Queried per entry, presumably so roles created earlier in this
                # import are seen as existing.
                allRoles = rootInterface.getAllRoles()
                existingRoles = [
                    r for r in allRoles
                    if r.roleLabel == roleLabel
                    and str(r.mandateId) == str(context.mandateId)
                    and r.featureInstanceId is None  # Only mandate-level roles
                ]

                if existingRoles:
                    if not updateExisting:
                        result.rolesSkipped += 1
                        continue
                    # Update existing role
                    roleId = existingRoles[0].id
                    rootInterface.db.recordModify(
                        Role,
                        roleId,
                        {"description": roleData.get("description", {})}
                    )
                    # Replace access rules
                    result.rulesUpdated += _updateAccessRules(
                        rootInterface,
                        roleId,
                        accessRules
                    )
                    result.rolesUpdated += 1
                else:
                    # Create new role at mandate level
                    newRole = Role(
                        roleLabel=roleLabel,
                        description=roleData.get("description", {}),
                        featureCode=featureCode,
                        mandateId=str(context.mandateId),
                        featureInstanceId=None,
                        isSystemRole=False  # Never create system roles via import
                    )
                    createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump())
                    roleId = createdRole.get("id")
                    # Create access rules
                    for ruleData in accessRules:
                        newRule = AccessRule(
                            roleId=roleId,
                            context=ruleData.get("context"),
                            item=ruleData.get("item"),
                            view=ruleData.get("view", False),
                            read=ruleData.get("read"),
                            create=ruleData.get("create"),
                            update=ruleData.get("update"),
                            delete=ruleData.get("delete")
                        )
                        rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
                        result.rulesCreated += 1
                    result.rolesCreated += 1
            except Exception as e:
                # Per-role failures are reported to the caller, the import continues.
                result.errors.append(
                    f"Error processing role '{roleData.get('roleLabel', 'unknown')}': {str(e)}"
                )

        logger.info(
            f"User {context.user.id} imported mandate {context.mandateId} RBAC: "
            f"{result.rolesCreated} created, {result.rolesUpdated} updated, "
            f"{result.rolesSkipped} skipped"
        )
        return result
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis.
        logger.exception(f"Error importing mandate RBAC: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to import RBAC: {str(e)}"
        ) from e
# =============================================================================
# Helper Functions
# =============================================================================
def _hasMandateAdminRole(context: RequestContext) -> bool:
    """
    Check if the user has the mandate admin role for the mandate in the current context.

    SysAdmins always pass. Otherwise one of the roles in ``context.roleIds``
    must be labelled "admin", be a mandate-level role (no featureInstanceId)
    and belong to ``context.mandateId``.

    Args:
        context: Request context providing isSysAdmin, roleIds and mandateId.

    Returns:
        True if the user may administer the current mandate, False otherwise.
        Any error during the role lookup is logged and treated as "no permission".
    """
    if context.isSysAdmin:
        return True
    if not context.roleIds:
        return False
    try:
        rootInterface = getRootInterface()
        currentMandateId = str(context.mandateId)
        for roleId in context.roleIds:
            role = rootInterface.getRole(roleId)
            if not role:
                continue
            # Admin role at mandate level (not a feature instance role)
            if role.roleLabel != "admin" or role.featureInstanceId:
                continue
            # The role must belong to the mandate being accessed, so that an admin
            # role of another mandate does not grant access here.
            # NOTE(review): if context.roleIds is already scoped to the current
            # mandate this comparison is a no-op; confirm against getRequestContext.
            if role.mandateId and str(role.mandateId) == currentMandateId:
                return True
        return False
    except Exception as e:
        # Fail closed: a lookup error must never grant admin rights.
        logger.error(f"Error checking mandate admin role: {e}")
        return False
def _updateAccessRules(interface, roleId: str, newRules: List[Dict[str, Any]]) -> int:
    """
    Replace the access rules of a role with the given ones.

    All replacement rules are built (and thereby validated by the AccessRule
    model) before any stored rule is deleted, so malformed input leaves the
    role's current rules untouched. Delete and create are still separate
    writes; a database failure in between can leave the role with fewer rules.

    Args:
        interface: Root interface providing getAccessRulesByRole and db access.
        roleId: ID of the role whose rules are replaced.
        newRules: Rule dicts with the keys context, item, view, read, create,
            update and delete.

    Returns:
        Number of rules created. 0 is also returned when an error occurred
        (the error is logged), so callers cannot tell the two cases apart.
    """
    try:
        # Build every replacement first: a bad entry must fail BEFORE the
        # existing rules are removed.
        replacementRules = [
            AccessRule(
                roleId=roleId,
                context=ruleData.get("context"),
                item=ruleData.get("item"),
                view=ruleData.get("view", False),
                read=ruleData.get("read"),
                create=ruleData.get("create"),
                update=ruleData.get("update"),
                delete=ruleData.get("delete")
            )
            for ruleData in newRules
        ]

        # Delete existing rules for this role using interface method
        for rule in interface.getAccessRulesByRole(roleId):
            interface.db.recordDelete(AccessRule, rule.id)

        # Create new rules
        count = 0
        for newRule in replacementRules:
            interface.db.recordCreate(AccessRule, newRule.model_dump())
            count += 1
        return count
    except Exception as e:
        # Best-effort contract kept for callers: log (with traceback) and report 0.
        logger.exception(f"Error updating access rules: {e}")
        return 0