access rules editor enhanced

This commit is contained in:
ValueOn AG 2026-01-24 09:43:46 +01:00
parent 50e3fce12b
commit efc28879c3
3 changed files with 415 additions and 57 deletions

View file

@ -16,76 +16,48 @@ FEATURE_LABEL = {"en": "Trustee", "de": "Treuhand", "fr": "Fiduciaire"}
FEATURE_ICON = "mdi-briefcase"
# UI Objects for RBAC catalog
# Note: organisations and contracts removed - feature instance = organisation
UI_OBJECTS = [
{
"objectKey": "ui.feature.trustee.organisations",
"label": {"en": "Organisations", "de": "Organisationen", "fr": "Organisations"},
"meta": {"area": "organisations"}
"objectKey": "ui.feature.trustee.dashboard",
"label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"},
"meta": {"area": "dashboard"}
},
{
"objectKey": "ui.feature.trustee.contracts",
"label": {"en": "Contracts", "de": "Verträge", "fr": "Contrats"},
"meta": {"area": "contracts"}
"objectKey": "ui.feature.trustee.positions",
"label": {"en": "Positions", "de": "Positionen", "fr": "Positions"},
"meta": {"area": "positions"}
},
{
"objectKey": "ui.feature.trustee.contracts.tab.documents",
"label": {"en": "Contract Documents", "de": "Vertragsdokumente", "fr": "Documents contractuels"},
"meta": {"area": "contracts", "element": "tab.documents"}
"objectKey": "ui.feature.trustee.documents",
"label": {"en": "Documents", "de": "Dokumente", "fr": "Documents"},
"meta": {"area": "documents"}
},
{
"objectKey": "ui.feature.trustee.contracts.tab.positions",
"label": {"en": "Contract Positions", "de": "Vertragspositionen", "fr": "Positions contractuelles"},
"meta": {"area": "contracts", "element": "tab.positions"}
"objectKey": "ui.feature.trustee.position-documents",
"label": {"en": "Position Documents", "de": "Positions-Dokumente", "fr": "Documents de position"},
"meta": {"area": "position-documents"}
},
{
"objectKey": "ui.feature.trustee.access",
"label": {"en": "Access Management", "de": "Zugriffsverwaltung", "fr": "Gestion des accès"},
"meta": {"area": "access"}
},
{
"objectKey": "ui.feature.trustee.roles",
"label": {"en": "Roles", "de": "Rollen", "fr": "Rôles"},
"meta": {"area": "roles"}
"objectKey": "ui.feature.trustee.instance-roles",
"label": {"en": "Instance Roles & Permissions", "de": "Instanz-Rollen & Berechtigungen", "fr": "Rôles et permissions d'instance"},
"meta": {"area": "admin", "admin_only": True}
},
]
# Resource Objects for RBAC catalog
# Note: organisations and contracts removed - feature instance = organisation
RESOURCE_OBJECTS = [
{
"objectKey": "resource.feature.trustee.organisations.create",
"label": {"en": "Create Organisation", "de": "Organisation erstellen", "fr": "Créer organisation"},
"meta": {"endpoint": "/api/trustee/{instanceId}/organisations", "method": "POST"}
},
{
"objectKey": "resource.feature.trustee.organisations.update",
"label": {"en": "Update Organisation", "de": "Organisation aktualisieren", "fr": "Modifier organisation"},
"meta": {"endpoint": "/api/trustee/{instanceId}/organisations/{orgId}", "method": "PUT"}
},
{
"objectKey": "resource.feature.trustee.organisations.delete",
"label": {"en": "Delete Organisation", "de": "Organisation löschen", "fr": "Supprimer organisation"},
"meta": {"endpoint": "/api/trustee/{instanceId}/organisations/{orgId}", "method": "DELETE"}
},
{
"objectKey": "resource.feature.trustee.contracts.create",
"label": {"en": "Create Contract", "de": "Vertrag erstellen", "fr": "Créer contrat"},
"meta": {"endpoint": "/api/trustee/{instanceId}/contracts", "method": "POST"}
},
{
"objectKey": "resource.feature.trustee.contracts.update",
"label": {"en": "Update Contract", "de": "Vertrag aktualisieren", "fr": "Modifier contrat"},
"meta": {"endpoint": "/api/trustee/{instanceId}/contracts/{contractId}", "method": "PUT"}
},
{
"objectKey": "resource.feature.trustee.contracts.delete",
"label": {"en": "Delete Contract", "de": "Vertrag löschen", "fr": "Supprimer contrat"},
"meta": {"endpoint": "/api/trustee/{instanceId}/contracts/{contractId}", "method": "DELETE"}
},
{
"objectKey": "resource.feature.trustee.documents.create",
"label": {"en": "Upload Document", "de": "Dokument hochladen", "fr": "Télécharger document"},
"meta": {"endpoint": "/api/trustee/{instanceId}/documents", "method": "POST"}
},
{
"objectKey": "resource.feature.trustee.documents.update",
"label": {"en": "Update Document", "de": "Dokument aktualisieren", "fr": "Modifier document"},
"meta": {"endpoint": "/api/trustee/{instanceId}/documents/{documentId}", "method": "PUT"}
},
{
"objectKey": "resource.feature.trustee.documents.delete",
"label": {"en": "Delete Document", "de": "Dokument löschen", "fr": "Supprimer document"},
@ -96,15 +68,26 @@ RESOURCE_OBJECTS = [
"label": {"en": "Create Position", "de": "Position erstellen", "fr": "Créer position"},
"meta": {"endpoint": "/api/trustee/{instanceId}/positions", "method": "POST"}
},
{
"objectKey": "resource.feature.trustee.positions.update",
"label": {"en": "Update Position", "de": "Position aktualisieren", "fr": "Modifier position"},
"meta": {"endpoint": "/api/trustee/{instanceId}/positions/{positionId}", "method": "PUT"}
},
{
"objectKey": "resource.feature.trustee.positions.delete",
"label": {"en": "Delete Position", "de": "Position löschen", "fr": "Supprimer position"},
"meta": {"endpoint": "/api/trustee/{instanceId}/positions/{positionId}", "method": "DELETE"}
},
{
"objectKey": "resource.feature.trustee.instance-roles.manage",
"label": {"en": "Manage Instance Roles", "de": "Instanz-Rollen verwalten", "fr": "Gérer les rôles d'instance"},
"meta": {"endpoint": "/api/trustee/{instanceId}/instance-roles", "method": "ALL", "admin_only": True}
},
]
# Template roles for this feature with AccessRules
# Each role defines default UI and DATA permissions
# Note: UI item=None means ALL views, specific items restrict to named views
TEMPLATE_ROLES = [
{
"roleLabel": "trustee-admin",
@ -114,10 +97,12 @@ TEMPLATE_ROLES = [
"fr": "Administrateur fiduciaire - Accès complet aux données et paramètres fiduciaires"
},
"accessRules": [
# Full UI access
# Full UI access (all views including admin views)
{"context": "UI", "item": None, "view": True},
# Full DATA access
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
# Admin resource: manage instance roles
{"context": "RESOURCE", "item": "instance-roles.manage", "view": True},
]
},
{
@ -128,8 +113,11 @@ TEMPLATE_ROLES = [
"fr": "Comptable fiduciaire - Gérer les données comptables et financières"
},
"accessRules": [
# Full UI access
{"context": "UI", "item": None, "view": True},
# UI access to main views (not admin views)
{"context": "UI", "item": "dashboard", "view": True},
{"context": "UI", "item": "positions", "view": True},
{"context": "UI", "item": "documents", "view": True},
{"context": "UI", "item": "position-documents", "view": True},
# Group-level DATA access
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "g"},
]
@ -142,8 +130,10 @@ TEMPLATE_ROLES = [
"fr": "Client fiduciaire - Consulter ses propres données comptables et documents"
},
"accessRules": [
# Full UI access
{"context": "UI", "item": None, "view": True},
# UI access to main views only (read-only focus)
{"context": "UI", "item": "dashboard", "view": True},
{"context": "UI", "item": "positions", "view": True},
{"context": "UI", "item": "documents", "view": True},
# Own records only (MY level)
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
]

View file

@ -115,6 +115,66 @@ async def _validateInstanceAccess(instanceId: str, context: RequestContext) -> s
return str(instance.mandateId)
# ============================================================================
# ATTRIBUTES ENDPOINT (for FormGeneratorTable)
# ============================================================================
# Mapping of entity names to Pydantic model classes
_TRUSTEE_ENTITY_MODELS = {
"TrusteeOrganisation": TrusteeOrganisation,
"TrusteeRole": TrusteeRole,
"TrusteeAccess": TrusteeAccess,
"TrusteeContract": TrusteeContract,
"TrusteeDocument": TrusteeDocument,
"TrusteePosition": TrusteePosition,
"TrusteePositionDocument": TrusteePositionDocument,
}
@router.get("/{instanceId}/attributes/{entityType}")
@limiter.limit("30/minute")
async def getEntityAttributes(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
entityType: str = Path(..., description="Entity type (e.g., TrusteeDocument)"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get attribute definitions for a Trustee entity.
Used by FormGeneratorTable for dynamic column generation.
"""
# Validate instance access
await _validateInstanceAccess(instanceId, context)
# Check if entity type is valid
if entityType not in _TRUSTEE_ENTITY_MODELS:
raise HTTPException(
status_code=404,
detail=f"Unknown entity type: {entityType}. Valid types: {list(_TRUSTEE_ENTITY_MODELS.keys())}"
)
# Get the model class
modelClass = _TRUSTEE_ENTITY_MODELS[entityType]
# Import the attribute utils
from modules.shared.attributeUtils import getModelAttributeDefinitions
try:
attrDefs = getModelAttributeDefinitions(modelClass)
# Filter to only visible attributes
visibleAttrs = [
attr for attr in attrDefs.get("attributes", [])
if isinstance(attr, dict) and attr.get("visible", True)
]
return {"attributes": visibleAttrs}
except Exception as e:
logger.error(f"Error getting attributes for {entityType}: {e}")
raise HTTPException(
status_code=500,
detail=f"Error getting attributes for {entityType}: {str(e)}"
)
# ============================================================================
# OPTIONS ENDPOINTS (for dropdowns)
# ============================================================================
@ -131,7 +191,7 @@ async def getOrganisationOptions(
interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
result = interface.getAllOrganisations(None)
items = result.items if hasattr(result, 'items') else result
return [{"value": org.id, "label": org.label or org.id} for org in items]
return [{"value": org["id"], "label": org.get("label") or org["id"]} for org in items]
@router.get("/{instanceId}/roles/options", response_model=List[Dict[str, Any]])
@ -146,7 +206,7 @@ async def getRoleOptions(
interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
result = interface.getAllRoles(None)
items = result.items if hasattr(result, 'items') else result
return [{"value": role.id, "label": role.desc or role.id} for role in items]
return [{"value": role["id"], "label": role.get("desc") or role["id"]} for role in items]
@router.get("/{instanceId}/contracts/options", response_model=List[Dict[str, Any]])
@ -1136,3 +1196,267 @@ async def deletePositionDocument(
if not success:
raise HTTPException(status_code=400, detail="Failed to delete link")
return {"message": f"Link {linkId} deleted"}
# ===== Instance Roles Management =====
# These endpoints allow feature admins to manage instance-specific roles and their AccessRules
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
async def _validateInstanceAdmin(instanceId: str, context: RequestContext) -> str:
"""
Validate that the user has admin access to the feature instance.
Returns the mandateId if authorized.
This checks for the RESOURCE permission 'instance-roles.manage'.
"""
mandateId = await _validateInstanceAccess(instanceId, context)
# SysAdmin always has access
if context.user.isSysAdmin:
return mandateId
# Check for instance-roles.manage resource permission
featureInterface = getFeatureInterface()
permissions = featureInterface.getUserPermissionsForInstance(context.user.id, instanceId)
if not permissions:
raise HTTPException(
status_code=403,
detail="Keine Berechtigung zur Rollenverwaltung"
)
# Check for resource permission
resourcePermissions = permissions.get("resources", {})
if not resourcePermissions.get("instance-roles.manage"):
raise HTTPException(
status_code=403,
detail="Keine Berechtigung zur Rollenverwaltung"
)
return mandateId
@router.get("/{instanceId}/instance-roles", response_model=PaginatedResponse)
@limiter.limit("30/minute")
async def getInstanceRoles(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse:
"""
Get all roles for this feature instance.
Requires feature admin permission.
"""
mandateId = await _validateInstanceAdmin(instanceId, context)
rootInterface = getRootInterface()
# Get instance-specific roles (mandateId set, featureInstanceId matches)
roles = rootInterface.db.getRecordset(
Role,
recordFilter={
"featureCode": "trustee",
"featureInstanceId": instanceId
}
)
return PaginatedResponse(
items=roles,
pagination=None
)
@router.get("/{instanceId}/instance-roles/{roleId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
async def getInstanceRole(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
roleId: str = Path(..., description="Role ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Get a specific instance role."""
mandateId = await _validateInstanceAdmin(instanceId, context)
rootInterface = getRootInterface()
role = rootInterface.db.getRecord(Role, roleId)
if not role:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found")
# Verify role belongs to this instance
if role.get("featureInstanceId") != instanceId:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")
return role
@router.get("/{instanceId}/instance-roles/{roleId}/rules", response_model=PaginatedResponse)
@limiter.limit("30/minute")
async def getInstanceRoleRules(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
roleId: str = Path(..., description="Role ID"),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse:
"""
Get all AccessRules for a specific instance role.
Requires feature admin permission.
"""
mandateId = await _validateInstanceAdmin(instanceId, context)
rootInterface = getRootInterface()
# Verify role belongs to this instance
role = rootInterface.db.getRecord(Role, roleId)
if not role or role.get("featureInstanceId") != instanceId:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")
# Get AccessRules for this role
rules = rootInterface.db.getRecordset(
AccessRule,
recordFilter={"roleId": roleId}
)
return PaginatedResponse(
items=rules,
pagination=None
)
@router.post("/{instanceId}/instance-roles/{roleId}/rules", response_model=Dict[str, Any], status_code=201)
@limiter.limit("10/minute")
async def createInstanceRoleRule(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
roleId: str = Path(..., description="Role ID"),
ruleData: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Create a new AccessRule for an instance role.
Requires feature admin permission.
"""
mandateId = await _validateInstanceAdmin(instanceId, context)
rootInterface = getRootInterface()
# Verify role belongs to this instance
role = rootInterface.db.getRecord(Role, roleId)
if not role or role.get("featureInstanceId") != instanceId:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")
# Create the rule
try:
contextStr = ruleData.get("context", "UI")
if isinstance(contextStr, str):
contextEnum = AccessRuleContext(contextStr.upper())
else:
contextEnum = contextStr
newRule = AccessRule(
roleId=roleId,
context=contextEnum,
item=ruleData.get("item"),
view=ruleData.get("view", False),
read=ruleData.get("read"),
create=ruleData.get("create"),
update=ruleData.get("update"),
delete=ruleData.get("delete"),
)
created = rootInterface.db.recordCreate(AccessRule, newRule.model_dump())
return created
except Exception as e:
logger.error(f"Error creating AccessRule: {e}")
raise HTTPException(status_code=400, detail=f"Failed to create rule: {str(e)}")
@router.put("/{instanceId}/instance-roles/{roleId}/rules/{ruleId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
async def updateInstanceRoleRule(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
roleId: str = Path(..., description="Role ID"),
ruleId: str = Path(..., description="Rule ID"),
ruleData: Dict[str, Any] = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Update an AccessRule for an instance role.
Only view, read, create, update, delete can be changed.
Requires feature admin permission.
"""
mandateId = await _validateInstanceAdmin(instanceId, context)
rootInterface = getRootInterface()
# Verify role belongs to this instance
role = rootInterface.db.getRecord(Role, roleId)
if not role or role.get("featureInstanceId") != instanceId:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")
# Verify rule belongs to role
existingRule = rootInterface.db.getRecord(AccessRule, ruleId)
if not existingRule or existingRule.get("roleId") != roleId:
raise HTTPException(status_code=404, detail=f"Rule {ruleId} not found for this role")
# Update only allowed fields
updateData = {}
if "view" in ruleData:
updateData["view"] = ruleData["view"]
if "read" in ruleData:
updateData["read"] = ruleData["read"]
if "create" in ruleData:
updateData["create"] = ruleData["create"]
if "update" in ruleData:
updateData["update"] = ruleData["update"]
if "delete" in ruleData:
updateData["delete"] = ruleData["delete"]
if not updateData:
return existingRule
try:
updated = rootInterface.db.recordUpdate(AccessRule, ruleId, updateData)
return updated
except Exception as e:
logger.error(f"Error updating AccessRule: {e}")
raise HTTPException(status_code=400, detail=f"Failed to update rule: {str(e)}")
@router.delete("/{instanceId}/instance-roles/{roleId}/rules/{ruleId}")
@limiter.limit("10/minute")
async def deleteInstanceRoleRule(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
roleId: str = Path(..., description="Role ID"),
ruleId: str = Path(..., description="Rule ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Delete an AccessRule for an instance role.
Requires feature admin permission.
"""
mandateId = await _validateInstanceAdmin(instanceId, context)
rootInterface = getRootInterface()
# Verify role belongs to this instance
role = rootInterface.db.getRecord(Role, roleId)
if not role or role.get("featureInstanceId") != instanceId:
raise HTTPException(status_code=404, detail=f"Role {roleId} not found in this instance")
# Verify rule belongs to role
existingRule = rootInterface.db.getRecord(AccessRule, ruleId)
if not existingRule or existingRule.get("roleId") != roleId:
raise HTTPException(status_code=404, detail=f"Rule {ruleId} not found for this role")
try:
rootInterface.db.recordDelete(AccessRule, ruleId)
return {"message": f"Rule {ruleId} deleted"}
except Exception as e:
logger.error(f"Error deleting AccessRule: {e}")
raise HTTPException(status_code=400, detail=f"Failed to delete rule: {str(e)}")

View file

@ -311,6 +311,50 @@ async def getAccessRules(
)
@router.get("/rules/by-role/{roleId}", response_model=PaginatedResponse)
@limiter.limit("30/minute")
async def getAccessRulesByRole(
request: Request,
roleId: str = Path(..., description="Role ID to get rules for"),
currentUser: User = Depends(requireSysAdmin)
) -> PaginatedResponse:
"""
Get all access rules for a specific role.
MULTI-TENANT: SysAdmin-only.
Path Parameters:
- roleId: The role ID to get rules for
Returns:
- List of AccessRule objects for the specified role
"""
try:
interface = getRootInterface()
# Build filter for roleId
recordFilter = {"roleId": roleId}
# Get rules from database
rules = interface.db.getRecordset(AccessRule, recordFilter=recordFilter)
# Convert to AccessRule objects
ruleObjects = [AccessRule(**rule) for rule in rules]
return PaginatedResponse(
items=[rule.model_dump() for rule in ruleObjects],
pagination=None
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting access rules for role {roleId}: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get access rules for role: {str(e)}"
)
@router.get("/rules/{ruleId}", response_model=dict)
@limiter.limit("30/minute")
async def getAccessRule(