789 lines
28 KiB
Python
789 lines
28 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Feature management routes for the backend API.
|
|
Implements endpoints for Feature and FeatureInstance management.
|
|
|
|
Multi-Tenant Design:
|
|
- Feature definitions are global (SysAdmin can manage)
|
|
- FeatureInstances belong to mandates (Mandate Admin can manage)
|
|
- Template roles are copied on instance creation
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Request, Query
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import status
|
|
import logging
|
|
from pydantic import BaseModel, Field
|
|
|
|
from modules.auth import limiter, getRequestContext, RequestContext, requireSysAdmin
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelFeatures import Feature, FeatureInstance
|
|
from modules.interfaces.interfaceDbAppObjects import getRootInterface
|
|
from modules.interfaces.interfaceFeatures import getFeatureInterface
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(
|
|
prefix="/api/features",
|
|
tags=["Features"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Request/Response Models
|
|
# =============================================================================
|
|
|
|
class FeatureInstanceCreate(BaseModel):
|
|
"""Request model for creating a feature instance"""
|
|
featureCode: str = Field(..., description="Feature code (e.g., 'trustee', 'chatbot')")
|
|
label: str = Field(..., description="Instance label (e.g., 'Buchhaltung 2025')")
|
|
copyTemplateRoles: bool = Field(True, description="Whether to copy template roles on creation")
|
|
|
|
|
|
class FeatureInstanceResponse(BaseModel):
|
|
"""Response model for feature instance"""
|
|
id: str
|
|
featureCode: str
|
|
mandateId: str
|
|
label: str
|
|
enabled: bool
|
|
|
|
|
|
class SyncRolesResult(BaseModel):
|
|
"""Response model for role synchronization"""
|
|
added: int
|
|
removed: int
|
|
unchanged: int
|
|
|
|
|
|
# =============================================================================
|
|
# Feature Endpoints (Global - mostly read-only for non-SysAdmin)
|
|
# =============================================================================
|
|
|
|
@router.get("/", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def listFeatures(
|
|
request: Request,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
List all available features.
|
|
|
|
Returns global feature definitions that can be activated for mandates.
|
|
Any authenticated user can see available features.
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
features = featureInterface.getAllFeatures()
|
|
return [f.model_dump() for f in features]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error listing features: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to list features: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# My Feature Instances (No mandate context needed)
|
|
# IMPORTANT: Must be before /{featureCode} to avoid route matching conflict
|
|
# =============================================================================
|
|
|
|
class FeaturesMyResponse(BaseModel):
|
|
"""Hierarchical response for GET /features/my"""
|
|
mandates: List[Dict[str, Any]]
|
|
|
|
|
|
@router.get("/my", response_model=FeaturesMyResponse)
|
|
@limiter.limit("60/minute")
|
|
async def getMyFeatureInstances(
|
|
request: Request,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> FeaturesMyResponse:
|
|
"""
|
|
Get all feature instances the current user has access to.
|
|
|
|
Returns hierarchical structure: mandates -> features -> instances -> permissions
|
|
This endpoint does not require X-Mandate-Id header.
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
# Get all feature accesses for this user
|
|
featureAccesses = rootInterface.getFeatureAccessesForUser(str(context.user.id))
|
|
|
|
if not featureAccesses:
|
|
return FeaturesMyResponse(mandates=[])
|
|
|
|
# Build hierarchical structure: mandate -> feature -> instances
|
|
mandatesMap: Dict[str, Dict[str, Any]] = {}
|
|
featuresMap: Dict[str, Dict[str, Any]] = {} # key: mandateId_featureCode
|
|
|
|
for access in featureAccesses:
|
|
if not access.enabled:
|
|
continue
|
|
|
|
instance = featureInterface.getFeatureInstance(str(access.featureInstanceId))
|
|
if not instance or not instance.enabled:
|
|
continue
|
|
|
|
# Get mandate info
|
|
mandateId = str(instance.mandateId)
|
|
if mandateId not in mandatesMap:
|
|
mandate = rootInterface.getMandate(mandateId)
|
|
if mandate:
|
|
mandatesMap[mandateId] = {
|
|
"id": mandateId,
|
|
"name": mandate.name if hasattr(mandate, 'name') else mandateId,
|
|
"code": mandate.code if hasattr(mandate, 'code') else None,
|
|
"features": []
|
|
}
|
|
else:
|
|
mandatesMap[mandateId] = {
|
|
"id": mandateId,
|
|
"name": mandateId,
|
|
"code": None,
|
|
"features": []
|
|
}
|
|
|
|
# Get feature info
|
|
featureKey = f"{mandateId}_{instance.featureCode}"
|
|
if featureKey not in featuresMap:
|
|
feature = featureInterface.getFeature(instance.featureCode)
|
|
featuresMap[featureKey] = {
|
|
"code": instance.featureCode,
|
|
"label": feature.label if feature and hasattr(feature, 'label') else {"de": instance.featureCode, "en": instance.featureCode},
|
|
"icon": feature.icon if feature and hasattr(feature, 'icon') else "folder",
|
|
"instances": [],
|
|
"_mandateId": mandateId # Temporary for grouping
|
|
}
|
|
|
|
# Get user's role in this instance
|
|
userRole = _getUserRoleInInstance(rootInterface, str(context.user.id), str(instance.id))
|
|
|
|
# Get permissions for this instance
|
|
permissions = _getInstancePermissions(rootInterface, str(context.user.id), str(instance.id))
|
|
|
|
# Add instance to feature
|
|
featuresMap[featureKey]["instances"].append({
|
|
"id": str(instance.id),
|
|
"featureCode": instance.featureCode,
|
|
"mandateId": mandateId,
|
|
"mandateName": mandatesMap[mandateId]["name"],
|
|
"instanceLabel": instance.label,
|
|
"userRole": userRole,
|
|
"permissions": permissions
|
|
})
|
|
|
|
# Build final structure
|
|
for featureKey, featureData in featuresMap.items():
|
|
mandateId = featureData.pop("_mandateId")
|
|
mandatesMap[mandateId]["features"].append(featureData)
|
|
|
|
return FeaturesMyResponse(mandates=list(mandatesMap.values()))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting user's feature instances: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to get feature instances: {str(e)}"
|
|
)
|
|
|
|
|
|
def _getUserRoleInInstance(rootInterface, userId: str, instanceId: str) -> str:
|
|
"""Get the user's primary role label in a feature instance."""
|
|
try:
|
|
from modules.datamodels.datamodelRbac import UserRole, Role
|
|
|
|
# Get user-role assignments for this instance
|
|
userRoles = rootInterface.db.getRecordset(
|
|
UserRole,
|
|
recordFilter={"userId": userId}
|
|
)
|
|
|
|
for ur in userRoles:
|
|
roleId = ur.get("roleId")
|
|
if roleId:
|
|
roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if roles and str(roles[0].get("featureInstanceId")) == instanceId:
|
|
return roles[0].get("roleLabel", "user")
|
|
|
|
return "user" # Default
|
|
except Exception as e:
|
|
logger.debug(f"Error getting user role: {e}")
|
|
return "user"
|
|
|
|
|
|
def _getInstancePermissions(rootInterface, userId: str, instanceId: str) -> Dict[str, Any]:
|
|
"""Get summarized permissions for a user in an instance."""
|
|
# Default permissions structure
|
|
permissions = {
|
|
"tables": {},
|
|
"views": {},
|
|
"fields": {}
|
|
}
|
|
|
|
try:
|
|
from modules.datamodels.datamodelRbac import UserRole, Role, RolePermission
|
|
|
|
# Get user's roles for this instance
|
|
userRoles = rootInterface.db.getRecordset(UserRole, recordFilter={"userId": userId})
|
|
roleIds = []
|
|
|
|
for ur in userRoles:
|
|
roleId = ur.get("roleId")
|
|
if roleId:
|
|
roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if roles and str(roles[0].get("featureInstanceId")) == instanceId:
|
|
roleIds.append(roleId)
|
|
|
|
if not roleIds:
|
|
return permissions
|
|
|
|
# Get permissions for all roles
|
|
for roleId in roleIds:
|
|
rolePerms = rootInterface.db.getRecordset(
|
|
RolePermission,
|
|
recordFilter={"roleId": roleId}
|
|
)
|
|
|
|
for perm in rolePerms:
|
|
tableName = perm.get("tableName", "")
|
|
if tableName:
|
|
if tableName not in permissions["tables"]:
|
|
permissions["tables"][tableName] = {
|
|
"view": False,
|
|
"read": "n",
|
|
"create": "n",
|
|
"update": "n",
|
|
"delete": "n"
|
|
}
|
|
|
|
# Merge permissions (highest wins)
|
|
current = permissions["tables"][tableName]
|
|
current["view"] = current["view"] or perm.get("canView", False)
|
|
current["read"] = _mergeAccessLevel(current["read"], perm.get("readLevel", "n"))
|
|
current["create"] = _mergeAccessLevel(current["create"], perm.get("createLevel", "n"))
|
|
current["update"] = _mergeAccessLevel(current["update"], perm.get("updateLevel", "n"))
|
|
current["delete"] = _mergeAccessLevel(current["delete"], perm.get("deleteLevel", "n"))
|
|
|
|
viewName = perm.get("viewName", "")
|
|
if viewName:
|
|
permissions["views"][viewName] = permissions["views"].get(viewName, False) or perm.get("canAccess", False)
|
|
|
|
return permissions
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error getting instance permissions: {e}")
|
|
return permissions
|
|
|
|
|
|
def _mergeAccessLevel(current: str, new: str) -> str:
|
|
"""Merge two access levels, returning the highest."""
|
|
levels = {"n": 0, "m": 1, "g": 2, "a": 3}
|
|
currentLevel = levels.get(current, 0)
|
|
newLevel = levels.get(new, 0)
|
|
|
|
if newLevel > currentLevel:
|
|
return new
|
|
return current
|
|
|
|
|
|
@router.post("/", response_model=Dict[str, Any])
|
|
@limiter.limit("10/minute")
|
|
async def createFeature(
|
|
request: Request,
|
|
code: str = Query(..., description="Unique feature code"),
|
|
label: Dict[str, str] = None,
|
|
icon: str = Query("mdi-puzzle", description="Icon identifier"),
|
|
sysAdmin: User = Depends(requireSysAdmin)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a new feature definition.
|
|
|
|
SysAdmin only - creates a global feature that can be activated for mandates.
|
|
|
|
Args:
|
|
code: Unique feature code (e.g., 'trustee')
|
|
label: I18n labels (e.g., {"en": "Trustee", "de": "Treuhand"})
|
|
icon: Icon identifier
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
# Check if feature already exists
|
|
existing = featureInterface.getFeature(code)
|
|
if existing:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail=f"Feature '{code}' already exists"
|
|
)
|
|
|
|
feature = featureInterface.createFeature(
|
|
code=code,
|
|
label=label or {"en": code.title(), "de": code.title()},
|
|
icon=icon
|
|
)
|
|
|
|
logger.info(f"SysAdmin {sysAdmin.id} created feature '{code}'")
|
|
return feature.model_dump()
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating feature: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to create feature: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Feature Instance Endpoints (Mandate-scoped)
|
|
# =============================================================================
|
|
|
|
@router.get("/instances", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def listFeatureInstances(
|
|
request: Request,
|
|
featureCode: Optional[str] = Query(None, description="Filter by feature code"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
List feature instances for the current mandate.
|
|
|
|
Returns instances the user has access to within the selected mandate.
|
|
|
|
Args:
|
|
featureCode: Optional filter by feature code
|
|
"""
|
|
if not context.mandateId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="X-Mandate-Id header is required"
|
|
)
|
|
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
instances = featureInterface.getFeatureInstancesForMandate(
|
|
mandateId=str(context.mandateId),
|
|
featureCode=featureCode
|
|
)
|
|
|
|
return [inst.model_dump() for inst in instances]
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error listing feature instances: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to list feature instances: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/instances/{instanceId}", response_model=Dict[str, Any])
|
|
@limiter.limit("60/minute")
|
|
async def getFeatureInstance(
|
|
request: Request,
|
|
instanceId: str,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get a specific feature instance.
|
|
|
|
Args:
|
|
instanceId: FeatureInstance ID
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
instance = featureInterface.getFeatureInstance(instanceId)
|
|
if not instance:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Feature instance '{instanceId}' not found"
|
|
)
|
|
|
|
# Verify mandate access (unless SysAdmin)
|
|
if context.mandateId and str(instance.mandateId) != str(context.mandateId):
|
|
if not context.isSysAdmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Access denied to this feature instance"
|
|
)
|
|
|
|
return instance.model_dump()
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting feature instance {instanceId}: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to get feature instance: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/instances", response_model=Dict[str, Any])
|
|
@limiter.limit("10/minute")
|
|
async def createFeatureInstance(
|
|
request: Request,
|
|
data: FeatureInstanceCreate,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a new feature instance for the current mandate.
|
|
|
|
Requires Mandate-Admin role. Template roles are optionally copied.
|
|
|
|
Args:
|
|
data: Feature instance creation data
|
|
"""
|
|
if not context.mandateId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="X-Mandate-Id header is required"
|
|
)
|
|
|
|
# Check mandate admin permission
|
|
if not _hasMandateAdminRole(context):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required to create feature instances"
|
|
)
|
|
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
# Verify feature exists
|
|
feature = featureInterface.getFeature(data.featureCode)
|
|
if not feature:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Feature '{data.featureCode}' not found"
|
|
)
|
|
|
|
instance = featureInterface.createFeatureInstance(
|
|
featureCode=data.featureCode,
|
|
mandateId=str(context.mandateId),
|
|
label=data.label,
|
|
copyTemplateRoles=data.copyTemplateRoles
|
|
)
|
|
|
|
logger.info(
|
|
f"User {context.user.id} created feature instance '{data.label}' "
|
|
f"for feature '{data.featureCode}' in mandate {context.mandateId}"
|
|
)
|
|
|
|
return instance.model_dump()
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating feature instance: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to create feature instance: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.delete("/instances/{instanceId}", response_model=Dict[str, str])
|
|
@limiter.limit("10/minute")
|
|
async def deleteFeatureInstance(
|
|
request: Request,
|
|
instanceId: str,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Delete a feature instance.
|
|
|
|
Requires Mandate-Admin role. CASCADE will delete associated roles and access records.
|
|
|
|
Args:
|
|
instanceId: FeatureInstance ID
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
# Get instance to verify access
|
|
instance = featureInterface.getFeatureInstance(instanceId)
|
|
if not instance:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Feature instance '{instanceId}' not found"
|
|
)
|
|
|
|
# Verify mandate access
|
|
if context.mandateId and str(instance.mandateId) != str(context.mandateId):
|
|
if not context.isSysAdmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Access denied to this feature instance"
|
|
)
|
|
|
|
# Check mandate admin permission
|
|
if not _hasMandateAdminRole(context) and not context.isSysAdmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required to delete feature instances"
|
|
)
|
|
|
|
featureInterface.deleteFeatureInstance(instanceId)
|
|
|
|
logger.info(f"User {context.user.id} deleted feature instance {instanceId}")
|
|
|
|
return {"message": "Feature instance deleted", "instanceId": instanceId}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error deleting feature instance {instanceId}: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to delete feature instance: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/instances/{instanceId}/sync-roles", response_model=SyncRolesResult)
|
|
@limiter.limit("10/minute")
|
|
async def syncInstanceRoles(
|
|
request: Request,
|
|
instanceId: str,
|
|
addOnly: bool = Query(True, description="Only add missing roles, don't remove extras"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> SyncRolesResult:
|
|
"""
|
|
Synchronize roles of a feature instance with current templates.
|
|
|
|
IMPORTANT: Templates are only copied when a FeatureInstance is created.
|
|
This sync function is for manual re-synchronization, not automatic propagation.
|
|
|
|
Args:
|
|
instanceId: FeatureInstance ID
|
|
addOnly: If True, only add missing roles. If False, also remove extras.
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
# Get instance to verify access
|
|
instance = featureInterface.getFeatureInstance(instanceId)
|
|
if not instance:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Feature instance '{instanceId}' not found"
|
|
)
|
|
|
|
# Verify mandate access
|
|
if context.mandateId and str(instance.mandateId) != str(context.mandateId):
|
|
if not context.isSysAdmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Access denied to this feature instance"
|
|
)
|
|
|
|
# Check admin permission (Mandate-Admin or Feature-Admin)
|
|
if not _hasMandateAdminRole(context) and not context.isSysAdmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin role required to sync roles"
|
|
)
|
|
|
|
result = featureInterface.syncRolesFromTemplate(instanceId, addOnly)
|
|
|
|
logger.info(
|
|
f"User {context.user.id} synced roles for instance {instanceId}: {result}"
|
|
)
|
|
|
|
return SyncRolesResult(**result)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error syncing roles for instance {instanceId}: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to sync roles: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Template Role Endpoints (SysAdmin only)
|
|
# =============================================================================
|
|
|
|
@router.get("/templates/roles", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def listTemplateRoles(
|
|
request: Request,
|
|
featureCode: Optional[str] = Query(None, description="Filter by feature code"),
|
|
sysAdmin: User = Depends(requireSysAdmin)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
List global template roles.
|
|
|
|
SysAdmin only - returns template roles that are copied to new feature instances.
|
|
|
|
Args:
|
|
featureCode: Optional filter by feature code
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
roles = featureInterface.getTemplateRoles(featureCode)
|
|
return [r.model_dump() for r in roles]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error listing template roles: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to list template roles: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/templates/roles", response_model=Dict[str, Any])
|
|
@limiter.limit("10/minute")
|
|
async def createTemplateRole(
|
|
request: Request,
|
|
roleLabel: str = Query(..., description="Role label (e.g., 'admin', 'viewer')"),
|
|
featureCode: str = Query(..., description="Feature code this role belongs to"),
|
|
description: Dict[str, str] = None,
|
|
sysAdmin: User = Depends(requireSysAdmin)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Create a global template role for a feature.
|
|
|
|
SysAdmin only - new template roles are NOT automatically propagated to existing instances.
|
|
Use the sync-roles endpoint to manually synchronize.
|
|
|
|
Args:
|
|
roleLabel: Role label
|
|
featureCode: Feature code
|
|
description: I18n descriptions
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
# Verify feature exists
|
|
feature = featureInterface.getFeature(featureCode)
|
|
if not feature:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Feature '{featureCode}' not found"
|
|
)
|
|
|
|
role = featureInterface.createTemplateRole(
|
|
roleLabel=roleLabel,
|
|
featureCode=featureCode,
|
|
description=description
|
|
)
|
|
|
|
logger.info(
|
|
f"SysAdmin {sysAdmin.id} created template role '{roleLabel}' "
|
|
f"for feature '{featureCode}'"
|
|
)
|
|
|
|
return role.model_dump()
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating template role: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to create template role: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Dynamic Feature Route (MUST be last to avoid catching /instances, /my, etc.)
|
|
# =============================================================================
|
|
|
|
@router.get("/{featureCode}", response_model=Dict[str, Any])
|
|
@limiter.limit("60/minute")
|
|
async def getFeature(
|
|
request: Request,
|
|
featureCode: str,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get a specific feature by code.
|
|
|
|
IMPORTANT: This route must be defined LAST to avoid catching paths like
|
|
/instances, /my, /templates, etc.
|
|
|
|
Args:
|
|
featureCode: Feature code (e.g., 'trustee', 'chatbot')
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
feature = featureInterface.getFeature(featureCode)
|
|
if not feature:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Feature '{featureCode}' not found"
|
|
)
|
|
|
|
return feature.model_dump()
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting feature {featureCode}: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to get feature: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
def _hasMandateAdminRole(context: RequestContext) -> bool:
|
|
"""
|
|
Check if the user has mandate admin role in the current context.
|
|
|
|
A user is mandate admin if they have the 'admin' role at mandate level.
|
|
"""
|
|
if context.isSysAdmin:
|
|
return True
|
|
|
|
if not context.roleIds:
|
|
return False
|
|
|
|
# Check if any of the user's roles is an admin role
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
from modules.datamodels.datamodelRbac import Role
|
|
|
|
for roleId in context.roleIds:
|
|
roleRecords = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if roleRecords:
|
|
role = roleRecords[0]
|
|
roleLabel = role.get("roleLabel", "")
|
|
# Admin role at mandate level (not feature-instance level)
|
|
if roleLabel == "admin" and role.get("mandateId") and not role.get("featureInstanceId"):
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking mandate admin role: {e}")
|
|
return False
|