From 013de9e2202a3224e4d5e4d6f81ef60caf9b064e Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 13 Jan 2026 23:16:49 +0100
Subject: [PATCH] fixed trustee access
---
.../dynamicOptions/mainDynamicOptions.py | 99 +++
.../interfaces/interfaceDbTrusteeObjects.py | 755 +++++++++++++++---
modules/routes/routeDataTrustee.py | 3 +
modules/routes/routeOptions.py | 3 +
modules/services/__init__.py | 3 +
5 files changed, 734 insertions(+), 129 deletions(-)
diff --git a/modules/features/dynamicOptions/mainDynamicOptions.py b/modules/features/dynamicOptions/mainDynamicOptions.py
index 364fa9da..b75e906b 100644
--- a/modules/features/dynamicOptions/mainDynamicOptions.py
+++ b/modules/features/dynamicOptions/mainDynamicOptions.py
@@ -53,7 +53,9 @@ def getOptions(optionsName: str, services, currentUser: Optional[User] = None) -
Raises:
ValueError: If optionsName is not recognized
"""
+ logger.debug(f"getOptions called with optionsName='{optionsName}' (repr: {repr(optionsName)})")
optionsNameLower = optionsName.lower()
+ logger.debug(f"optionsNameLower='{optionsNameLower}'")
if optionsNameLower == "user.role":
# Fetch roles from database
@@ -118,7 +120,100 @@ def getOptions(optionsName: str, services, currentUser: Optional[User] = None) -
logger.error(f"Error fetching user connections for options: {e}")
return []
+ elif optionsNameLower in ("user", "users"):
+ # Dynamic options: Get all users for the current mandate
+ if not currentUser:
+ return []
+
+ try:
+ users = services.interfaceDbApp.getUsersByMandate(currentUser.mandateId)
+
+ # Handle both list and PaginatedResult
+ if hasattr(users, 'items'):
+ userList = users.items
+ else:
+ userList = users
+
+ return [
+ {
+ "value": user.id,
+ "label": user.fullName or user.username or user.email or user.id
+ }
+ for user in userList
+ ]
+ except Exception as e:
+ logger.error(f"Error fetching users for options: {e}")
+ return []
+
+ elif optionsNameLower in ("trusteeorganisation", "trustee.organisation"):
+ # Dynamic options: Get all trustee organisations
+ if not currentUser:
+ return []
+
+ try:
+ result = services.interfaceDbTrustee.getAllOrganisations()
+
+ # Handle PaginatedResult
+ items = result.items if hasattr(result, 'items') else result
+
+ return [
+ {
+ "value": org.get("id") if isinstance(org, dict) else org.id,
+ "label": org.get("label") if isinstance(org, dict) else org.label
+ }
+ for org in items
+ ]
+ except Exception as e:
+ logger.error(f"Error fetching trustee organisations for options: {e}")
+ return []
+
+ elif optionsNameLower in ("trusteerole", "trustee.role"):
+ # Dynamic options: Get all trustee roles
+ if not currentUser:
+ return []
+
+ try:
+ result = services.interfaceDbTrustee.getAllRoles()
+
+ # Handle PaginatedResult
+ items = result.items if hasattr(result, 'items') else result
+
+ return [
+ {
+ "value": role.get("id") if isinstance(role, dict) else role.id,
+ # TrusteeRole uses 'desc' field, not 'label'
+ "label": role.get("desc", role.get("id")) if isinstance(role, dict) else getattr(role, "desc", role.id)
+ }
+ for role in items
+ ]
+ except Exception as e:
+ logger.error(f"Error fetching trustee roles for options: {e}")
+ return []
+
+ elif optionsNameLower in ("trusteecontract", "trustee.contract"):
+ # Dynamic options: Get all trustee contracts
+ if not currentUser:
+ return []
+
+ try:
+ result = services.interfaceDbTrustee.getAllContracts()
+
+ # Handle PaginatedResult
+ items = result.items if hasattr(result, 'items') else result
+
+ return [
+ {
+ "value": contract.get("id") if isinstance(contract, dict) else contract.id,
+ "label": contract.get("label") if isinstance(contract, dict) else (contract.get("name") if isinstance(contract, dict) else getattr(contract, "label", getattr(contract, "name", contract.id)))
+ }
+ for contract in items
+ ]
+ except Exception as e:
+ logger.error(f"Error fetching trustee contracts for options: {e}")
+ return []
+
else:
+ logger.error(f"Unknown options name: '{optionsName}' (lower: '{optionsNameLower}')")
raise ValueError(f"Unknown options name: {optionsName}")
@@ -134,5 +229,9 @@ def getAvailableOptionsNames() -> List[str]:
"auth.authority",
"connection.status",
"user.connection",
+ "User",
+ "TrusteeOrganisation",
+ "TrusteeRole",
+ "TrusteeContract",
]
diff --git a/modules/interfaces/interfaceDbTrusteeObjects.py b/modules/interfaces/interfaceDbTrusteeObjects.py
index 2cfa517e..54fc25c7 100644
--- a/modules/interfaces/interfaceDbTrusteeObjects.py
+++ b/modules/interfaces/interfaceDbTrusteeObjects.py
@@ -151,6 +151,27 @@ class TrusteeObjects:
return True
+ def getRbacAccessLevel(self, modelClass: type, operation: str) -> AccessLevel:
+ """Get the RBAC access level for a specific operation on a table.
+
+ Returns:
+ AccessLevel (ALL, GROUP, MY, NONE) - determines what filtering is needed
+ """
+ if not self.rbac or not self.currentUser:
+ return AccessLevel.NONE
+
+ tableName = modelClass.__name__
+ permissions = self.rbac.getUserPermissions(
+ self.currentUser,
+ AccessRuleContext.DATA,
+ tableName
+ )
+
+ if not permissions.view:
+ return AccessLevel.NONE
+
+ return getattr(permissions, operation, AccessLevel.NONE)
+
# ===== Organisation CRUD =====
def createOrganisation(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
@@ -173,17 +194,29 @@ class TrusteeObjects:
logger.error(f"Invalid organisation ID format: {orgId}")
return None
- success = self.db.saveRecord(TrusteeOrganisation, orgId, data)
- if success:
- return self.db.getRecord(TrusteeOrganisation, orgId)
+ createdRecord = self.db.recordCreate(TrusteeOrganisation, data)
+ if createdRecord and createdRecord.get("id"):
+ return createdRecord
return None
def getOrganisation(self, orgId: str) -> Optional[Dict[str, Any]]:
"""Get a single organisation by ID."""
- return self.db.getRecord(TrusteeOrganisation, orgId)
+ records = self.db.getRecordset(TrusteeOrganisation, recordFilter={"id": orgId})
+ return records[0] if records else None
def getAllOrganisations(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
- """Get all organisations with RBAC filtering."""
+ """Get all organisations with RBAC filtering.
+
+ Note: Organisations are managed at system level (by mandate).
+ Feature-level filtering (trustee.access) is NOT applied here because:
+ - Admin users with system RBAC can manage all orgs in their mandate
+ - trustee.access grants access to specific orgs for other users
+ - New organisations wouldn't be visible without an access record
+ """
+ # Debug: Log user info and permissions
+ logger.debug(f"getAllOrganisations called for user {self.userId}, roles: {self.currentUser.roleLabels if self.currentUser else 'None'}, mandateId: {self.mandateId}")
+
+ # System RBAC filtering (filters by mandate for GROUP access level)
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeOrganisation,
@@ -191,6 +224,7 @@ class TrusteeObjects:
recordFilter=None,
orderBy="id"
)
+ logger.debug(f"getAllOrganisations: getRecordsetWithRBAC returned {len(records)} records")
# Apply pagination
totalItems = len(records)
@@ -225,10 +259,8 @@ class TrusteeObjects:
return None
data["id"] = orgId
- success = self.db.saveRecord(TrusteeOrganisation, orgId, data)
- if success:
- return self.db.getRecord(TrusteeOrganisation, orgId)
- return None
+ updatedRecord = self.db.recordModify(TrusteeOrganisation, orgId, data)
+ return updatedRecord
def deleteOrganisation(self, orgId: str) -> bool:
"""Delete an organisation."""
@@ -236,7 +268,7 @@ class TrusteeObjects:
logger.warning(f"User {self.userId} lacks permission to delete organisation")
return False
- return self.db.deleteRecord(TrusteeOrganisation, orgId)
+ return self.db.recordDelete(TrusteeOrganisation, orgId)
# ===== Role CRUD =====
@@ -253,17 +285,23 @@ class TrusteeObjects:
logger.error("Role ID is required")
return None
- success = self.db.saveRecord(TrusteeRole, roleId, data)
- if success:
- return self.db.getRecord(TrusteeRole, roleId)
+ createdRecord = self.db.recordCreate(TrusteeRole, data)
+ if createdRecord and createdRecord.get("id"):
+ return createdRecord
return None
def getRole(self, roleId: str) -> Optional[Dict[str, Any]]:
"""Get a single role by ID."""
- return self.db.getRecord(TrusteeRole, roleId)
+ records = self.db.getRecordset(TrusteeRole, recordFilter={"id": roleId})
+ return records[0] if records else None
def getAllRoles(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
- """Get all roles with RBAC filtering."""
+ """Get all roles with RBAC filtering.
+
+ Note: Roles are available to all users with trustee access.
+ They are not filtered by organisation since they define the role types.
+ Users with ALL access level see all roles; others need trustee.access records.
+ """
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeRole,
@@ -272,6 +310,14 @@ class TrusteeObjects:
orderBy="id"
)
+ # Users with ALL access level (from system RBAC) see all roles
+ # Others need at least one trustee.access record
+ accessLevel = self.getRbacAccessLevel(TrusteeRole, "read")
+ if accessLevel != AccessLevel.ALL:
+ userAccess = self.getAllUserAccess(self.userId)
+ if not userAccess:
+ records = [] # No trustee access at all
+
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
@@ -299,10 +345,8 @@ class TrusteeObjects:
return None
data["id"] = roleId
- success = self.db.saveRecord(TrusteeRole, roleId, data)
- if success:
- return self.db.getRecord(TrusteeRole, roleId)
- return None
+ updatedRecord = self.db.recordModify(TrusteeRole, roleId, data)
+ return updatedRecord
def deleteRole(self, roleId: str) -> bool:
"""Delete a role (sysadmin only, not if in use)."""
@@ -311,19 +355,30 @@ class TrusteeObjects:
return False
# Check if role is in use
- accessRecords = self.db.getRecordset(TrusteeAccess, {"roleId": roleId})
+ accessRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"roleId": roleId})
if accessRecords:
logger.error(f"Cannot delete role {roleId}: still in use")
return False
- return self.db.deleteRecord(TrusteeRole, roleId)
+ return self.db.recordDelete(TrusteeRole, roleId)
# ===== Access CRUD =====
def createAccess(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
- """Create a new access record."""
+ """Create a new access record. Requires admin role for the organisation or ALL access level."""
+ # Check system RBAC first
if not self.checkRbacPermission(TrusteeAccess, "create"):
- logger.warning(f"User {self.userId} lacks permission to create access")
+ logger.warning(f"User {self.userId} lacks system permission to create access")
+ return None
+
+ organisationId = data.get("organisationId")
+
+ # Users with ALL access level bypass feature-level permission check
+ accessLevel = self.getRbacAccessLevel(TrusteeAccess, "create")
+
+ # Check feature-level permission - must have admin role for this organisation (unless ALL access)
+ if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
+ logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
return None
data["mandateId"] = self.mandateId
@@ -332,17 +387,23 @@ class TrusteeObjects:
accessId = data.get("id") or str(uuid.uuid4())
data["id"] = accessId
- success = self.db.saveRecord(TrusteeAccess, accessId, data)
- if success:
- return self.db.getRecord(TrusteeAccess, accessId)
+ createdRecord = self.db.recordCreate(TrusteeAccess, data)
+ if createdRecord and createdRecord.get("id"):
+ return createdRecord
return None
def getAccess(self, accessId: str) -> Optional[Dict[str, Any]]:
"""Get a single access record by ID."""
- return self.db.getRecord(TrusteeAccess, accessId)
+ records = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
+ return records[0] if records else None
def getAllAccess(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
- """Get all access records with RBAC filtering."""
+ """Get all access records with RBAC filtering + feature-level filtering.
+
+ Users with ALL access level see all access records.
+ Others can only see access records for organisations they have admin access to.
+ """
+ # Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeAccess,
@@ -351,6 +412,25 @@ class TrusteeObjects:
orderBy="id"
)
+ # Users with ALL access level (from system RBAC) see all records
+ accessLevel = self.getRbacAccessLevel(TrusteeAccess, "read")
+
+ if accessLevel != AccessLevel.ALL:
+ # Step 2: Feature-level filtering - only see access for organisations where user is admin
+ userAccess = self.getAllUserAccess(self.userId)
+
+ # Get organisations where user has admin role
+ adminOrgs = set()
+ for access in userAccess:
+ if access.get("roleId") == "admin":
+ adminOrgs.add(access.get("organisationId"))
+
+ # Filter records to only show those in admin organisations
+ if adminOrgs:
+ records = [r for r in records if r.get("organisationId") in adminOrgs]
+ else:
+ records = []
+
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
@@ -372,7 +452,15 @@ class TrusteeObjects:
)
def getAccessByOrganisation(self, organisationId: str) -> List[Dict[str, Any]]:
- """Get all access records for a specific organisation."""
+ """Get all access records for a specific organisation.
+
+ Requires admin role for the organisation.
+ """
+ # Check if user has admin access for this organisation
+ if not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
+ logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
+ return []
+
return getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeAccess,
@@ -382,41 +470,98 @@ class TrusteeObjects:
)
def getAccessByUser(self, userId: str) -> List[Dict[str, Any]]:
- """Get all access records for a specific user."""
- return getRecordsetWithRBAC(
+ """Get all access records for a specific user.
+
+ Users with ALL access level see all access records.
+ Others can only see access records for organisations where they have admin role.
+ """
+ records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeAccess,
currentUser=self.currentUser,
recordFilter={"userId": userId},
orderBy="id"
)
+
+ # Users with ALL access level (from system RBAC) see all records
+ accessLevel = self.getRbacAccessLevel(TrusteeAccess, "read")
+ if accessLevel == AccessLevel.ALL:
+ return records
+
+ # Filter to only organisations where current user has admin role
+ userAccess = self.getAllUserAccess(self.userId)
+ adminOrgs = set()
+ for access in userAccess:
+ if access.get("roleId") == "admin":
+ adminOrgs.add(access.get("organisationId"))
+
+ return [r for r in records if r.get("organisationId") in adminOrgs]
def updateAccess(self, accessId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
- """Update an access record."""
+ """Update an access record. Requires admin role for the organisation or ALL access level."""
+ # Check system RBAC first
if not self.checkRbacPermission(TrusteeAccess, "update"):
- logger.warning(f"User {self.userId} lacks permission to update access")
+ logger.warning(f"User {self.userId} lacks system permission to update access")
+ return None
+
+ # Get existing access to check organisation
+ existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Access record {accessId} not found")
+ return None
+
+ organisationId = existing.get("organisationId")
+
+ # Users with ALL access level bypass feature-level permission check
+ accessLevel = self.getRbacAccessLevel(TrusteeAccess, "update")
+
+ # Check feature-level permission - must have admin role for this organisation (unless ALL access)
+ if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
+ logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
return None
data["id"] = accessId
- success = self.db.saveRecord(TrusteeAccess, accessId, data)
- if success:
- return self.db.getRecord(TrusteeAccess, accessId)
- return None
+ updatedRecord = self.db.recordModify(TrusteeAccess, accessId, data)
+ return updatedRecord
def deleteAccess(self, accessId: str) -> bool:
- """Delete an access record."""
+ """Delete an access record. Requires admin role for the organisation or ALL access level."""
+ # Check system RBAC first
if not self.checkRbacPermission(TrusteeAccess, "delete"):
- logger.warning(f"User {self.userId} lacks permission to delete access")
+ logger.warning(f"User {self.userId} lacks system permission to delete access")
+ return False
+
+ # Get existing access to check organisation
+ existingRecords = self.db.getRecordset(TrusteeAccess, recordFilter={"id": accessId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Access record {accessId} not found")
+ return False
+
+ organisationId = existing.get("organisationId")
+
+ # Users with ALL access level bypass feature-level permission check
+ accessLevel = self.getRbacAccessLevel(TrusteeAccess, "delete")
+
+ # Check feature-level permission - must have admin role for this organisation (unless ALL access)
+ if accessLevel != AccessLevel.ALL and not self.checkUserTrusteePermission(self.userId, organisationId, "admin"):
+ logger.warning(f"User {self.userId} lacks admin role for organisation {organisationId}")
return False
- return self.db.deleteRecord(TrusteeAccess, accessId)
+ return self.db.recordDelete(TrusteeAccess, accessId)
# ===== Contract CRUD =====
def createContract(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Create a new contract."""
- if not self.checkRbacPermission(TrusteeContract, "create"):
- logger.warning(f"User {self.userId} lacks permission to create contract")
+ organisationId = data.get("organisationId")
+
+ # Check combined permission (system RBAC + feature-level)
+ if not self.checkCombinedPermission(TrusteeContract, "create", organisationId):
+ logger.warning(f"User {self.userId} lacks permission to create contract for org {organisationId}")
return None
data["mandateId"] = self.mandateId
@@ -425,17 +570,19 @@ class TrusteeObjects:
contractId = data.get("id") or str(uuid.uuid4())
data["id"] = contractId
- success = self.db.saveRecord(TrusteeContract, contractId, data)
- if success:
- return self.db.getRecord(TrusteeContract, contractId)
+ createdRecord = self.db.recordCreate(TrusteeContract, data)
+ if createdRecord and createdRecord.get("id"):
+ return createdRecord
return None
def getContract(self, contractId: str) -> Optional[Dict[str, Any]]:
"""Get a single contract by ID."""
- return self.db.getRecord(TrusteeContract, contractId)
+ records = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
+ return records[0] if records else None
def getAllContracts(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
- """Get all contracts with RBAC filtering."""
+ """Get all contracts with RBAC filtering + feature-level access filtering."""
+ # Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeContract,
@@ -444,6 +591,9 @@ class TrusteeObjects:
orderBy="id"
)
+ # Step 2: Feature-level filtering based on trustee.access
+ records = self.filterRecordsByTrusteeAccess(records, TrusteeContract)
+
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
@@ -466,47 +616,73 @@ class TrusteeObjects:
def getContractsByOrganisation(self, organisationId: str) -> List[Dict[str, Any]]:
"""Get all contracts for a specific organisation."""
- return getRecordsetWithRBAC(
+ # Step 1: System RBAC filtering
+ records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeContract,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="label"
)
+
+ # Step 2: Feature-level filtering based on trustee.access
+ return self.filterRecordsByTrusteeAccess(records, TrusteeContract)
def updateContract(self, contractId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Update a contract (organisationId is immutable)."""
- if not self.checkRbacPermission(TrusteeContract, "update"):
- logger.warning(f"User {self.userId} lacks permission to update contract")
+ # Get existing contract to check organisation
+ existingRecords = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Contract {contractId} not found")
+ return None
+
+ organisationId = existing.get("organisationId")
+
+ # Check combined permission (system RBAC + feature-level)
+ if not self.checkCombinedPermission(TrusteeContract, "update", organisationId):
+ logger.warning(f"User {self.userId} lacks permission to update contract in org {organisationId}")
return None
- # Check if organisationId is being changed
- existing = self.db.getRecord(TrusteeContract, contractId)
- if existing and "organisationId" in data:
- if data["organisationId"] != existing.get("organisationId"):
- logger.error("Contract organisationId cannot be changed after creation")
- return None
+ # Check if organisationId is being changed (not allowed)
+ if "organisationId" in data and data["organisationId"] != organisationId:
+ logger.error("Contract organisationId cannot be changed after creation")
+ return None
data["id"] = contractId
- success = self.db.saveRecord(TrusteeContract, contractId, data)
- if success:
- return self.db.getRecord(TrusteeContract, contractId)
- return None
+ updatedRecord = self.db.recordModify(TrusteeContract, contractId, data)
+ return updatedRecord
def deleteContract(self, contractId: str) -> bool:
"""Delete a contract."""
- if not self.checkRbacPermission(TrusteeContract, "delete"):
- logger.warning(f"User {self.userId} lacks permission to delete contract")
+ # Get existing contract to check organisation
+ existingRecords = self.db.getRecordset(TrusteeContract, recordFilter={"id": contractId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Contract {contractId} not found")
+ return False
+
+ organisationId = existing.get("organisationId")
+
+ # Check combined permission (system RBAC + feature-level)
+ if not self.checkCombinedPermission(TrusteeContract, "delete", organisationId):
+ logger.warning(f"User {self.userId} lacks permission to delete contract in org {organisationId}")
return False
- return self.db.deleteRecord(TrusteeContract, contractId)
+ return self.db.recordDelete(TrusteeContract, contractId)
# ===== Document CRUD =====
def createDocument(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Create a new document."""
- if not self.checkRbacPermission(TrusteeDocument, "create"):
- logger.warning(f"User {self.userId} lacks permission to create document")
+ organisationId = data.get("organisationId")
+ contractId = data.get("contractId")
+
+ # Check combined permission (system RBAC + feature-level)
+ if not self.checkCombinedPermission(TrusteeDocument, "create", organisationId, contractId):
+ logger.warning(f"User {self.userId} lacks permission to create document in org {organisationId}")
return None
data["mandateId"] = self.mandateId
@@ -515,28 +691,34 @@ class TrusteeObjects:
documentId = data.get("id") or str(uuid.uuid4())
data["id"] = documentId
- success = self.db.saveRecord(TrusteeDocument, documentId, data)
- if success:
- return self.db.getRecord(TrusteeDocument, documentId)
+ createdRecord = self.db.recordCreate(TrusteeDocument, data)
+ if createdRecord and createdRecord.get("id"):
+ # Remove binary data from response
+ createdRecord.pop("documentData", None)
+ return createdRecord
return None
def getDocument(self, documentId: str) -> Optional[Dict[str, Any]]:
"""Get a single document by ID (metadata only)."""
- record = self.db.getRecord(TrusteeDocument, documentId)
- if record:
+ records = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
+ if records:
+ record = records[0]
# Remove binary data from response
record.pop("documentData", None)
- return record
+ return record
+ return None
def getDocumentData(self, documentId: str) -> Optional[bytes]:
"""Get document binary data."""
- record = self.db.getRecord(TrusteeDocument, documentId)
+ records = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
+ record = records[0] if records else None
if record:
return record.get("documentData")
return None
def getAllDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
- """Get all documents with RBAC filtering (metadata only)."""
+ """Get all documents with RBAC filtering + feature-level access filtering (metadata only)."""
+ # Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeDocument,
@@ -545,6 +727,10 @@ class TrusteeObjects:
orderBy="documentName"
)
+ # Step 2: Feature-level filtering based on trustee.access
+ # This applies userreport filtering (only own records)
+ records = self.filterRecordsByTrusteeAccess(records, TrusteeDocument)
+
# Remove binary data from responses
for record in records:
record.pop("documentData", None)
@@ -571,6 +757,7 @@ class TrusteeObjects:
def getDocumentsByContract(self, contractId: str) -> List[Dict[str, Any]]:
"""Get all documents for a specific contract."""
+ # Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteeDocument,
@@ -578,39 +765,72 @@ class TrusteeObjects:
recordFilter={"contractId": contractId},
orderBy="documentName"
)
+
+ # Step 2: Feature-level filtering based on trustee.access
+ records = self.filterRecordsByTrusteeAccess(records, TrusteeDocument)
+
for record in records:
record.pop("documentData", None)
return records
def updateDocument(self, documentId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Update a document."""
- if not self.checkRbacPermission(TrusteeDocument, "update"):
- logger.warning(f"User {self.userId} lacks permission to update document")
+ # Get existing document to check organisation and creator
+ existingRecords = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Document {documentId} not found")
+ return None
+
+ organisationId = existing.get("organisationId")
+ contractId = existing.get("contractId")
+ createdBy = existing.get("_createdBy")
+
+ # Check combined permission (system RBAC + feature-level)
+ # For userreport, this checks if they created the record
+ if not self.checkCombinedPermission(TrusteeDocument, "update", organisationId, contractId, createdBy):
+ logger.warning(f"User {self.userId} lacks permission to update document in org {organisationId}")
return None
data["id"] = documentId
- success = self.db.saveRecord(TrusteeDocument, documentId, data)
- if success:
- result = self.db.getRecord(TrusteeDocument, documentId)
- if result:
- result.pop("documentData", None)
- return result
- return None
+ updatedRecord = self.db.recordModify(TrusteeDocument, documentId, data)
+ if updatedRecord:
+ updatedRecord.pop("documentData", None)
+ return updatedRecord
def deleteDocument(self, documentId: str) -> bool:
"""Delete a document."""
- if not self.checkRbacPermission(TrusteeDocument, "delete"):
- logger.warning(f"User {self.userId} lacks permission to delete document")
+ # Get existing document to check organisation and creator
+ existingRecords = self.db.getRecordset(TrusteeDocument, recordFilter={"id": documentId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Document {documentId} not found")
+ return False
+
+ organisationId = existing.get("organisationId")
+ contractId = existing.get("contractId")
+ createdBy = existing.get("_createdBy")
+
+ # Check combined permission (system RBAC + feature-level)
+ # For userreport, this checks if they created the record
+ if not self.checkCombinedPermission(TrusteeDocument, "delete", organisationId, contractId, createdBy):
+ logger.warning(f"User {self.userId} lacks permission to delete document in org {organisationId}")
return False
- return self.db.deleteRecord(TrusteeDocument, documentId)
+ return self.db.recordDelete(TrusteeDocument, documentId)
# ===== Position CRUD =====
def createPosition(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Create a new position."""
- if not self.checkRbacPermission(TrusteePosition, "create"):
- logger.warning(f"User {self.userId} lacks permission to create position")
+ organisationId = data.get("organisationId")
+ contractId = data.get("contractId")
+
+ # Check combined permission (system RBAC + feature-level)
+ if not self.checkCombinedPermission(TrusteePosition, "create", organisationId, contractId):
+ logger.warning(f"User {self.userId} lacks permission to create position in org {organisationId}")
return None
data["mandateId"] = self.mandateId
@@ -625,17 +845,19 @@ class TrusteeObjects:
positionId = data.get("id") or str(uuid.uuid4())
data["id"] = positionId
- success = self.db.saveRecord(TrusteePosition, positionId, data)
- if success:
- return self.db.getRecord(TrusteePosition, positionId)
+ createdRecord = self.db.recordCreate(TrusteePosition, data)
+ if createdRecord and createdRecord.get("id"):
+ return createdRecord
return None
def getPosition(self, positionId: str) -> Optional[Dict[str, Any]]:
"""Get a single position by ID."""
- return self.db.getRecord(TrusteePosition, positionId)
+ records = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
+ return records[0] if records else None
def getAllPositions(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
- """Get all positions with RBAC filtering."""
+ """Get all positions with RBAC filtering + feature-level access filtering."""
+ # Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePosition,
@@ -644,6 +866,10 @@ class TrusteeObjects:
orderBy="valuta"
)
+ # Step 2: Feature-level filtering based on trustee.access
+ # This applies userreport filtering (only own records)
+ records = self.filterRecordsByTrusteeAccess(records, TrusteePosition)
+
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
@@ -666,50 +892,88 @@ class TrusteeObjects:
def getPositionsByContract(self, contractId: str) -> List[Dict[str, Any]]:
"""Get all positions for a specific contract."""
- return getRecordsetWithRBAC(
+ # Step 1: System RBAC filtering
+ records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter={"contractId": contractId},
orderBy="valuta"
)
+
+ # Step 2: Feature-level filtering based on trustee.access
+ return self.filterRecordsByTrusteeAccess(records, TrusteePosition)
def getPositionsByOrganisation(self, organisationId: str) -> List[Dict[str, Any]]:
"""Get all positions for a specific organisation."""
- return getRecordsetWithRBAC(
+ # Step 1: System RBAC filtering
+ records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePosition,
currentUser=self.currentUser,
recordFilter={"organisationId": organisationId},
orderBy="valuta"
)
+
+ # Step 2: Feature-level filtering based on trustee.access
+ return self.filterRecordsByTrusteeAccess(records, TrusteePosition)
def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Update a position."""
- if not self.checkRbacPermission(TrusteePosition, "update"):
- logger.warning(f"User {self.userId} lacks permission to update position")
+ # Get existing position to check organisation and creator
+ existingRecords = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Position {positionId} not found")
+ return None
+
+ organisationId = existing.get("organisationId")
+ contractId = existing.get("contractId")
+ createdBy = existing.get("_createdBy")
+
+ # Check combined permission (system RBAC + feature-level)
+ # For userreport, this checks if they created the record
+ if not self.checkCombinedPermission(TrusteePosition, "update", organisationId, contractId, createdBy):
+ logger.warning(f"User {self.userId} lacks permission to update position in org {organisationId}")
return None
data["id"] = positionId
- success = self.db.saveRecord(TrusteePosition, positionId, data)
- if success:
- return self.db.getRecord(TrusteePosition, positionId)
- return None
+ updatedRecord = self.db.recordModify(TrusteePosition, positionId, data)
+ return updatedRecord
def deletePosition(self, positionId: str) -> bool:
"""Delete a position."""
- if not self.checkRbacPermission(TrusteePosition, "delete"):
- logger.warning(f"User {self.userId} lacks permission to delete position")
+ # Get existing position to check organisation and creator
+ existingRecords = self.db.getRecordset(TrusteePosition, recordFilter={"id": positionId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Position {positionId} not found")
+ return False
+
+ organisationId = existing.get("organisationId")
+ contractId = existing.get("contractId")
+ createdBy = existing.get("_createdBy")
+
+ # Check combined permission (system RBAC + feature-level)
+ # For userreport, this checks if they created the record
+ if not self.checkCombinedPermission(TrusteePosition, "delete", organisationId, contractId, createdBy):
+ logger.warning(f"User {self.userId} lacks permission to delete position in org {organisationId}")
return False
- return self.db.deleteRecord(TrusteePosition, positionId)
+ return self.db.recordDelete(TrusteePosition, positionId)
# ===== Position-Document Link CRUD =====
def createPositionDocument(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Create a new position-document link."""
- if not self.checkRbacPermission(TrusteePositionDocument, "create"):
- logger.warning(f"User {self.userId} lacks permission to create position-document link")
+ organisationId = data.get("organisationId")
+ contractId = data.get("contractId")
+
+ # Check combined permission (system RBAC + feature-level)
+ if not self.checkCombinedPermission(TrusteePositionDocument, "create", organisationId, contractId):
+ logger.warning(f"User {self.userId} lacks permission to create position-document link in org {organisationId}")
return None
data["mandateId"] = self.mandateId
@@ -718,17 +982,19 @@ class TrusteeObjects:
linkId = data.get("id") or str(uuid.uuid4())
data["id"] = linkId
- success = self.db.saveRecord(TrusteePositionDocument, linkId, data)
- if success:
- return self.db.getRecord(TrusteePositionDocument, linkId)
+ createdRecord = self.db.recordCreate(TrusteePositionDocument, data)
+ if createdRecord and createdRecord.get("id"):
+ return createdRecord
return None
def getPositionDocument(self, linkId: str) -> Optional[Dict[str, Any]]:
"""Get a single position-document link by ID."""
- return self.db.getRecord(TrusteePositionDocument, linkId)
+ records = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
+ return records[0] if records else None
def getAllPositionDocuments(self, params: Optional[PaginationParams] = None) -> PaginatedResult:
- """Get all position-document links with RBAC filtering."""
+ """Get all position-document links with RBAC filtering + feature-level access filtering."""
+ # Step 1: System RBAC filtering
records = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePositionDocument,
@@ -737,6 +1003,10 @@ class TrusteeObjects:
orderBy="id"
)
+ # Step 2: Feature-level filtering based on trustee.access
+ # This applies userreport filtering (only own records)
+ records = self.filterRecordsByTrusteeAccess(records, TrusteePositionDocument)
+
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
@@ -759,6 +1029,7 @@ class TrusteeObjects:
def getDocumentsForPosition(self, positionId: str) -> List[Dict[str, Any]]:
"""Get all documents linked to a position."""
+ # Step 1: System RBAC filtering
links = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePositionDocument,
@@ -766,10 +1037,13 @@ class TrusteeObjects:
recordFilter={"positionId": positionId},
orderBy="id"
)
- return links
+
+ # Step 2: Feature-level filtering based on trustee.access
+ return self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument)
def getPositionsForDocument(self, documentId: str) -> List[Dict[str, Any]]:
"""Get all positions linked to a document."""
+ # Step 1: System RBAC filtering
links = getRecordsetWithRBAC(
connector=self.db,
modelClass=TrusteePositionDocument,
@@ -777,15 +1051,31 @@ class TrusteeObjects:
recordFilter={"documentId": documentId},
orderBy="id"
)
- return links
+
+ # Step 2: Feature-level filtering based on trustee.access
+ return self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument)
def deletePositionDocument(self, linkId: str) -> bool:
"""Delete a position-document link."""
- if not self.checkRbacPermission(TrusteePositionDocument, "delete"):
- logger.warning(f"User {self.userId} lacks permission to delete position-document link")
+ # Get existing link to check organisation and creator
+ existingRecords = self.db.getRecordset(TrusteePositionDocument, recordFilter={"id": linkId})
+ existing = existingRecords[0] if existingRecords else None
+
+ if not existing:
+ logger.warning(f"Position-document link {linkId} not found")
+ return False
+
+ organisationId = existing.get("organisationId")
+ contractId = existing.get("contractId")
+ createdBy = existing.get("_createdBy")
+
+ # Check combined permission (system RBAC + feature-level)
+ # For userreport, this checks if they created the record
+ if not self.checkCombinedPermission(TrusteePositionDocument, "delete", organisationId, contractId, createdBy):
+ logger.warning(f"User {self.userId} lacks permission to delete position-document link in org {organisationId}")
return False
- return self.db.deleteRecord(TrusteePositionDocument, linkId)
+ return self.db.recordDelete(TrusteePositionDocument, linkId)
# ===== Trustee-specific Access Check =====
@@ -796,6 +1086,47 @@ class TrusteeObjects:
{"userId": userId, "organisationId": organisationId}
)
+ def getAllUserAccess(self, userId: str) -> List[Dict[str, Any]]:
+ """Get all access records for a user across all organisations."""
+ return self.db.getRecordset(TrusteeAccess, {"userId": userId})
+
+ def getUserTrusteeRoles(self, userId: str, organisationId: str, contractId: Optional[str] = None) -> List[str]:
+ """
+ Get all trustee roles a user has for an organisation (and optionally contract).
+
+ Args:
+ userId: User ID to check
+ organisationId: Organisation ID
+ contractId: Optional contract ID for contract-specific access
+
+ Returns:
+ List of role IDs the user has
+ """
+ accessRecords = self.getUserAccessForOrganisation(userId, organisationId)
+ roles = []
+
+ for access in accessRecords:
+ accessContractId = access.get("contractId")
+ roleId = access.get("roleId")
+
+ # If access has no contractId, it grants access to all contracts
+ if accessContractId is None:
+ if roleId not in roles:
+ roles.append(roleId)
+ # If checking for specific contract, match it
+ elif contractId and accessContractId == contractId:
+ if roleId not in roles:
+ roles.append(roleId)
+ # If no specific contract requested but access is contract-specific,
+ # only include if we're not filtering by contract
+ elif contractId is None:
+ # User has contract-specific access, but we're not filtering
+ # They can see the record but only for their specific contracts
+ if roleId not in roles:
+ roles.append(roleId)
+
+ return roles
+
def checkUserTrusteePermission(
self,
userId: str,
@@ -815,22 +1146,188 @@ class TrusteeObjects:
Returns:
True if user has the required role
"""
- accessRecords = self.getUserAccessForOrganisation(userId, organisationId)
+ roles = self.getUserTrusteeRoles(userId, organisationId, contractId)
+ return requiredRole in roles
+
+ def checkCombinedPermission(
+ self,
+ modelClass: type,
+ operation: str,
+ organisationId: Optional[str] = None,
+ contractId: Optional[str] = None,
+ recordCreatedBy: Optional[str] = None
+ ) -> bool:
+ """
+ Check combined system RBAC + feature-level RBAC permissions.
- for access in accessRecords:
- if access.get("roleId") == requiredRole:
- accessContractId = access.get("contractId")
-
- # If access has no contractId, it grants access to all contracts
- if accessContractId is None:
+ Args:
+ modelClass: The model class (e.g., TrusteeContract)
+ operation: Operation type (read, create, update, delete)
+ organisationId: Optional organisation ID for feature-level check
+ contractId: Optional contract ID for contract-level filtering
+ recordCreatedBy: Optional creator ID for userreport role checks
+
+ Returns:
+ True if user has permission
+ """
+ # Step 1: Check system RBAC and get access level
+ accessLevel = self.getRbacAccessLevel(modelClass, operation)
+ if accessLevel == AccessLevel.NONE:
+ return False
+
+ # Users with ALL access level bypass feature-level checks
+ if accessLevel == AccessLevel.ALL:
+ return True
+
+ # Step 2: If no organisationId, system RBAC is sufficient (for listing all)
+ if organisationId is None:
+ return True
+
+ # Step 3: Check feature-level RBAC via trustee.access
+ roles = self.getUserTrusteeRoles(self.userId, organisationId, contractId)
+
+ if not roles:
+ # No trustee access for this organisation
+ return False
+
+ # Check role-based permissions
+ # admin role: full CRUD for organisation
+ if "admin" in roles:
+ return True
+
+ # operate role: CRUD for contracts, documents, positions
+ if "operate" in roles:
+ if modelClass in (TrusteeContract, TrusteeDocument, TrusteePosition, TrusteePositionDocument):
+ return True
+ # operate can read organisations
+ if modelClass == TrusteeOrganisation and operation == "read":
+ return True
+
+ # userreport role: CRUD own records for documents/positions
+ if "userreport" in roles:
+ if modelClass in (TrusteeDocument, TrusteePosition, TrusteePositionDocument):
+ # For create, always allowed
+ if operation == "create":
return True
-
- # If checking for specific contract, match it
- if contractId and accessContractId == contractId:
+ # For read/update/delete, must be own record
+ if recordCreatedBy == self.userId:
return True
-
- # If no specific contract requested and access is contract-specific, deny
- if contractId is None:
- continue
+ # userreport can read organisations and contracts
+ if modelClass in (TrusteeOrganisation, TrusteeContract) and operation == "read":
+ return True
return False
+
+ def filterRecordsByTrusteeAccess(
+ self,
+ records: List[Dict[str, Any]],
+ modelClass: type
+ ) -> List[Dict[str, Any]]:
+ """
+ Filter records based on user's trustee.access permissions.
+
+ Args:
+ records: List of records to filter
+ modelClass: The model class for determining filter logic
+
+ Returns:
+ Filtered list of records
+ """
+ if not records:
+ return records
+
+ # Users with ALL access level bypass feature-level filtering
+ accessLevel = self.getRbacAccessLevel(modelClass, "read")
+ if accessLevel == AccessLevel.ALL:
+ return records
+
+ # Get all user's access records
+ userAccess = self.getAllUserAccess(self.userId)
+
+ if not userAccess:
+ # No trustee access at all - return empty for trustee tables
+ return []
+
+ # Build lookup for user's accessible organisations and contracts
+ accessByOrg = {} # {orgId: {'roles': [...], 'contracts': [...]}}
+ hasFullOrgAccess = {} # {orgId: True} if user has access without contractId restriction
+
+ for access in userAccess:
+ orgId = access.get("organisationId")
+ roleId = access.get("roleId")
+ contractIdAccess = access.get("contractId")
+
+ if orgId not in accessByOrg:
+ accessByOrg[orgId] = {"roles": [], "contracts": []}
+
+ if roleId not in accessByOrg[orgId]["roles"]:
+ accessByOrg[orgId]["roles"].append(roleId)
+
+ if contractIdAccess is None:
+ hasFullOrgAccess[orgId] = True
+ elif contractIdAccess not in accessByOrg[orgId]["contracts"]:
+ accessByOrg[orgId]["contracts"].append(contractIdAccess)
+
+ filteredRecords = []
+
+ for record in records:
+ orgId = record.get("organisationId")
+ contractId = record.get("contractId")
+ createdBy = record.get("_createdBy")
+
+ # For Organisation model, filter by accessible organisations
+ if modelClass == TrusteeOrganisation:
+ recordOrgId = record.get("id")
+ if recordOrgId in accessByOrg:
+ filteredRecords.append(record)
+ continue
+
+ # Check if user has access to this organisation
+ if orgId not in accessByOrg:
+ continue
+
+ roles = accessByOrg[orgId]["roles"]
+
+ # admin has full access to organisation
+ if "admin" in roles:
+ # Check contract filtering
+ if contractId:
+ if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
+ filteredRecords.append(record)
+ else:
+ filteredRecords.append(record)
+ continue
+
+ # operate has full access to organisation data
+ if "operate" in roles:
+ if contractId:
+ if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
+ filteredRecords.append(record)
+ else:
+ filteredRecords.append(record)
+ continue
+
+ # userreport can only see own records for documents/positions
+ if "userreport" in roles:
+ if modelClass in (TrusteeDocument, TrusteePosition, TrusteePositionDocument):
+ # Must be own record
+ if createdBy == self.userId:
+ # Also check contract access
+ if contractId:
+ if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
+ filteredRecords.append(record)
+ else:
+ filteredRecords.append(record)
+ elif modelClass == TrusteeContract:
+ # Can read contracts in their organisation
+ if contractId:
+ if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
+ filteredRecords.append(record)
+ else:
+ # For contracts table, check if record's id is in accessible contracts
+ recordContractId = record.get("id")
+ if hasFullOrgAccess.get(orgId) or recordContractId in accessByOrg[orgId]["contracts"]:
+ filteredRecords.append(record)
+ continue
+
+ return filteredRecords
diff --git a/modules/routes/routeDataTrustee.py b/modules/routes/routeDataTrustee.py
index f00c3128..bca55df4 100644
--- a/modules/routes/routeDataTrustee.py
+++ b/modules/routes/routeDataTrustee.py
@@ -70,9 +70,12 @@ async def getOrganisations(
currentUser: User = Depends(getCurrentUser)
) -> PaginatedResponse[TrusteeOrganisation]:
"""Get all organisations with optional pagination."""
+ logger = logging.getLogger(__name__)
+ logger.debug(f"getOrganisations called for user {currentUser.id}, roles: {currentUser.roleLabels}")
paginationParams = _parsePagination(pagination)
interface = getInterface(currentUser)
result = interface.getAllOrganisations(paginationParams)
+ logger.debug(f"getOrganisations returned {len(result.items)} items")
if paginationParams:
return PaginatedResponse(
diff --git a/modules/routes/routeOptions.py b/modules/routes/routeOptions.py
index b4086fd2..a2acad76 100644
--- a/modules/routes/routeOptions.py
+++ b/modules/routes/routeOptions.py
@@ -47,10 +47,13 @@ async def getOptionsEndpoint(
- GET /api/options/connection.status
"""
try:
+ logger.debug(f"Options request: {optionsName} for user {currentUser.id}")
services = getServices(currentUser, None)
options = getOptions(optionsName, services, currentUser)
+ logger.debug(f"Options response: {optionsName} returned {len(options)} items")
return options
except ValueError as e:
+ logger.error(f"ValueError for options {optionsName}: {str(e)}")
raise HTTPException(
status_code=400,
detail=str(e)
diff --git a/modules/services/__init__.py b/modules/services/__init__.py
index 32e0cb3f..16d2ed6d 100644
--- a/modules/services/__init__.py
+++ b/modules/services/__init__.py
@@ -57,6 +57,9 @@ class Services:
from modules.interfaces.interfaceDbComponentObjects import getInterface as getComponentInterface
self.interfaceDbComponent = getComponentInterface(user)
+ from modules.interfaces.interfaceDbTrusteeObjects import getInterface as getTrusteeInterface
+ self.interfaceDbTrustee = getTrusteeInterface(user)
+
# Expose RBAC directly on services for convenience
self.rbac = self.interfaceDbApp.rbac if self.interfaceDbApp else None