rbac rules tested and fixed

This commit is contained in:
ValueOn AG 2026-01-25 03:01:01 +01:00
parent df4c60fc99
commit 2fc8034260
37 changed files with 2081 additions and 377 deletions

15
app.py
View file

@ -21,7 +21,7 @@ from modules.shared.configuration import APP_CONFIG
from modules.shared.eventManagement import eventManager
from modules.workflows.automation import subAutomationSchedule
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.features.featureRegistry import loadFeatureMainModules
from modules.system.registry import loadFeatureMainModules
class DailyRotatingFileHandler(RotatingFileHandler):
"""
@ -346,8 +346,8 @@ def _generateOperationId(route) -> str:
# START APP
app = FastAPI(
title="PowerOn | Data Platform API",
description=f"Backend API for the Multi-Agent Platform by ValueOn AG ({instanceLabel})",
title="PowerOn AG | Workflow Engine",
description=f"API for dynamic SaaS platforms ({instanceLabel})",
lifespan=lifespan,
swagger_ui_init_oauth={
"usePkceWithAuthorizationCodeGrant": True,
@ -501,11 +501,18 @@ app.include_router(gdprRouter)
from modules.routes.routeChat import router as chatRouter
app.include_router(chatRouter)
# ============================================================================
# SYSTEM ROUTES (Navigation, etc.)
# ============================================================================
from modules.system.routeSystem import router as systemRouter, navigationRouter
app.include_router(systemRouter)
app.include_router(navigationRouter)
# ============================================================================
# PLUG&PLAY FEATURE ROUTERS
# Dynamically load routers from feature containers in modules/features/
# ============================================================================
from modules.features.featureRegistry import loadFeatureRouters
from modules.system.registry import loadFeatureRouters
featureLoadResults = loadFeatureRouters(app)
logger.info(f"Feature router load results: {featureLoadResults}")

View file

@ -276,8 +276,11 @@ def getRequestContext(
Determines request context from headers.
Checks authorization and loads role IDs.
IMPORTANT: Even SysAdmin needs explicit membership for mandate context!
SysAdmin flag does NOT give implicit access to mandate data.
Security Model:
- Regular users: Must be explicit members of mandates/feature instances
- SysAdmin users: Can access ANY mandate for administrative operations,
but don't get implicit roleIds (no automatic data access rights).
Routes can check ctx.isSysAdmin to allow admin operations.
Args:
request: FastAPI Request object
@ -289,57 +292,66 @@ def getRequestContext(
RequestContext with user, mandate, roles
Raises:
HTTPException 403: If user is not member of mandate or has no feature access
HTTPException 403: If non-SysAdmin user is not member of mandate or has no feature access
"""
ctx = RequestContext(user=currentUser)
isSysAdmin = getattr(currentUser, 'isSysAdmin', False)
# Get root interface for membership checks
rootInterface = getRootInterface()
if mandateId:
# Check mandate membership - ALSO for SysAdmin!
# SysAdmin must be explicitly added to the mandate
# Check mandate membership
membership = rootInterface.getUserMandate(currentUser.id, mandateId)
if not membership:
# No implicit access for SysAdmin - Fail-Fast!
if membership:
# User is a member - load their roles
if not membership.enabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Mandate membership is disabled"
)
ctx.mandateId = mandateId
ctx.roleIds = rootInterface.getRoleIdsForUserMandate(membership.id)
elif isSysAdmin:
# SysAdmin can access any mandate for admin operations
# But they don't get roleIds - no implicit data access
ctx.mandateId = mandateId
# roleIds stays empty - SysAdmin must rely on isSysAdmin flag for authorization
logger.debug(f"SysAdmin {currentUser.id} accessing mandate {mandateId} without membership")
else:
# Regular user without membership - denied
logger.warning(f"User {currentUser.id} is not member of mandate {mandateId}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not member of mandate"
)
if not membership.enabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Mandate membership is disabled"
)
ctx.mandateId = mandateId
# Load roles via Junction Table
ctx.roleIds = rootInterface.getRoleIdsForUserMandate(membership.id)
if featureInstanceId:
# Check feature access - ALSO for SysAdmin!
# Check feature access
access = rootInterface.getFeatureAccess(currentUser.id, featureInstanceId)
if not access:
logger.warning(f"User {currentUser.id} has no access to feature instance {featureInstanceId}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="No access to feature instance"
)
if access:
# User has access - load their instance roles
if not access.enabled:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Feature access is disabled"
)
ctx.featureInstanceId = featureInstanceId
# Add instance roles
instanceRoleIds = rootInterface.getRoleIdsForFeatureAccess(access.id)
ctx.roleIds.extend(instanceRoleIds)
elif isSysAdmin:
# SysAdmin can access any feature instance for admin operations
ctx.featureInstanceId = featureInstanceId
logger.debug(f"SysAdmin {currentUser.id} accessing feature instance {featureInstanceId} without explicit access")
else:
# Regular user without access - denied
logger.warning(f"User {currentUser.id} has no access to feature instance {featureInstanceId}")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="No access to feature instance"
)
return ctx

View file

@ -1,166 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
AIChat Feature Container - Main Module.
Handles feature initialization and RBAC catalog registration.
AIChat is the dynamic chat workflow feature that handles:
- AI-powered document processing
- Dynamic workflow execution
- Automation definitions
"""
import logging
from typing import Dict, List, Any
logger = logging.getLogger(__name__)
# Feature metadata
FEATURE_CODE = "chatworkflow"
FEATURE_LABEL = {"en": "Chat Workflow", "de": "Chat-Workflow", "fr": "Workflow de Chat"}
FEATURE_ICON = "mdi-message-cog"
# UI Objects for RBAC catalog
UI_OBJECTS = [
{
"objectKey": "ui.feature.aichat.workflows",
"label": {"en": "Workflows", "de": "Workflows", "fr": "Workflows"},
"meta": {"area": "workflows"}
},
{
"objectKey": "ui.feature.aichat.automations",
"label": {"en": "Automations", "de": "Automatisierungen", "fr": "Automatisations"},
"meta": {"area": "automations"}
},
{
"objectKey": "ui.feature.aichat.logs",
"label": {"en": "Logs", "de": "Logs", "fr": "Journaux"},
"meta": {"area": "logs"}
},
]
# Resource Objects for RBAC catalog
RESOURCE_OBJECTS = [
{
"objectKey": "resource.feature.aichat.workflow.start",
"label": {"en": "Start Workflow", "de": "Workflow starten", "fr": "Démarrer workflow"},
"meta": {"endpoint": "/api/chat/playground/start", "method": "POST"}
},
{
"objectKey": "resource.feature.aichat.workflow.stop",
"label": {"en": "Stop Workflow", "de": "Workflow stoppen", "fr": "Arrêter workflow"},
"meta": {"endpoint": "/api/chat/playground/stop/{workflowId}", "method": "POST"}
},
{
"objectKey": "resource.feature.aichat.workflow.delete",
"label": {"en": "Delete Workflow", "de": "Workflow löschen", "fr": "Supprimer workflow"},
"meta": {"endpoint": "/api/chat/playground/workflow/{workflowId}", "method": "DELETE"}
},
]
# Template roles for this feature
TEMPLATE_ROLES = [
{
"roleLabel": "workflow-admin",
"description": {
"en": "Workflow Administrator - Full access to workflow configuration and execution",
"de": "Workflow-Administrator - Vollzugriff auf Workflow-Konfiguration und Ausführung",
"fr": "Administrateur workflow - Accès complet à la configuration et exécution"
}
},
{
"roleLabel": "workflow-editor",
"description": {
"en": "Workflow Editor - Create and modify workflows",
"de": "Workflow-Editor - Workflows erstellen und bearbeiten",
"fr": "Éditeur workflow - Créer et modifier les workflows"
}
},
{
"roleLabel": "workflow-viewer",
"description": {
"en": "Workflow Viewer - View workflows and execution results",
"de": "Workflow-Betrachter - Workflows und Ausführungsergebnisse einsehen",
"fr": "Visualiseur workflow - Consulter les workflows et résultats"
}
},
]
def getFeatureDefinition() -> Dict[str, Any]:
"""Return the feature definition for registration."""
return {
"code": FEATURE_CODE,
"label": FEATURE_LABEL,
"icon": FEATURE_ICON
}
def getUiObjects() -> List[Dict[str, Any]]:
"""Return UI objects for RBAC catalog registration."""
return UI_OBJECTS
def getResourceObjects() -> List[Dict[str, Any]]:
"""Return resource objects for RBAC catalog registration."""
return RESOURCE_OBJECTS
def getTemplateRoles() -> List[Dict[str, Any]]:
"""Return template roles for this feature."""
return TEMPLATE_ROLES
def registerFeature(catalogService) -> bool:
"""
Register this feature's RBAC objects in the catalog.
Args:
catalogService: The RBAC catalog service instance
Returns:
True if registration was successful
"""
try:
# Register UI objects
for uiObj in UI_OBJECTS:
catalogService.registerUiObject(
featureCode=FEATURE_CODE,
objectKey=uiObj["objectKey"],
label=uiObj["label"],
meta=uiObj.get("meta")
)
# Register Resource objects
for resObj in RESOURCE_OBJECTS:
catalogService.registerResourceObject(
featureCode=FEATURE_CODE,
objectKey=resObj["objectKey"],
label=resObj["label"],
meta=resObj.get("meta")
)
logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects")
return True
except Exception as e:
logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}")
return False
async def onStart(eventUser) -> None:
"""
Called when the feature container starts.
Initializes AI connectors for model registry.
"""
try:
from modules.aicore.aicoreModelRegistry import modelRegistry
modelRegistry.ensureConnectorsRegistered()
logger.info(f"Feature '{FEATURE_CODE}' started - AI connectors initialized")
except Exception as e:
logger.error(f"Feature '{FEATURE_CODE}' failed to initialize AI connectors: {e}")
async def onStop(eventUser) -> None:
"""Called when the feature container stops."""
logger.info(f"Feature '{FEATURE_CODE}' stopped")

View file

@ -66,7 +66,13 @@ TEMPLATE_ROLES = [
"en": "Automation Administrator - Full access to automation configuration and execution",
"de": "Automatisierungs-Administrator - Vollzugriff auf Automatisierungs-Konfiguration und Ausführung",
"fr": "Administrateur automatisation - Accès complet à la configuration et exécution"
}
},
"accessRules": [
# Full UI access
{"context": "UI", "item": None, "view": True},
# Full DATA access
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
]
},
{
"roleLabel": "automation-editor",
@ -74,7 +80,15 @@ TEMPLATE_ROLES = [
"en": "Automation Editor - Create and modify automations",
"de": "Automatisierungs-Editor - Automatisierungen erstellen und bearbeiten",
"fr": "Éditeur automatisation - Créer et modifier les automatisations"
}
},
"accessRules": [
# UI access to definitions and templates - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.automation.definitions", "view": True},
{"context": "UI", "item": "ui.feature.automation.templates", "view": True},
{"context": "UI", "item": "ui.feature.automation.logs", "view": True},
# Group-level DATA access
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "n"},
]
},
{
"roleLabel": "automation-viewer",
@ -82,7 +96,14 @@ TEMPLATE_ROLES = [
"en": "Automation Viewer - View automations and execution results",
"de": "Automatisierungs-Betrachter - Automatisierungen und Ausführungsergebnisse einsehen",
"fr": "Visualiseur automatisation - Consulter les automatisations et résultats"
}
},
"accessRules": [
# UI access to view only - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.automation.definitions", "view": True},
{"context": "UI", "item": "ui.feature.automation.logs", "view": True},
# Read-only DATA access (my level)
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"},
]
},
]

View file

@ -367,7 +367,9 @@ class ChatObjects:
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
tableName,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
if operation == "create":

View file

@ -51,7 +51,15 @@ TEMPLATE_ROLES = [
"en": "Chatbot Administrator - Full access to chatbot settings and all conversations",
"de": "Chatbot-Administrator - Vollzugriff auf Chatbot-Einstellungen und alle Konversationen",
"fr": "Administrateur chatbot - Accès complet aux paramètres et conversations"
}
},
"accessRules": [
# Full UI access
{"context": "UI", "item": None, "view": True},
# Full DATA access
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
# Resource access
{"context": "RESOURCE", "item": "resource.feature.chatbot.start", "view": True},
]
},
{
"roleLabel": "chatbot-user",
@ -59,7 +67,15 @@ TEMPLATE_ROLES = [
"en": "Chatbot User - Use chatbot and view own conversations",
"de": "Chatbot-Benutzer - Chatbot nutzen und eigene Konversationen einsehen",
"fr": "Utilisateur chatbot - Utiliser le chatbot et consulter ses conversations"
}
},
"accessRules": [
# UI access to conversations - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.chatbot.conversations", "view": True},
# Own DATA access (my level)
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "m"},
# Resource access
{"context": "RESOURCE", "item": "resource.feature.chatbot.start", "view": True},
]
},
]

View file

@ -8,7 +8,7 @@ Handles CRUD operations for neutralization configuration and attributes.
import logging
from typing import Dict, List, Any, Optional
from modules.features.neutralizer.datamodelFeatureNeutralizer import (
from modules.features.neutralization.datamodelFeatureNeutralizer import (
DataNeutraliserConfig,
DataNeutralizerAttributes,
)

View file

@ -18,17 +18,17 @@ FEATURE_ICON = "mdi-shield-check"
# UI Objects for RBAC catalog
UI_OBJECTS = [
{
"objectKey": "ui.feature.neutralizer.playground",
"objectKey": "ui.feature.neutralization.playground",
"label": {"en": "Playground", "de": "Spielwiese", "fr": "Bac à sable"},
"meta": {"area": "playground"}
},
{
"objectKey": "ui.feature.neutralizer.config",
"objectKey": "ui.feature.neutralization.config",
"label": {"en": "Configuration", "de": "Konfiguration", "fr": "Configuration"},
"meta": {"area": "config"}
},
{
"objectKey": "ui.feature.neutralizer.attributes",
"objectKey": "ui.feature.neutralization.attributes",
"label": {"en": "Attributes", "de": "Attribute", "fr": "Attributs"},
"meta": {"area": "attributes"}
},
@ -37,17 +37,17 @@ UI_OBJECTS = [
# Resource Objects for RBAC catalog
RESOURCE_OBJECTS = [
{
"objectKey": "resource.feature.neutralizer.process.text",
"objectKey": "resource.feature.neutralization.process.text",
"label": {"en": "Process Text", "de": "Text verarbeiten", "fr": "Traiter texte"},
"meta": {"endpoint": "/api/neutralization/process/text", "method": "POST"}
},
{
"objectKey": "resource.feature.neutralizer.process.files",
"objectKey": "resource.feature.neutralization.process.files",
"label": {"en": "Process Files", "de": "Dateien verarbeiten", "fr": "Traiter fichiers"},
"meta": {"endpoint": "/api/neutralization/process/files", "method": "POST"}
},
{
"objectKey": "resource.feature.neutralizer.config.update",
"objectKey": "resource.feature.neutralization.config.update",
"label": {"en": "Update Config", "de": "Konfiguration aktualisieren", "fr": "Mettre à jour config"},
"meta": {"endpoint": "/api/neutralization/config", "method": "PUT"}
},
@ -61,7 +61,13 @@ TEMPLATE_ROLES = [
"en": "Neutralization Administrator - Full access to neutralization settings and data",
"de": "Neutralisierungs-Administrator - Vollzugriff auf Neutralisierungs-Einstellungen und Daten",
"fr": "Administrateur neutralisation - Accès complet aux paramètres et données"
}
},
"accessRules": [
# Full UI access (all views including admin views)
{"context": "UI", "item": None, "view": True},
# Full DATA access
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
]
},
{
"roleLabel": "neutralization-analyst",
@ -69,7 +75,14 @@ TEMPLATE_ROLES = [
"en": "Neutralization Analyst - Analyze and process neutralization data",
"de": "Neutralisierungs-Analyst - Neutralisierungsdaten analysieren und verarbeiten",
"fr": "Analyste neutralisation - Analyser et traiter les données de neutralisation"
}
},
"accessRules": [
# UI access to specific views - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.neutralization.playground", "view": True},
{"context": "UI", "item": "ui.feature.neutralization.attributes", "view": True},
# Group-level DATA access (read-only for sensitive config)
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "n", "update": "n", "delete": "n"},
]
},
]

View file

@ -13,8 +13,8 @@ import re
import json
from typing import Dict, List, Any, Optional
from modules.features.neutralizer.datamodelFeatureNeutralizer import DataNeutraliserConfig, DataNeutralizerAttributes
from modules.features.neutralizer.interfaceFeatureNeutralizer import InterfaceFeatureNeutralizer
from modules.features.neutralization.datamodelFeatureNeutralizer import DataNeutraliserConfig, DataNeutralizerAttributes
from modules.features.neutralization.interfaceFeatureNeutralizer import InterfaceFeatureNeutralizer
# Import all necessary classes and functions for neutralization
from .subProcessCommon import CommonUtils, NeutralizationResult, NeutralizationAttribute

View file

@ -742,7 +742,9 @@ class RealEstateObjects:
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
tableName,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
if operation == "create":

View file

@ -41,7 +41,8 @@ RESOURCE_OBJECTS = [
},
]
# Template roles for this feature
# Template roles for this feature with AccessRules
# IMPORTANT: item uses vollqualifizierte ObjectKeys (gemäss Navigation-API-Konzept)
TEMPLATE_ROLES = [
{
"roleLabel": "realestate-admin",
@ -49,7 +50,16 @@ TEMPLATE_ROLES = [
"en": "Real Estate Administrator - Full access to all property data and settings",
"de": "Immobilien-Administrator - Vollzugriff auf alle Immobiliendaten und Einstellungen",
"fr": "Administrateur immobilier - Accès complet aux données et paramètres"
}
},
"accessRules": [
# Full UI access (all views including admin views)
{"context": "UI", "item": None, "view": True},
# Full DATA access
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
# Admin resources
{"context": "RESOURCE", "item": "resource.feature.realestate.project.create", "view": True},
{"context": "RESOURCE", "item": "resource.feature.realestate.project.delete", "view": True},
]
},
{
"roleLabel": "realestate-manager",
@ -57,7 +67,16 @@ TEMPLATE_ROLES = [
"en": "Real Estate Manager - Manage properties and tenants",
"de": "Immobilien-Verwalter - Immobilien und Mieter verwalten",
"fr": "Gestionnaire immobilier - Gérer les propriétés et locataires"
}
},
"accessRules": [
# UI access to main views - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.realestate.projects", "view": True},
{"context": "UI", "item": "ui.feature.realestate.parcels", "view": True},
# Group-level DATA access
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "g"},
# Resource: create projects
{"context": "RESOURCE", "item": "resource.feature.realestate.project.create", "view": True},
]
},
{
"roleLabel": "realestate-viewer",
@ -65,7 +84,14 @@ TEMPLATE_ROLES = [
"en": "Real Estate Viewer - View property information",
"de": "Immobilien-Betrachter - Immobilien-Informationen einsehen",
"fr": "Visualiseur immobilier - Consulter les informations immobilières"
}
},
"accessRules": [
# UI access to view-only views - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.realestate.projects", "view": True},
{"context": "UI", "item": "ui.feature.realestate.parcels", "view": True},
# Read-only DATA access (my records)
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"},
]
},
]

View file

@ -171,7 +171,9 @@ class TrusteeObjects:
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
tableName,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
if not permissions.view:
@ -196,7 +198,9 @@ class TrusteeObjects:
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
tableName,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
if not permissions.view:
@ -258,7 +262,9 @@ class TrusteeObjects:
modelClass=TrusteeOrganisation,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
logger.debug(f"getAllOrganisations: getRecordsetWithRBAC returned {len(records)} records")
@ -349,7 +355,9 @@ class TrusteeObjects:
modelClass=TrusteeRole,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Users with ALL access level (from system RBAC) see all roles
@ -457,7 +465,9 @@ class TrusteeObjects:
modelClass=TrusteeAccess,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Users with ALL access level (from system RBAC) see all records
@ -514,7 +524,9 @@ class TrusteeObjects:
modelClass=TrusteeAccess,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
return [TrusteeAccess(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records]
@ -529,7 +541,9 @@ class TrusteeObjects:
modelClass=TrusteeAccess,
currentUser=self.currentUser,
recordFilter={"userId": userId},
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Users with ALL access level (from system RBAC) see all records
@ -644,7 +658,9 @@ class TrusteeObjects:
modelClass=TrusteeContract,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -678,7 +694,9 @@ class TrusteeObjects:
modelClass=TrusteeContract,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="label"
orderBy="label",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -785,7 +803,9 @@ class TrusteeObjects:
modelClass=TrusteeDocument,
currentUser=self.currentUser,
recordFilter=None,
orderBy="documentName"
orderBy="documentName",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -826,7 +846,9 @@ class TrusteeObjects:
modelClass=TrusteeDocument,
currentUser=self.currentUser,
recordFilter={"contractId": contractId},
orderBy="documentName"
orderBy="documentName",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -934,7 +956,9 @@ class TrusteeObjects:
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter=None,
orderBy="valuta"
orderBy="valuta",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -975,7 +999,9 @@ class TrusteeObjects:
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter={"contractId": contractId},
orderBy="valuta"
orderBy="valuta",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -990,7 +1016,9 @@ class TrusteeObjects:
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="valuta"
orderBy="valuta",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -1078,15 +1106,46 @@ class TrusteeObjects:
return None
return TrusteePositionDocument(**{k: v for k, v in records[0].items() if not k.startswith("_")})
def updatePositionDocument(self, linkId: str, data: Dict[str, Any]) -> Optional[TrusteePositionDocument]:
"""Update a position-document link."""
# Check permission
if not self.checkCombinedPermission(TrusteePositionDocument, "update"):
logger.warning(f"User {self.userId} lacks permission to update position-document link")
return None
# Verify link exists and belongs to this instance
existing = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
if not existing:
logger.warning(f"Position-document link {linkId} not found")
return None
existingRecord = existing[0]
if existingRecord.get("featureInstanceId") != self.featureInstanceId:
logger.warning(f"Link {linkId} belongs to different instance")
return None
# Prevent changing context fields
data.pop("id", None)
data.pop("mandateId", None)
data.pop("featureInstanceId", None)
updatedRecord = self.db.recordModify(TrusteePositionDocument, linkId, data)
if updatedRecord and updatedRecord.get("id"):
return TrusteePositionDocument(**{k: v for k, v in updatedRecord.items() if not k.startswith("_")})
return None
def getAllPositionDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
"""Get all position-document links with RBAC filtering + feature-level access filtering."""
# Step 1: System RBAC filtering
# Step 1: System RBAC filtering with per-row permissions
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePositionDocument,
currentUser=self.currentUser,
recordFilter=None,
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId,
enrichPermissions=True
)
# Step 2: Feature-level filtering based on trustee.access
@ -1121,7 +1180,9 @@ class TrusteeObjects:
modelClass=TrusteePositionDocument,
currentUser=self.currentUser,
recordFilter={"positionId": positionId},
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access
@ -1136,7 +1197,9 @@ class TrusteeObjects:
modelClass=TrusteePositionDocument,
currentUser=self.currentUser,
recordFilter={"documentId": documentId},
orderBy="id"
orderBy="id",
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
# Step 2: Feature-level filtering based on trustee.access

View file

@ -45,6 +45,31 @@ UI_OBJECTS = [
},
]
# DATA Objects for RBAC catalog (tables/entities)
# Used for AccessRules on data-level permissions
DATA_OBJECTS = [
{
"objectKey": "data.feature.trustee.TrusteePosition",
"label": {"en": "Position", "de": "Position", "fr": "Position"},
"meta": {"table": "TrusteePosition", "fields": ["id", "label", "description", "organisationId"]}
},
{
"objectKey": "data.feature.trustee.TrusteeDocument",
"label": {"en": "Document", "de": "Dokument", "fr": "Document"},
"meta": {"table": "TrusteeDocument", "fields": ["id", "filename", "mimeType", "fileSize", "uploadDate"]}
},
{
"objectKey": "data.feature.trustee.TrusteePositionDocument",
"label": {"en": "Position-Document Assignment", "de": "Position-Dokument Zuordnung", "fr": "Assignation Position-Document"},
"meta": {"table": "TrusteePositionDocument", "fields": ["id", "positionId", "documentId"]}
},
{
"objectKey": "data.feature.trustee.*",
"label": {"en": "All Trustee Data", "de": "Alle Treuhand-Daten", "fr": "Toutes les données fiduciaires"},
"meta": {"wildcard": True, "description": "Wildcard for all trustee data tables"}
},
]
# Resource Objects for RBAC catalog
# Note: organisations and contracts removed - feature instance = organisation
RESOURCE_OBJECTS = [
@ -88,6 +113,7 @@ RESOURCE_OBJECTS = [
# Template roles for this feature with AccessRules
# Each role defines default UI and DATA permissions
# Note: UI item=None means ALL views, specific items restrict to named views
# IMPORTANT: item uses vollqualifizierte ObjectKeys (gemäss Navigation-API-Konzept)
TEMPLATE_ROLES = [
{
"roleLabel": "trustee-admin",
@ -102,7 +128,7 @@ TEMPLATE_ROLES = [
# Full DATA access
{"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
# Admin resource: manage instance roles
{"context": "RESOURCE", "item": "instance-roles.manage", "view": True},
{"context": "RESOURCE", "item": "resource.feature.trustee.instance-roles.manage", "view": True},
]
},
{
@ -113,11 +139,11 @@ TEMPLATE_ROLES = [
"fr": "Comptable fiduciaire - Gérer les données comptables et financières"
},
"accessRules": [
# UI access to main views (not admin views)
{"context": "UI", "item": "dashboard", "view": True},
{"context": "UI", "item": "positions", "view": True},
{"context": "UI", "item": "documents", "view": True},
{"context": "UI", "item": "position-documents", "view": True},
# UI access to main views (not admin views) - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.trustee.dashboard", "view": True},
{"context": "UI", "item": "ui.feature.trustee.positions", "view": True},
{"context": "UI", "item": "ui.feature.trustee.documents", "view": True},
{"context": "UI", "item": "ui.feature.trustee.position-documents", "view": True},
# Group-level DATA access
{"context": "DATA", "item": None, "view": True, "read": "g", "create": "g", "update": "g", "delete": "g"},
]
@ -130,12 +156,15 @@ TEMPLATE_ROLES = [
"fr": "Client fiduciaire - Consulter ses propres données comptables et documents"
},
"accessRules": [
# UI access to main views only (read-only focus)
{"context": "UI", "item": "dashboard", "view": True},
{"context": "UI", "item": "positions", "view": True},
{"context": "UI", "item": "documents", "view": True},
# Own records only (MY level)
{"context": "DATA", "item": None, "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
# UI access to main views only (read-only focus) - vollqualifizierte ObjectKeys
{"context": "UI", "item": "ui.feature.trustee.dashboard", "view": True},
{"context": "UI", "item": "ui.feature.trustee.positions", "view": True},
{"context": "UI", "item": "ui.feature.trustee.documents", "view": True},
{"context": "UI", "item": "ui.feature.trustee.position-documents", "view": True},
# Own records only (MY level) - explizite Regeln pro Tabelle
{"context": "DATA", "item": "data.feature.trustee.TrusteePosition", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
{"context": "DATA", "item": "data.feature.trustee.TrusteeDocument", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
{"context": "DATA", "item": "data.feature.trustee.TrusteePositionDocument", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
]
},
]
@ -165,6 +194,11 @@ def getTemplateRoles() -> List[Dict[str, Any]]:
return TEMPLATE_ROLES
def getDataObjects() -> List[Dict[str, Any]]:
"""Return DATA objects for RBAC catalog registration."""
return DATA_OBJECTS
def registerFeature(catalogService) -> bool:
"""
Register this feature's RBAC objects in the catalog.
@ -194,10 +228,19 @@ def registerFeature(catalogService) -> bool:
meta=resObj.get("meta")
)
# Register DATA objects (tables/entities)
for dataObj in DATA_OBJECTS:
catalogService.registerDataObject(
featureCode=FEATURE_CODE,
objectKey=dataObj["objectKey"],
label=dataObj["label"],
meta=dataObj.get("meta")
)
# Sync template roles to database (with AccessRules)
_syncTemplateRolesToDb()
logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI objects and {len(RESOURCE_OBJECTS)} resource objects")
logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI, {len(RESOURCE_OBJECTS)} resource, {len(DATA_OBJECTS)} data objects")
return True
except Exception as e:

View file

@ -282,12 +282,13 @@ async def get_document_options(
instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
"""Get document options for select dropdowns. Returns: [{ value, label }]"""
"""Get document options for select dropdowns. Returns: [{ id, value, label }]"""
mandateId = await _validateInstanceAccess(instanceId, context)
interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
result = interface.getAllDocuments(None)
items = result.items if hasattr(result, 'items') else result
return [{"value": d.id, "label": d.documentName or d.id} for d in items]
# Include 'id' for FK resolution in tables
return [{"id": d.id, "value": d.id, "label": d.documentName or d.id} for d in items]
@router.get("/{instanceId}/positions/options", response_model=List[Dict[str, Any]])
@ -297,7 +298,7 @@ async def get_position_options(
instanceId: str = Path(..., description="Feature Instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
"""Get position options for select dropdowns. Returns: [{ value, label }]"""
"""Get position options for select dropdowns. Returns: [{ id, value, label }]"""
mandateId = await _validateInstanceAccess(instanceId, context)
interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
result = interface.getAllPositions(None)
@ -313,7 +314,8 @@ async def get_position_options(
parts.append(p.desc[:30])
return " - ".join(parts) if parts else p.id
return [{"value": p.id, "label": _makePositionLabel(p)} for p in items]
# Include 'id' for FK resolution in tables
return [{"id": p.id, "value": p.id, "label": _makePositionLabel(p)} for p in items]
# ============================================================================
@ -1166,15 +1168,18 @@ async def delete_position(
# ===== Position-Document Link Routes =====
@router.get("/{instanceId}/position-documents", response_model=PaginatedResponse[TrusteePositionDocument])
@router.get("/{instanceId}/position-documents")
@limiter.limit("30/minute")
async def get_position_documents(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
pagination: Optional[str] = Query(None),
context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[TrusteePositionDocument]:
"""Get all position-document links with optional pagination."""
) -> Dict[str, Any]:
"""Get all position-document links with optional pagination.
Each item includes _permissions: { canUpdate, canDelete } for row-level permission UI.
"""
mandateId = await _validateInstanceAccess(instanceId, context)
paginationParams = _parsePagination(pagination)
@ -1182,18 +1187,18 @@ async def get_position_documents(
result = interface.getAllPositionDocuments(paginationParams)
if paginationParams:
return PaginatedResponse(
items=result.items,
pagination=PaginationMetadata(
currentPage=paginationParams.page or 1,
pageSize=paginationParams.pageSize or 20,
totalItems=result.totalItems,
totalPages=result.totalPages,
sort=paginationParams.sort if paginationParams else [],
filters=paginationParams.filters if paginationParams else None
)
)
return PaginatedResponse(items=result.items, pagination=None)
return {
"items": result.items,
"pagination": {
"currentPage": paginationParams.page or 1,
"pageSize": paginationParams.pageSize or 20,
"totalItems": result.totalItems,
"totalPages": result.totalPages,
"sort": paginationParams.sort if paginationParams else [],
"filters": paginationParams.filters if paginationParams else None
}
}
return {"items": result.items, "pagination": None}
@router.get("/{instanceId}/position-documents/{linkId}", response_model=TrusteePositionDocument)
@ -1262,6 +1267,25 @@ async def create_position_document(
return result
@router.put("/{instanceId}/position-documents/{linkId}", response_model=TrusteePositionDocument)
@limiter.limit("10/minute")
async def update_position_document(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
linkId: str = Path(...),
data: TrusteePositionDocument = Body(...),
context: RequestContext = Depends(getRequestContext)
) -> TrusteePositionDocument:
"""Update a position-document link."""
mandateId = await _validateInstanceAccess(instanceId, context)
interface = getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
result = interface.updatePositionDocument(linkId, data.model_dump(exclude_unset=True))
if not result:
raise HTTPException(status_code=400, detail="Failed to update link")
return result
@router.delete("/{instanceId}/position-documents/{linkId}")
@limiter.limit("10/minute")
async def delete_position_document(
@ -1505,10 +1529,10 @@ async def update_instance_role_rule(
updateData["delete"] = ruleData["delete"]
if not updateData:
return existingRule
return existingRules[0]
try:
updated = rootInterface.db.recordUpdate(AccessRule, ruleId, updateData)
updated = rootInterface.db.recordModify(AccessRule, ruleId, updateData)
return updated
except Exception as e:
logger.error(f"Error updating AccessRule: {e}")

View file

@ -447,29 +447,33 @@ def _createTableSpecificRules(db: DatabaseConnector) -> None:
delete=AccessLevel.NONE,
))
# Standard tables with typical access patterns
# System tables only - NOT feature-specific tables!
# Feature tables (TrusteeXXX, Projekt, etc.) are handled by FEATURE-TEMPLATE roles.
# NOTE: No sysadmin rules - SysAdmin users bypass RBAC via isSysAdmin flag
standardTables = [
"UserConnection", "DataNeutraliserConfig", "DataNeutralizerAttributes",
"ChatWorkflow", "Prompt", "Projekt", "Parzelle", "Dokument",
"Gemeinde", "Kanton", "Land",
"TrusteeOrganisation", "TrusteeRole", "TrusteeAccess", "TrusteeContract",
"TrusteeDocument", "TrusteePosition", "TrusteePositionDocument"
#
# Proper format: Just table names for DATA context (item="TableName")
# The full data.system.TableName format is for catalog registration only.
# FileItem and UserConnection: All users (user, admin, viewer) only MY-level CRUD
restrictedTables = [
"UserConnection", # User connections/sessions - only own records
"FileItem", # Uploaded files - only own files
]
for table in standardTables:
# Admin gets full group-level access (highest role-based permission)
for table in restrictedTables:
# Admin: Only MY-level access (not group-level!)
if adminId:
tableRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item=table,
view=True,
read=AccessLevel.GROUP,
create=AccessLevel.GROUP,
update=AccessLevel.GROUP,
delete=AccessLevel.GROUP,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
# User: MY-level CRUD
if userId:
tableRules.append(AccessRule(
roleId=userId,
@ -481,6 +485,7 @@ def _createTableSpecificRules(db: DatabaseConnector) -> None:
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
# Viewer: MY-level read-only
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
@ -493,6 +498,80 @@ def _createTableSpecificRules(db: DatabaseConnector) -> None:
delete=AccessLevel.NONE,
))
# Prompt: Special rule - CRUD for MY + Read for GROUP
# Each user can manage own prompts (m) but can read group prompts (g)
if adminId:
# Admin: MY-level CRUD + GROUP-level read
tableRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item="Prompt",
view=True,
read=AccessLevel.GROUP, # Can read group prompts
create=AccessLevel.MY, # Can create own prompts
update=AccessLevel.MY, # Can update own prompts
delete=AccessLevel.MY, # Can delete own prompts
))
if userId:
# User: MY-level CRUD + GROUP-level read
tableRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item="Prompt",
view=True,
read=AccessLevel.GROUP, # Can read group prompts
create=AccessLevel.MY, # Can create own prompts
update=AccessLevel.MY, # Can update own prompts
delete=AccessLevel.MY, # Can delete own prompts
))
if viewerId:
# Viewer: MY-level read + GROUP-level read
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="Prompt",
view=True,
read=AccessLevel.GROUP, # Can read group prompts
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# Invitation: Standard group-level access
if adminId:
tableRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.DATA,
item="Invitation",
view=True,
read=AccessLevel.GROUP,
create=AccessLevel.GROUP,
update=AccessLevel.GROUP,
delete=AccessLevel.GROUP,
))
if userId:
tableRules.append(AccessRule(
roleId=userId,
context=AccessRuleContext.DATA,
item="Invitation",
view=True,
read=AccessLevel.MY,
create=AccessLevel.MY,
update=AccessLevel.MY,
delete=AccessLevel.MY,
))
if viewerId:
tableRules.append(AccessRule(
roleId=viewerId,
context=AccessRuleContext.DATA,
item="Invitation",
view=True,
read=AccessLevel.MY,
create=AccessLevel.NONE,
update=AccessLevel.NONE,
delete=AccessLevel.NONE,
))
# AuthEvent table - Audit logs (no delete allowed for audit integrity!)
# SysAdmin can delete via isSysAdmin bypass, but regular admins cannot
if adminId:
@ -541,26 +620,50 @@ def _createUiContextRules(db: DatabaseConnector) -> None:
Create UI context rules for controlling UI element visibility.
Uses roleId instead of roleLabel.
Creates rules for system pages based on NAVIGATION_SECTIONS.
Admin pages require admin role, public pages are available to all.
NOTE: No sysadmin rules - SysAdmin users bypass RBAC via isSysAdmin flag.
Args:
db: Database connector instance
"""
uiRules = []
from modules.system.mainSystem import NAVIGATION_SECTIONS
# All roles get full UI access by default (no sysadmin - that's a flag)
for roleLabel in ["admin", "user", "viewer"]:
roleId = _getRoleId(db, roleLabel)
uiRules = []
adminId = _getRoleId(db, "admin")
userId = _getRoleId(db, "user")
viewerId = _getRoleId(db, "viewer")
# Create rules based on navigation sections
for section in NAVIGATION_SECTIONS:
isAdminSection = section.get("adminOnly", False)
for item in section.get("items", []):
objectKey = item.get("objectKey")
isPublic = item.get("public", False)
isAdminOnly = item.get("adminOnly", False) or isAdminSection
if isAdminOnly:
# Admin-only pages: only admin role
if adminId:
uiRules.append(AccessRule(
roleId=adminId,
context=AccessRuleContext.UI,
item=objectKey,
view=True,
read=None, create=None, update=None, delete=None,
))
else:
# Public/normal pages: all roles
for roleId in [adminId, userId, viewerId]:
if roleId:
uiRules.append(AccessRule(
roleId=roleId,
context=AccessRuleContext.UI,
item=None,
item=objectKey,
view=True,
read=None,
create=None,
update=None,
delete=None,
read=None, create=None, update=None, delete=None,
))
for rule in uiRules:

View file

@ -227,7 +227,8 @@ class AppObjects:
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
tableName,
mandateId=self.mandateId
)
if operation == "create":

View file

@ -367,7 +367,9 @@ class ChatObjects:
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
tableName,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
if operation == "create":

View file

@ -319,7 +319,9 @@ class ComponentObjects:
permissions = self.rbac.getUserPermissions(
self.currentUser,
AccessRuleContext.DATA,
tableName
tableName,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
if operation == "create":

View file

@ -30,6 +30,7 @@ def getRecordsetWithRBAC(
limit: int = None,
mandateId: Optional[str] = None,
featureInstanceId: Optional[str] = None,
enrichPermissions: bool = False,
) -> List[Dict[str, Any]]:
"""
Get records with RBAC filtering applied at database level.
@ -47,9 +48,11 @@ def getRecordsetWithRBAC(
limit: Maximum number of records to return
mandateId: Explicit mandate context (from request header). Required for GROUP access.
featureInstanceId: Explicit feature instance context
enrichPermissions: If True, adds _permissions field to each record with row-level
permissions { canUpdate, canDelete } based on RBAC rules and _createdBy
Returns:
List of filtered records
List of filtered records (with _permissions if enrichPermissions=True)
"""
table = modelClass.__name__
@ -64,7 +67,12 @@ def getRecordsetWithRBAC(
if isSysAdmin:
# Direct access without RBAC filtering
# Note: getRecordset doesn't support orderBy/limit - these are only used in RBAC path
return connector.getRecordset(modelClass, recordFilter=recordFilter)
records = connector.getRecordset(modelClass, recordFilter=recordFilter)
if enrichPermissions:
# SysAdmin has full permissions on all records
for record in records:
record["_permissions"] = {"canUpdate": True, "canDelete": True}
return records
# Get RBAC permissions for this table
# AccessRule table is always in DbApp database
@ -173,6 +181,12 @@ def getRecordsetWithRBAC(
f"Could not parse JSONB field {fieldName}, keeping as string: {record[fieldName]}"
)
# Enrich records with row-level permissions if requested
if enrichPermissions:
records = _enrichRecordsWithPermissions(
records, permissions, currentUser
)
return records
except Exception as e:
logger.error(f"Error loading records with RBAC from table {table}: {e}")
@ -292,3 +306,87 @@ def buildRbacWhereClause(
}
return None
def _enrichRecordsWithPermissions(
records: List[Dict[str, Any]],
permissions: UserPermissions,
currentUser: User
) -> List[Dict[str, Any]]:
"""
Enrich records with per-row permissions (_permissions field).
The _permissions field contains:
- canUpdate: bool - whether current user can update this record
- canDelete: bool - whether current user can delete this record
Logic:
- AccessLevel.ALL ('a'): User can update/delete all records
- AccessLevel.MY ('m'): User can only update/delete records where _createdBy == userId
- AccessLevel.GROUP ('g'): Same as MY for now (group-level ownership)
- AccessLevel.NONE ('n'): User cannot update/delete any records
Args:
records: List of record dicts
permissions: UserPermissions with update/delete levels
currentUser: Current user object
Returns:
Records with _permissions field added
"""
enriched = []
userId = currentUser.id if currentUser else None
for record in records:
recordCopy = dict(record)
createdBy = record.get("_createdBy")
# Determine canUpdate
canUpdate = _checkRowPermission(permissions.update, userId, createdBy)
# Determine canDelete
canDelete = _checkRowPermission(permissions.delete, userId, createdBy)
recordCopy["_permissions"] = {
"canUpdate": canUpdate,
"canDelete": canDelete
}
enriched.append(recordCopy)
return enriched
def _checkRowPermission(
accessLevel: Optional[AccessLevel],
userId: Optional[str],
recordCreatedBy: Optional[str]
) -> bool:
"""
Check if user has permission for a specific row based on access level.
Args:
accessLevel: The permission level (ALL, MY, GROUP, NONE)
userId: Current user's ID
recordCreatedBy: The _createdBy value of the record
Returns:
True if user has permission, False otherwise
"""
if not accessLevel or accessLevel == AccessLevel.NONE:
return False
if accessLevel == AccessLevel.ALL:
return True
# MY and GROUP: Check ownership via _createdBy
if accessLevel in (AccessLevel.MY, AccessLevel.GROUP):
# If record has no _createdBy, allow access (can't verify ownership)
if not recordCreatedBy:
return True
# If no userId, can't verify - deny
if not userId:
return False
# Check ownership
return recordCreatedBy == userId
# Unknown level - deny by default
return False

View file

@ -325,19 +325,16 @@ def _getInstancePermissions(rootInterface, userId: str, instanceId: str) -> Dict
current["delete"] = _mergeAccessLevel(current["delete"], rule.get("delete") or "n")
# Handle UI context (views)
# Views are stored with full objectKey (e.g., ui.feature.trustee.dashboard)
elif context == "UI" or context == AccessRuleContext.UI:
ruleView = rule.get("view", False)
if item:
# Specific view rule
# Store with full objectKey as per Navigation-API-Konzept
permissions["views"][item] = permissions["views"].get(item, False) or ruleView
elif ruleView:
# item=None means all views - set a wildcard flag
permissions["views"]["_all"] = True
# Derive view permissions from table permissions
# This allows UI navigation to be controlled by data access rights
_deriveViewPermissions(permissions)
return permissions
except Exception as e:
@ -345,51 +342,6 @@ def _getInstancePermissions(rootInterface, userId: str, instanceId: str) -> Dict
return permissions
def _deriveViewPermissions(permissions: Dict[str, Any]) -> None:
"""
Derive UI view permissions from table/data permissions.
Mapping:
- trustee-dashboard: always visible (basic access)
- trustee-positions: visible if READ on TrusteePosition
- trustee-documents: visible if READ on TrusteeDocument
- trustee-position-documents: visible if READ on TrusteePositionDocument
- trustee-instance-roles: visible only for admin roles
This function modifies permissions["views"] in place.
"""
tables = permissions.get("tables", {})
views = permissions.get("views", {})
isAdmin = permissions.get("isAdmin", False)
# If user has _all views permission, skip derivation
if views.get("_all"):
return
# Dashboard is always visible for users with any access
if "trustee-dashboard" not in views:
views["trustee-dashboard"] = True
# Positions view: requires READ on TrusteePosition
if "trustee-positions" not in views:
positionPerms = tables.get("TrusteePosition", {})
views["trustee-positions"] = positionPerms.get("read", "n") != "n"
# Documents view: requires READ on TrusteeDocument
if "trustee-documents" not in views:
documentPerms = tables.get("TrusteeDocument", {})
views["trustee-documents"] = documentPerms.get("read", "n") != "n"
# Position-Documents view: requires READ on TrusteePositionDocument
if "trustee-position-documents" not in views:
linkPerms = tables.get("TrusteePositionDocument", {})
views["trustee-position-documents"] = linkPerms.get("read", "n") != "n"
# Instance-roles (admin) view: requires admin role
if "trustee-instance-roles" not in views:
views["trustee-instance-roles"] = isAdmin
def _mergeAccessLevel(current: str, new: str) -> str:
"""Merge two access levels, returning the highest."""
levels = {"n": 0, "m": 1, "g": 2, "a": 3}

View file

@ -19,6 +19,7 @@ import math
from modules.auth import limiter, getRequestContext, requireSysAdmin, RequestContext
from modules.datamodels.datamodelUam import User, UserPermissions, AccessLevel
from modules.datamodels.datamodelRbac import AccessRuleContext, AccessRule, Role
from modules.datamodels.datamodelMembership import UserMandate
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
@ -77,11 +78,13 @@ async def get_permissions(
)
# MULTI-TENANT: Get permissions using context (mandateId/featureInstanceId)
# For now, pass user - RBAC will be extended to use context in later phases
# Pass mandateId and featureInstanceId to load Feature-Instance roles
permissions = interface.rbac.getUserPermissions(
reqContext.user,
accessContext,
item or ""
item or "",
mandateId=reqContext.mandateId,
featureInstanceId=reqContext.featureInstanceId
)
return permissions
@ -166,32 +169,92 @@ async def get_all_permissions(
result: Dict[str, Any] = {}
# MULTI-TENANT: Get role IDs from context (computed from mandateId/featureInstanceId)
roleIds = reqContext.roleIds or []
# For UI/RESOURCE permissions: These are GLOBAL (not mandate-specific)
# System roles (admin, user, viewer) have global UI rules that apply without mandate context
rootInterface = getRootInterface()
# Start with roleIds from current mandate context (if any)
roleIds = list(reqContext.roleIds or [])
# For UI/RESOURCE: Load system roles the user has across ALL their mandates
# This allows users to access system UI elements without needing a specific mandate header
userMandates = rootInterface.db.getRecordset(
UserMandate,
recordFilter={"userId": str(reqContext.user.id), "enabled": True}
)
# Collect all role IDs the user has across all mandates
for userMandate in userMandates:
mandateRoleIds = rootInterface.getRoleIdsForUserMandate(userMandate.get("id"))
for rid in mandateRoleIds:
if rid not in roleIds:
roleIds.append(rid)
logger.debug(f"UI/RESOURCE permissions: User has {len(roleIds)} roles across all mandates")
if not roleIds and not reqContext.isSysAdmin:
# User has no roles, return empty permissions
# No roles at all, return empty permissions
for ctx in contextsToFetch:
result[ctx.value.lower()] = {}
return result
# Get all access rules for user's roles and requested contexts
# IMPORTANT: Use direct DB access without RBAC filtering!
# Otherwise we have a chicken-and-egg problem: need AccessRule read permission to calculate permissions
allRules: Dict[AccessRuleContext, List[AccessRule]] = {}
for ctx in contextsToFetch:
allRules[ctx] = []
# Get all rules for user's roles in this context
# Get all rules for user's roles - bypass RBAC filtering
for roleId in roleIds:
rules = interface.getAccessRules(
roleId=str(roleId),
context=ctx,
pagination=None
ruleRecords = rootInterface.db.getRecordset(
AccessRule,
recordFilter={"roleId": str(roleId), "context": ctx.value}
)
allRules[ctx].extend(rules)
for ruleRecord in ruleRecords:
# Convert dict to AccessRule object
cleanedRule = {k: v for k, v in ruleRecord.items() if not k.startswith("_")}
allRules[ctx].append(AccessRule(**cleanedRule))
# Build result: for each context, collect all unique items and calculate permissions
for ctx in contextsToFetch:
result[ctx.value.lower()] = {}
# Collect all unique items from rules
# Check for global rule (item=None) first - this grants access to ALL UI/RESOURCE items
# Calculate permissions directly from loaded rules (don't call getUserPermissions - it requires mandateId)
hasGlobalRule = False
globalView = False
globalRead = None
globalCreate = None
globalUpdate = None
globalDelete = None
for rule in allRules[ctx]:
if rule.item is None:
hasGlobalRule = True
if rule.view:
globalView = True
if rule.read:
globalRead = rule.read.value if hasattr(rule.read, 'value') else rule.read
if rule.create:
globalCreate = rule.create.value if hasattr(rule.create, 'value') else rule.create
if rule.update:
globalUpdate = rule.update.value if hasattr(rule.update, 'value') else rule.update
if rule.delete:
globalDelete = rule.delete.value if hasattr(rule.delete, 'value') else rule.delete
# If there's a global rule with view permission, add "_global" key
if hasGlobalRule and globalView:
logger.debug(f"Adding _global key for context {ctx.value} with view={globalView}")
result[ctx.value.lower()]["_global"] = {
"view": globalView,
"read": globalRead,
"create": globalCreate,
"update": globalUpdate,
"delete": globalDelete
}
# Collect all unique items from rules (specific rules)
items = set()
for rule in allRules[ctx]:
if rule.item:
@ -199,7 +262,11 @@ async def get_all_permissions(
# For each item, calculate user permissions
for item in sorted(items):
permissions = interface.rbac.getUserPermissions(reqContext.user, ctx, item)
permissions = interface.rbac.getUserPermissions(
reqContext.user, ctx, item,
mandateId=reqContext.mandateId,
featureInstanceId=reqContext.featureInstanceId
)
# Only include if user has view permission
if permissions.view:
result[ctx.value.lower()][item] = {
@ -1007,3 +1074,129 @@ async def delete_role(
status_code=500,
detail=f"Failed to delete role: {str(e)}"
)
# ============================================================================
# RBAC Catalog Endpoints
# ============================================================================
@router.get("/catalog/objects", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def getCatalogObjects(
request: Request,
context: Optional[str] = Query(None, description="Filter by context (DATA, UI, RESOURCE)"),
featureCode: Optional[str] = Query(None, description="Filter by feature code"),
mandateId: Optional[str] = Query(None, description="Filter by mandate's active features"),
reqContext: RequestContext = Depends(getRequestContext) # Available to all authenticated users
) -> Dict[str, Any]:
"""
Get available RBAC catalog objects.
Returns all registered DATA, UI and RESOURCE objects that can be used in AccessRules.
Query Parameters:
- context: Optional filter by context type (DATA, UI, RESOURCE)
- featureCode: Optional filter by feature (e.g., "trustee")
- mandateId: Optional filter to only include objects from features active in this mandate
Returns:
- Dictionary with objects grouped by context type, each with:
- objectKey: Dot-notation identifier (e.g., "data.feature.trustee.TrusteeContract")
- label: Multilingual label
- featureCode: Owning feature
- meta: Additional metadata (table name, fields, etc.)
Examples:
- GET /api/rbac/catalog/objects all objects
- GET /api/rbac/catalog/objects?context=DATA only DATA objects
- GET /api/rbac/catalog/objects?featureCode=trustee only trustee objects
- GET /api/rbac/catalog/objects?mandateId=xxx objects from mandate's active features
"""
try:
from modules.security.rbacCatalog import getCatalogService
catalog = getCatalogService()
# If mandateId is provided, get active features for that mandate
activeFeatures = None
if mandateId:
try:
interface = getRootInterface()
# Get all feature instances for this mandate
from modules.datamodels.datamodelFeatures import FeatureInstance
instances = interface.db.getRecordset(
FeatureInstance,
recordFilter={"mandateId": mandateId, "enabled": True}
)
activeFeatures = set(inst.get("featureCode") for inst in instances)
# Always include "system" feature
activeFeatures.add("system")
except Exception as e:
logger.warning(f"Could not get active features for mandate {mandateId}: {e}")
if context:
# Single context filter
try:
accessContext = AccessRuleContext(context.upper())
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context '{context}'. Must be one of: DATA, UI, RESOURCE"
)
if accessContext == AccessRuleContext.DATA:
objects = catalog.getDataObjects(featureCode)
elif accessContext == AccessRuleContext.UI:
objects = catalog.getUiObjects(featureCode)
else:
objects = catalog.getResourceObjects(featureCode)
# Filter by active features if mandateId was provided
if activeFeatures:
objects = [obj for obj in objects if obj.get("featureCode") in activeFeatures]
return {context.upper(): objects}
else:
# All contexts
result = catalog.getAllCatalogObjects(featureCode)
# Filter by active features if mandateId was provided
if activeFeatures:
for ctxKey in result:
result[ctxKey] = [obj for obj in result[ctxKey] if obj.get("featureCode") in activeFeatures]
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting catalog objects: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get catalog objects: {str(e)}"
)
@router.get("/catalog/stats", response_model=Dict[str, Any])
@limiter.limit("60/minute")
async def getCatalogStats(
request: Request,
currentUser: User = Depends(requireSysAdmin)
) -> Dict[str, Any]:
"""
Get statistics about the RBAC catalog.
Returns:
- Statistics about registered features, objects, and roles
"""
try:
from modules.security.rbacCatalog import getCatalogService
catalog = getCatalogService()
return catalog.getCatalogStats()
except Exception as e:
logger.error(f"Error getting catalog stats: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get catalog stats: {str(e)}"
)

View file

@ -37,6 +37,7 @@ class RbacCatalogService:
self._uiObjects: Dict[str, Dict[str, Any]] = {}
self._resourceObjects: Dict[str, Dict[str, Any]] = {}
self._dataObjects: Dict[str, Dict[str, Any]] = {} # DATA objects (tables/entities)
self._featureDefinitions: Dict[str, Dict[str, Any]] = {}
self._templateRoles: Dict[str, List[Dict[str, Any]]] = {}
self._initialized = True
@ -60,6 +61,29 @@ class RbacCatalogService:
logger.error(f"Failed to register RESOURCE object {objectKey}: {e}")
return False
def registerDataObject(self, featureCode: str, objectKey: str, label: Dict[str, str], meta: Optional[Dict[str, Any]] = None) -> bool:
"""
Register a DATA object (table/entity) for a feature.
Args:
featureCode: Feature code (e.g., "trustee", "system")
objectKey: Dot-notation key (e.g., "data.feature.trustee.TrusteeContract")
label: Multilingual label dict
meta: Optional metadata (e.g., table name, fields list)
"""
try:
self._dataObjects[objectKey] = {
"objectKey": objectKey,
"featureCode": featureCode,
"label": label,
"meta": meta or {},
"type": "DATA"
}
return True
except Exception as e:
logger.error(f"Failed to register DATA object {objectKey}: {e}")
return False
def registerFeatureDefinition(self, featureCode: str, label: Dict[str, str], icon: str) -> bool:
"""Register a feature definition."""
try:
@ -90,9 +114,23 @@ class RbacCatalogService:
return [obj for obj in self._resourceObjects.values() if obj["featureCode"] == featureCode]
return list(self._resourceObjects.values())
def getDataObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all DATA objects (tables/entities), optionally filtered by feature."""
if featureCode:
return [obj for obj in self._dataObjects.values() if obj["featureCode"] == featureCode]
return list(self._dataObjects.values())
def getAllObjects(self, featureCode: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get all RBAC objects (UI + RESOURCE), optionally filtered by feature."""
return self.getUiObjects(featureCode) + self.getResourceObjects(featureCode)
"""Get all RBAC objects (UI + RESOURCE + DATA), optionally filtered by feature."""
return self.getUiObjects(featureCode) + self.getResourceObjects(featureCode) + self.getDataObjects(featureCode)
def getAllCatalogObjects(self, featureCode: Optional[str] = None) -> Dict[str, List[Dict[str, Any]]]:
"""Get all catalog objects grouped by type (DATA, UI, RESOURCE)."""
return {
"DATA": self.getDataObjects(featureCode),
"UI": self.getUiObjects(featureCode),
"RESOURCE": self.getResourceObjects(featureCode)
}
def getFeatureDefinitions(self) -> List[Dict[str, Any]]:
"""Get all registered feature definitions."""
@ -121,6 +159,8 @@ class RbacCatalogService:
del self._uiObjects[key]
for key in [k for k, v in self._resourceObjects.items() if v["featureCode"] == featureCode]:
del self._resourceObjects[key]
for key in [k for k, v in self._dataObjects.items() if v["featureCode"] == featureCode]:
del self._dataObjects[key]
self._featureDefinitions.pop(featureCode, None)
self._templateRoles.pop(featureCode, None)
logger.info(f"Unregistered feature: {featureCode}")
@ -135,6 +175,7 @@ class RbacCatalogService:
"features": len(self._featureDefinitions),
"uiObjects": len(self._uiObjects),
"resourceObjects": len(self._resourceObjects),
"dataObjects": len(self._dataObjects),
"templateRoles": sum(len(roles) for roles in self._templateRoles.values())
}

View file

@ -0,0 +1,14 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
System Module - Contains system-level infrastructure:
- registry.py: Feature container discovery and loading
- mainSystem.py: System-level RBAC objects (UI, DATA, RESOURCE)
"""
from modules.system.registry import (
loadFeatureRouters,
loadFeatureMainModules,
discoverFeatureContainers,
registerAllFeaturesInCatalog,
)

View file

@ -0,0 +1,423 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
System Module - Main Module.
Registers system-level RBAC objects (UI, DATA, RESOURCE) that are not part of any feature.
These are global system pages and resources available to all users with appropriate roles.
Also defines the navigation structure for the frontend.
"""
import logging
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)
# System metadata
FEATURE_CODE = "system"
FEATURE_LABEL = {"en": "System", "de": "System", "fr": "Système"}
FEATURE_ICON = "mdi-cog"
# =============================================================================
# Navigation Structure (Single Source of Truth)
# =============================================================================
#
# Block Order (gemäss Navigation-API-Konzept):
# - System: 10
# - <dynamic/features>: 15 (wird in routeSystem.py eingefügt)
# - Workflows: 20
# - Basisdaten: 30
# - Migrate: 40
# - Administration: 200
#
# Item Order: Default-Abstand 10 pro Item
# uiComponent: Abgeleitet von objectKey (ui.system.home -> page.system.home)
# icon: Wird intern gehalten aber NICHT in der API Response zurückgegeben
NAVIGATION_SECTIONS = [
{
"id": "system",
"title": {"en": "SYSTEM", "de": "SYSTEM", "fr": "SYSTÈME"},
"order": 10,
"items": [
{
"id": "home",
"objectKey": "ui.system.home",
"label": {"en": "Home", "de": "Übersicht", "fr": "Accueil"},
"icon": "FaHome",
"path": "/",
"order": 10,
"public": True,
},
{
"id": "settings",
"objectKey": "ui.system.settings",
"label": {"en": "Settings", "de": "Einstellungen", "fr": "Paramètres"},
"icon": "FaCog",
"path": "/settings",
"order": 20,
"public": True,
},
],
},
{
"id": "workflows",
"title": {"en": "WORKFLOWS", "de": "WORKFLOWS", "fr": "WORKFLOWS"},
"order": 20,
"items": [
{
"id": "playground",
"objectKey": "ui.system.playground",
"label": {"en": "Chat Playground", "de": "Chat Playground", "fr": "Chat Playground"},
"icon": "FaPlay",
"path": "/workflows/playground",
"order": 10,
},
{
"id": "chats",
"objectKey": "ui.system.chats",
"label": {"en": "Chats", "de": "Chats", "fr": "Chats"},
"icon": "FaListAlt",
"path": "/workflows/list",
"order": 20,
},
{
"id": "automations",
"objectKey": "ui.system.automations",
"label": {"en": "Automations", "de": "Automatisierungen", "fr": "Automatisations"},
"icon": "FaCogs",
"path": "/workflows/automations",
"order": 30,
},
],
},
{
"id": "basedata",
"title": {"en": "BASE DATA", "de": "BASISDATEN", "fr": "DONNÉES DE BASE"},
"order": 30,
"items": [
{
"id": "prompts",
"objectKey": "ui.system.prompts",
"label": {"en": "Prompts", "de": "Prompts", "fr": "Prompts"},
"icon": "FaLightbulb",
"path": "/basedata/prompts",
"order": 10,
},
{
"id": "files",
"objectKey": "ui.system.files",
"label": {"en": "Files", "de": "Dateien", "fr": "Fichiers"},
"icon": "FaRegFileAlt",
"path": "/basedata/files",
"order": 20,
},
{
"id": "connections",
"objectKey": "ui.system.connections",
"label": {"en": "Connections", "de": "Verbindungen", "fr": "Connexions"},
"icon": "FaLink",
"path": "/basedata/connections",
"order": 30,
},
],
},
{
"id": "migrate",
"title": {"en": "MIGRATE TO FEATURES", "de": "MIGRATE TO FEATURES", "fr": "MIGRER VERS FEATURES"},
"order": 40,
"deprecated": True,
"items": [
{
"id": "chatbot",
"objectKey": "ui.system.chatbot",
"label": {"en": "Chatbot", "de": "Chatbot", "fr": "Chatbot"},
"icon": "FaComments",
"path": "/chatbot",
"order": 10,
"deprecated": True,
},
{
"id": "pek",
"objectKey": "ui.system.pek",
"label": {"en": "PEK", "de": "PEK", "fr": "PEK"},
"icon": "FaChartBar",
"path": "/pek",
"order": 20,
"deprecated": True,
},
{
"id": "speech",
"objectKey": "ui.system.speech",
"label": {"en": "Speech", "de": "Sprache", "fr": "Parole"},
"icon": "FaMicrophone",
"path": "/speech",
"order": 30,
"deprecated": True,
},
],
},
{
"id": "admin",
"title": {"en": "ADMINISTRATION", "de": "ADMINISTRATION", "fr": "ADMINISTRATION"},
"order": 200,
"adminOnly": True,
"items": [
{
"id": "admin-users",
"objectKey": "ui.admin.users",
"label": {"en": "Users", "de": "Benutzer", "fr": "Utilisateurs"},
"icon": "FaUsers",
"path": "/admin/users",
"order": 10,
"adminOnly": True,
},
{
"id": "admin-invitations",
"objectKey": "ui.admin.invitations",
"label": {"en": "Invitations", "de": "Einladungen", "fr": "Invitations"},
"icon": "FaEnvelopeOpenText",
"path": "/admin/invitations",
"order": 20,
"adminOnly": True,
},
{
"id": "admin-mandates",
"objectKey": "ui.admin.mandates",
"label": {"en": "Mandates", "de": "Mandanten", "fr": "Mandats"},
"icon": "FaBuilding",
"path": "/admin/mandates",
"order": 30,
"adminOnly": True,
},
{
"id": "admin-roles",
"objectKey": "ui.admin.roles",
"label": {"en": "Roles", "de": "Rollen", "fr": "Rôles"},
"icon": "FaKey",
"path": "/admin/mandate-roles",
"order": 40,
"adminOnly": True,
},
{
"id": "admin-role-permissions",
"objectKey": "ui.admin.role-permissions",
"label": {"en": "Role Permissions", "de": "Rollen-Berechtigungen", "fr": "Permissions des rôles"},
"icon": "FaShieldAlt",
"path": "/admin/mandate-role-permissions",
"order": 50,
"adminOnly": True,
},
{
"id": "admin-user-mandates",
"objectKey": "ui.admin.user-mandates",
"label": {"en": "Mandate Members", "de": "Mandanten-Mitglieder", "fr": "Membres du mandat"},
"icon": "FaUserTag",
"path": "/admin/user-mandates",
"order": 60,
"adminOnly": True,
},
{
"id": "admin-feature-roles",
"objectKey": "ui.admin.feature-roles",
"label": {"en": "Feature Roles & Permissions", "de": "Feature Rollen & Rechte", "fr": "Rôles et permissions des features"},
"icon": "FaCube",
"path": "/admin/feature-roles",
"order": 70,
"adminOnly": True,
},
{
"id": "admin-feature-instances",
"objectKey": "ui.admin.feature-instances",
"label": {"en": "Feature Instances", "de": "Feature-Instanzen", "fr": "Instances de feature"},
"icon": "FaCubes",
"path": "/admin/feature-instances",
"order": 80,
"adminOnly": True,
},
{
"id": "admin-feature-users",
"objectKey": "ui.admin.feature-users",
"label": {"en": "Feature Instance Users", "de": "Feature Instanz Benutzer", "fr": "Utilisateurs d'instance de feature"},
"icon": "FaUsersCog",
"path": "/admin/feature-users",
"order": 90,
"adminOnly": True,
},
],
},
]
def _objectKeyToUiComponent(objectKey: str) -> str:
"""
Convert objectKey to uiComponent.
Example: ui.system.home -> page.system.home
ui.admin.users -> page.admin.users
ui.feature.trustee.dashboard -> page.feature.trustee.dashboard
"""
if objectKey.startswith("ui."):
return "page." + objectKey[3:]
return objectKey
def _buildUiObjectsFromNavigation() -> List[Dict[str, Any]]:
"""Build UI_OBJECTS list from NAVIGATION_SECTIONS for RBAC registration."""
uiObjects = []
for section in NAVIGATION_SECTIONS:
for item in section.get("items", []):
uiObjects.append({
"objectKey": item["objectKey"],
"label": item["label"],
"meta": {
"area": section["id"],
"public": item.get("public", False),
"adminOnly": item.get("adminOnly", False),
"deprecated": item.get("deprecated", False),
"path": item["path"],
"icon": item["icon"],
}
})
return uiObjects
# Generate UI_OBJECTS from navigation structure
UI_OBJECTS = _buildUiObjectsFromNavigation()
# =============================================================================
# System DATA Objects
# =============================================================================
DATA_OBJECTS = [
{
"objectKey": "data.system.User",
"label": {"en": "User", "de": "Benutzer", "fr": "Utilisateur"},
"meta": {"table": "UserInDB"}
},
{
"objectKey": "data.system.Mandate",
"label": {"en": "Mandate", "de": "Mandant", "fr": "Mandat"},
"meta": {"table": "Mandate"}
},
{
"objectKey": "data.system.Role",
"label": {"en": "Role", "de": "Rolle", "fr": "Rôle"},
"meta": {"table": "Role"}
},
{
"objectKey": "data.system.AccessRule",
"label": {"en": "Access Rule", "de": "Zugriffsregel", "fr": "Règle d'accès"},
"meta": {"table": "AccessRule"}
},
{
"objectKey": "data.system.UserMandate",
"label": {"en": "User Mandate", "de": "Benutzer-Mandant", "fr": "Mandat utilisateur"},
"meta": {"table": "UserMandate"}
},
{
"objectKey": "data.system.Prompt",
"label": {"en": "Prompt", "de": "Prompt", "fr": "Prompt"},
"meta": {"table": "Prompt"}
},
{
"objectKey": "data.system.ChatWorkflow",
"label": {"en": "Chat Workflow", "de": "Chat-Workflow", "fr": "Workflow de chat"},
"meta": {"table": "ChatWorkflow"}
},
{
"objectKey": "data.system.FileItem",
"label": {"en": "File", "de": "Datei", "fr": "Fichier"},
"meta": {"table": "FileItem"}
},
{
"objectKey": "data.system.UserConnection",
"label": {"en": "Connection", "de": "Verbindung", "fr": "Connexion"},
"meta": {"table": "UserConnection"}
},
{
"objectKey": "data.system.FeatureInstance",
"label": {"en": "Feature Instance", "de": "Feature-Instanz", "fr": "Instance de feature"},
"meta": {"table": "FeatureInstance"}
},
]
# =============================================================================
# System RESOURCE Objects
# =============================================================================
RESOURCE_OBJECTS = [
{
"objectKey": "resource.system.api.auth",
"label": {"en": "Authentication API", "de": "Authentifizierungs-API", "fr": "API d'authentification"},
"meta": {"endpoint": "/api/auth/*"}
},
{
"objectKey": "resource.system.api.users",
"label": {"en": "Users API", "de": "Benutzer-API", "fr": "API des utilisateurs"},
"meta": {"endpoint": "/api/users/*"}
},
{
"objectKey": "resource.system.api.mandates",
"label": {"en": "Mandates API", "de": "Mandanten-API", "fr": "API des mandats"},
"meta": {"endpoint": "/api/mandates/*"}
},
{
"objectKey": "resource.system.api.rbac",
"label": {"en": "RBAC API", "de": "RBAC-API", "fr": "API RBAC"},
"meta": {"endpoint": "/api/rbac/*"}
},
]
def registerFeature(catalogService) -> bool:
"""
Register system RBAC objects in the catalog.
Args:
catalogService: The RBAC catalog service instance
Returns:
True if registration was successful
"""
try:
# Register UI objects
for uiObj in UI_OBJECTS:
catalogService.registerUiObject(
featureCode=FEATURE_CODE,
objectKey=uiObj["objectKey"],
label=uiObj["label"],
meta=uiObj.get("meta")
)
# Register DATA objects
for dataObj in DATA_OBJECTS:
catalogService.registerDataObject(
featureCode=FEATURE_CODE,
objectKey=dataObj["objectKey"],
label=dataObj["label"],
meta=dataObj.get("meta")
)
# Register RESOURCE objects
for resObj in RESOURCE_OBJECTS:
catalogService.registerResourceObject(
featureCode=FEATURE_CODE,
objectKey=resObj["objectKey"],
label=resObj["label"],
meta=resObj.get("meta")
)
# Register feature definition
catalogService.registerFeatureDefinition(
featureCode=FEATURE_CODE,
label=FEATURE_LABEL,
icon=FEATURE_ICON
)
logger.info(f"Registered system RBAC objects: {len(UI_OBJECTS)} UI, {len(DATA_OBJECTS)} DATA, {len(RESOURCE_OBJECTS)} RESOURCE")
return True
except Exception as e:
logger.error(f"Failed to register system RBAC objects: {e}")
return False

View file

@ -3,6 +3,8 @@
"""
Feature Registry for Plug&Play Feature Container Loading.
Dynamically discovers and loads feature containers from the features directory.
Note: This module is in modules/system/ but manages modules/features/.
"""
import os
@ -14,8 +16,8 @@ from fastapi import FastAPI
logger = logging.getLogger(__name__)
# Path to the features directory
FEATURES_DIR = os.path.dirname(os.path.abspath(__file__))
# Path to the features directory (relative to this file's location)
FEATURES_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "features")
def discoverFeatureContainers() -> List[str]:
@ -109,10 +111,26 @@ def loadFeatureMainModules() -> Dict[str, Any]:
def registerAllFeaturesInCatalog(catalogService) -> Dict[str, bool]:
"""
Register all features' RBAC objects in the catalog.
Also registers system-level RBAC objects.
"""
mainModules = loadFeatureMainModules()
results = {}
# Register system-level RBAC objects first
try:
from modules.system.mainSystem import registerFeature as registerSystemFeature
success = registerSystemFeature(catalogService)
results["system"] = success
if success:
logger.info("Registered RBAC objects: system")
except ImportError as e:
logger.warning(f"System module not found, skipping system RBAC registration: {e}")
except Exception as e:
logger.error(f"Error registering system RBAC objects: {e}")
results["system"] = False
# Register feature modules
mainModules = loadFeatureMainModules()
for featureName, module in mainModules.items():
if hasattr(module, "registerFeature"):
try:

View file

@ -0,0 +1,515 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
System Routes - Navigation and system-level API endpoints.
Navigation API Konzept:
- Single Source of Truth für Navigation im Gateway
- UI rendert nur was es erhält (keine Permission-Logik im UI)
- Keine Icons in API Response - UI mappt selbst via uiComponent
- Blocks statt Sections mit order auf allen Ebenen
"""
import logging
from typing import Dict, List, Any, Optional
from fastapi import APIRouter, Depends, Request, Query
from slowapi import Limiter
from slowapi.util import get_remote_address
from modules.auth.authentication import getRequestContext, RequestContext
from modules.system.mainSystem import NAVIGATION_SECTIONS, _objectKeyToUiComponent
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole, FeatureAccess, FeatureAccessRole
logger = logging.getLogger(__name__)
limiter = Limiter(key_func=get_remote_address)
# Main system router (for other system endpoints if needed)
router = APIRouter(prefix="/api/system", tags=["System"])
# Navigation router at /api/navigation (gemäss Navigation-API-Konzept)
navigationRouter = APIRouter(prefix="/api", tags=["Navigation"])
def _getUserRoleIds(userId: str) -> List[str]:
"""Get all role IDs for a user across all their mandates."""
rootInterface = getRootInterface()
roleIds = []
userMandates = rootInterface.db.getRecordset(
UserMandate,
recordFilter={"userId": userId, "enabled": True}
)
for um in userMandates:
mandateRoleIds = rootInterface.getRoleIdsForUserMandate(um.get("id"))
for rid in mandateRoleIds:
if rid not in roleIds:
roleIds.append(rid)
return roleIds
def _checkUiPermission(roleIds: List[str], objectKey: str) -> bool:
"""Check if any of the given roles has view permission for the UI object."""
if not roleIds:
return False
rootInterface = getRootInterface()
for roleId in roleIds:
# Get UI rules for this role
rules = rootInterface.db.getRecordset(
AccessRule,
recordFilter={"roleId": roleId, "context": "UI"}
)
for rule in rules:
ruleItem = rule.get("item")
ruleView = rule.get("view", False)
if not ruleView:
continue
# Global rule (item=None) grants access to all UI
if ruleItem is None:
return True
# Exact match
if ruleItem == objectKey:
return True
# Wildcard match (e.g., ui.system.* matches ui.system.playground)
if ruleItem.endswith(".*"):
prefix = ruleItem[:-2]
if objectKey.startswith(prefix):
return True
return False
# =============================================================================
# Navigation API (gemäss Navigation-API-Konzept)
# Endpoint: GET /api/navigation
# =============================================================================
def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]:
"""
Get UI objects for a feature from its main module.
Returns list of UI objects with objectKey, label, meta (including path).
"""
try:
# Dynamic import based on feature code
if featureCode == "trustee":
from modules.features.trustee.mainTrustee import UI_OBJECTS
return UI_OBJECTS
elif featureCode == "realestate":
from modules.features.realestate.mainRealEstate import UI_OBJECTS
return UI_OBJECTS
else:
logger.warning(f"Unknown feature code: {featureCode}")
return []
except ImportError as e:
logger.error(f"Failed to import UI_OBJECTS for feature {featureCode}: {e}")
return []
def _buildDynamicBlock(
userId: str,
language: str,
isSysAdmin: bool
) -> Optional[Dict[str, Any]]:
"""
Build the dynamic features block with mandates, features, and instances.
Returns None if user has no feature instances.
"""
try:
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
# Get all feature accesses for this user
featureAccesses = rootInterface.getFeatureAccessesForUser(userId)
if not featureAccesses:
return None
# Build hierarchical structure: mandate -> feature -> instances
mandatesMap: Dict[str, Dict[str, Any]] = {}
featuresMap: Dict[str, Dict[str, Any]] = {} # key: mandateId_featureCode
mandateOrder = 10
for access in featureAccesses:
if not access.enabled:
continue
instance = featureInterface.getFeatureInstance(str(access.featureInstanceId))
if not instance or not instance.enabled:
continue
# Get mandate info
mandateId = str(instance.mandateId)
if mandateId not in mandatesMap:
mandate = rootInterface.getMandate(mandateId)
mandateName = mandate.name if mandate and hasattr(mandate, 'name') else mandateId
mandatesMap[mandateId] = {
"id": mandateId,
"uiLabel": mandateName,
"order": mandateOrder,
"features": []
}
mandateOrder += 10
# Get feature info
featureKey = f"{mandateId}_{instance.featureCode}"
if featureKey not in featuresMap:
feature = featureInterface.getFeature(instance.featureCode)
# Handle featureLabel - could be a dict or a Pydantic model (TextMultilingual)
if feature and hasattr(feature, 'label'):
featureLabel = feature.label
# Convert Pydantic model to dict if needed
if hasattr(featureLabel, 'model_dump'):
featureLabel = featureLabel.model_dump()
elif hasattr(featureLabel, 'dict'):
featureLabel = featureLabel.dict()
elif not isinstance(featureLabel, dict):
# Fallback: try to access as attributes
featureLabel = {"de": getattr(featureLabel, 'de', instance.featureCode), "en": getattr(featureLabel, 'en', instance.featureCode)}
else:
featureLabel = {"de": instance.featureCode, "en": instance.featureCode}
featuresMap[featureKey] = {
"uiComponent": f"feature.{instance.featureCode}",
"uiLabel": featureLabel.get(language, featureLabel.get("en", instance.featureCode)),
"order": 10,
"instances": [],
"_mandateId": mandateId,
"_featureCode": instance.featureCode
}
# Get user's permissions for this instance to filter views
permissions = _getInstanceViewPermissions(rootInterface, userId, str(instance.id), isSysAdmin)
# Get feature UI objects to build views
featureUiObjects = _getFeatureUiObjects(instance.featureCode)
# Build views for this instance
views = []
viewOrder = 10
for uiObj in featureUiObjects:
objectKey = uiObj.get("objectKey", "")
# Extract view name from objectKey for path building
viewName = objectKey.split(".")[-1] if objectKey else ""
# Check permission using full objectKey (as per Navigation-API-Konzept)
if not isSysAdmin and not permissions.get("_all") and not permissions.get(objectKey, False):
continue
# Skip admin-only views for non-admins
meta = uiObj.get("meta", {})
if meta.get("admin_only") and not isSysAdmin and not permissions.get("isAdmin", False):
continue
# Build path for this view
viewPath = f"/mandates/{mandateId}/{instance.featureCode}/{instance.id}/{viewName}"
# Get label in requested language
label = uiObj.get("label", {})
uiLabel = label.get(language, label.get("en", viewName))
views.append({
"uiComponent": f"page.feature.{instance.featureCode}.{viewName}",
"uiLabel": uiLabel,
"uiPath": viewPath,
"order": viewOrder,
"objectKey": objectKey
})
viewOrder += 10
# Sort views by order
views.sort(key=lambda v: v["order"])
# Add instance to feature
featuresMap[featureKey]["instances"].append({
"id": str(instance.id),
"uiLabel": instance.label,
"order": 10,
"views": views
})
# Build final structure
for featureKey, featureData in featuresMap.items():
mandateId = featureData.pop("_mandateId")
featureData.pop("_featureCode")
mandatesMap[mandateId]["features"].append(featureData)
# Sort features within each mandate
for mandate in mandatesMap.values():
mandate["features"].sort(key=lambda f: f["order"])
# Convert to list and sort by order
mandatesList = list(mandatesMap.values())
mandatesList.sort(key=lambda m: m["order"])
if not mandatesList:
return None
return {
"type": "dynamic",
"id": "features",
"title": "MEINE FEATURES",
"order": 15, # Between system (10) and workflows (20)
"mandates": mandatesList
}
except Exception as e:
logger.error(f"Error building dynamic block: {e}")
return None
def _getInstanceViewPermissions(
rootInterface,
userId: str,
instanceId: str,
isSysAdmin: bool
) -> Dict[str, Any]:
"""
Get view permissions for a user in a feature instance.
Returns dict with view names as keys and True/False as values.
Also includes "_all" if user has global view access.
"""
if isSysAdmin:
return {"_all": True, "isAdmin": True}
permissions = {"_all": False, "isAdmin": False}
try:
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext, Role
# Get FeatureAccess for this user and instance
featureAccesses = rootInterface.db.getRecordset(
FeatureAccess,
recordFilter={"userId": userId, "featureInstanceId": instanceId}
)
if not featureAccesses:
return permissions
# Get role IDs via FeatureAccessRole junction table
featureAccessId = featureAccesses[0].get("id")
featureAccessRoles = rootInterface.db.getRecordset(
FeatureAccessRole,
recordFilter={"featureAccessId": featureAccessId}
)
roleIds = [far.get("roleId") for far in featureAccessRoles]
if not roleIds:
return permissions
# Check if user has admin role
for roleId in roleIds:
roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
if roles:
roleLabel = roles[0].get("roleLabel", "").lower()
if "admin" in roleLabel:
permissions["isAdmin"] = True
break
# Get UI permissions from AccessRules
# Permissions are stored with full objectKey (e.g., ui.feature.trustee.dashboard)
for roleId in roleIds:
accessRules = rootInterface.db.getRecordset(
AccessRule,
recordFilter={"roleId": roleId, "context": "UI"}
)
logger.debug(f"_getInstanceViewPermissions: roleId={roleId}, UI rules count={len(accessRules)}")
for rule in accessRules:
if not rule.get("view", False):
continue
item = rule.get("item")
logger.debug(f"_getInstanceViewPermissions: rule item={item}, view={rule.get('view')}")
if item is None:
# item=None means all views
permissions["_all"] = True
else:
# Store full objectKey as per Navigation-API-Konzept
permissions[item] = True
logger.debug(f"_getInstanceViewPermissions: final permissions={permissions}")
return permissions
except Exception as e:
logger.debug(f"Error getting instance view permissions: {e}")
return permissions
def _buildStaticBlocks(
language: str,
isSysAdmin: bool,
roleIds: List[str],
hasGlobalPermission: bool
) -> List[Dict[str, Any]]:
"""
Build static navigation blocks from NAVIGATION_SECTIONS.
Returns list of blocks with items filtered by permissions.
"""
blocks = []
for section in NAVIGATION_SECTIONS:
# Skip admin-only sections for non-admins
if section.get("adminOnly") and not isSysAdmin:
continue
# Filter items based on permissions
filteredItems = []
for item in section.get("items", []):
# Skip admin-only items for non-admins
if item.get("adminOnly") and not isSysAdmin:
continue
# Public items are always visible
if item.get("public"):
filteredItems.append(_formatBlockItem(item, language))
continue
# SysAdmin sees everything
if isSysAdmin:
filteredItems.append(_formatBlockItem(item, language))
continue
# Check permission for this item
if hasGlobalPermission or _checkUiPermission(roleIds, item["objectKey"]):
filteredItems.append(_formatBlockItem(item, language))
# Only include section if it has visible items
if filteredItems:
# Sort items by order
filteredItems.sort(key=lambda i: i["order"])
blocks.append({
"type": "static",
"id": section["id"],
"title": section["title"].get(language, section["title"].get("en", section["id"])),
"order": section.get("order", 50),
"items": filteredItems,
})
return blocks
def _formatBlockItem(item: Dict[str, Any], language: str) -> Dict[str, Any]:
"""
Format a navigation item for the new API response.
Uses new field names: uiComponent, uiLabel, uiPath
Does NOT include icon (UI maps via uiComponent)
"""
objectKey = item["objectKey"]
uiComponent = _objectKeyToUiComponent(objectKey)
return {
"uiComponent": uiComponent,
"uiLabel": item["label"].get(language, item["label"].get("en", item["id"])),
"uiPath": item["path"],
"order": item.get("order", 50),
"objectKey": objectKey,
}
@navigationRouter.get("/navigation")
@limiter.limit("60/minute")
async def get_navigation(
request: Request,
language: str = Query("de", description="Language for labels (en, de, fr)"),
reqContext: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get unified navigation structure with blocks.
Single Source of Truth für Navigation - UI rendert nur was es erhält.
Endpoint: GET /api/navigation
Block order:
- System (10)
- Dynamic/Features (15) - only if user has feature instances
- Workflows (20)
- Basisdaten (30)
- Migrate (40)
- Administration (200)
Response format:
{
"language": "de",
"blocks": [
{
"type": "static",
"id": "system",
"title": "SYSTEM",
"order": 10,
"items": [
{
"uiComponent": "page.system.home",
"uiLabel": "Übersicht",
"uiPath": "/",
"order": 10,
"objectKey": "ui.system.home"
}
]
},
{
"type": "dynamic",
"id": "features",
"title": "MEINE FEATURES",
"order": 15,
"mandates": [...]
}
]
}
"""
try:
isSysAdmin = reqContext.isSysAdmin
userId = str(reqContext.user.id) if reqContext.user else None
# Get user's role IDs for permission checking
roleIds = []
if userId and not isSysAdmin:
roleIds = _getUserRoleIds(userId)
# Check if user has global UI permission
hasGlobalPermission = isSysAdmin
if not hasGlobalPermission and roleIds:
hasGlobalPermission = _checkUiPermission(roleIds, "_global_check")
# Build static blocks from NAVIGATION_SECTIONS
blocks = _buildStaticBlocks(language, isSysAdmin, roleIds, hasGlobalPermission)
# Build dynamic block (features) if user has feature instances
if userId:
dynamicBlock = _buildDynamicBlock(userId, language, isSysAdmin)
if dynamicBlock:
blocks.append(dynamicBlock)
# Sort all blocks by order
blocks.sort(key=lambda b: b["order"])
return {
"language": language,
"blocks": blocks,
}
except Exception as e:
logger.error(f"Error getting navigation: {e}")
return {
"language": language,
"blocks": [],
"error": str(e),
}

View file

@ -0,0 +1,183 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Migration Script: Migrate AccessRules to Vollqualifizierte ObjectKeys
This script migrates existing AccessRules in the database from short item names
(e.g., "dashboard", "positions") to fully qualified ObjectKeys
(e.g., "ui.feature.trustee.dashboard", "ui.feature.trustee.positions").
This is required for the Navigation-API-Konzept implementation.
Usage:
python script_db_migrate_accessrules_objectkeys.py [--dry-run]
Options:
--dry-run Show what would be changed without making actual changes
"""
import sys
import os
import logging
from typing import Dict, List, Any, Optional
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Mapping of short item names to fully qualified ObjectKeys per feature
MIGRATION_MAP: Dict[str, Dict[str, str]] = {
"trustee": {
# UI items
"dashboard": "ui.feature.trustee.dashboard",
"positions": "ui.feature.trustee.positions",
"documents": "ui.feature.trustee.documents",
"position-documents": "ui.feature.trustee.position-documents",
"instance-roles": "ui.feature.trustee.instance-roles",
# RESOURCE items
"instance-roles.manage": "resource.feature.trustee.instance-roles.manage",
},
"realestate": {
# UI items
"projects": "ui.feature.realestate.projects",
"parcels": "ui.feature.realestate.parcels",
# RESOURCE items
"project.create": "resource.feature.realestate.project.create",
"project.delete": "resource.feature.realestate.project.delete",
},
}
def migrateAccessRules(dryRun: bool = False) -> Dict[str, int]:
"""
Migrate AccessRules from short item names to fully qualified ObjectKeys.
Args:
dryRun: If True, don't make actual changes, just show what would be done
Returns:
Dictionary with migration statistics
"""
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
stats = {
"total_rules": 0,
"migrated": 0,
"already_qualified": 0,
"skipped_null": 0,
"errors": 0,
}
try:
rootInterface = getRootInterface()
db = rootInterface.db
# Get all AccessRules
allRules = db.getRecordset(AccessRule, recordFilter=None)
stats["total_rules"] = len(allRules)
logger.info(f"Found {len(allRules)} AccessRules to check")
for rule in allRules:
ruleId = rule.get("id")
context = rule.get("context")
item = rule.get("item")
roleId = rule.get("roleId")
# Skip rules without item (wildcard rules)
if item is None:
stats["skipped_null"] += 1
continue
# Skip if already fully qualified
if item.startswith("ui.") or item.startswith("resource.") or item.startswith("data."):
stats["already_qualified"] += 1
continue
# Get the role to determine feature code
roles = db.getRecordset(Role, recordFilter={"id": roleId})
if not roles or len(roles) == 0:
logger.warning(f"Rule {ruleId}: Role {roleId} not found, skipping")
stats["errors"] += 1
continue
role = roles[0]
featureCode = role.get("featureCode")
if not featureCode or featureCode not in MIGRATION_MAP:
logger.debug(f"Rule {ruleId}: Feature '{featureCode}' not in migration map, skipping")
continue
# Lookup the new ObjectKey
featureMap = MIGRATION_MAP[featureCode]
if item not in featureMap:
logger.warning(f"Rule {ruleId}: Item '{item}' not in migration map for feature '{featureCode}'")
stats["errors"] += 1
continue
newItem = featureMap[item]
if dryRun:
logger.info(f"[DRY-RUN] Would migrate rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
else:
# Update the rule using recordModify
try:
db.recordModify(AccessRule, ruleId, {"item": newItem})
logger.info(f"Migrated rule {ruleId}: '{item}' -> '{newItem}' (feature: {featureCode})")
except Exception as e:
logger.error(f"Failed to migrate rule {ruleId}: {e}")
stats["errors"] += 1
continue
stats["migrated"] += 1
return stats
except Exception as e:
logger.error(f"Migration failed: {e}", exc_info=True)
raise
def main():
"""Main entry point."""
dryRun = "--dry-run" in sys.argv
forceRun = "--force" in sys.argv
if dryRun:
logger.info("=" * 60)
logger.info("DRY RUN MODE - No changes will be made")
logger.info("=" * 60)
else:
logger.info("=" * 60)
logger.info("LIVE MODE - Changes will be applied to the database")
logger.info("=" * 60)
if not forceRun:
# Confirm before proceeding
confirm = input("Are you sure you want to proceed? (yes/no): ")
if confirm.lower() != "yes":
logger.info("Migration cancelled")
return
try:
stats = migrateAccessRules(dryRun=dryRun)
logger.info("=" * 60)
logger.info("Migration completed!")
logger.info(f" Total rules checked: {stats['total_rules']}")
logger.info(f" Rules migrated: {stats['migrated']}")
logger.info(f" Already qualified: {stats['already_qualified']}")
logger.info(f" Skipped (null item): {stats['skipped_null']}")
logger.info(f" Errors: {stats['errors']}")
logger.info("=" * 60)
except Exception as e:
logger.error(f"Migration failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,96 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Export Script: Generate Access Rules per Role Report
Usage:
python script_export_accessrules.py > output.md
python script_export_accessrules.py --file ../docs/reports/access-rules.md
"""
import sys
import os
from datetime import datetime
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.datamodels.datamodelRbac import Role, AccessRule
def main():
rootInterface = getRootInterface()
db = rootInterface.db
# Get all roles and rules
roles = db.getRecordset(Role, recordFilter=None)
rules = db.getRecordset(AccessRule, recordFilter=None)
# Group rules by role
rulesByRole = {}
for rule in rules:
roleId = rule.get('roleId')
if roleId not in rulesByRole:
rulesByRole[roleId] = []
rulesByRole[roleId].append(rule)
# Build markdown
lines = []
lines.append('# Access Rules per Role')
lines.append('')
lines.append(f'Generated: {datetime.now().isoformat()}')
lines.append('')
lines.append(f'Total Roles: {len(roles)}')
lines.append(f'Total Rules: {len(rules)}')
lines.append('')
lines.append('---')
lines.append('')
for role in sorted(roles, key=lambda r: (r.get('featureCode') or '', r.get('code') or '')):
roleId = role.get('id')
roleName = role.get('name') or role.get('code')
featureCode = role.get('featureCode') or 'system'
roleRules = rulesByRole.get(roleId, [])
lines.append(f'## {featureCode} / {roleName}')
lines.append('')
lines.append(f'- **Role ID:** `{roleId}`')
lines.append(f'- **Code:** `{role.get("code")}`')
lines.append(f'- **Feature:** `{featureCode}`')
lines.append(f'- **Rules Count:** {len(roleRules)}')
lines.append('')
if roleRules:
lines.append('| Context | Item | Access |')
lines.append('|---------|------|--------|')
for rule in sorted(roleRules, key=lambda r: (r.get('context') or '', r.get('item') or '')):
ctx = rule.get('context') or '*'
item = rule.get('item') or '*'
access = rule.get('access') or 'allow'
lines.append(f'| {ctx} | `{item}` | {access} |')
lines.append('')
else:
lines.append('*No rules defined*')
lines.append('')
lines.append('---')
lines.append('')
md = '\n'.join(lines)
# Check for --file argument
if '--file' in sys.argv:
idx = sys.argv.index('--file')
if idx + 1 < len(sys.argv):
filePath = sys.argv[idx + 1]
with open(filePath, 'w', encoding='utf-8') as f:
f.write(md)
print(f'Written to {filePath}')
return
# Output to stdout
sys.stdout.reconfigure(encoding='utf-8')
print(md)
if __name__ == "__main__":
main()