From bc2877bcc16b47fd8ae8a12c14779f11b885823f Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 25 Jan 2026 03:18:22 +0100
Subject: [PATCH] fixed rbac feature rules to override global rules, not to
combine
---
.../trustee/interfaceFeatureTrustee.py | 168 +-----------------
modules/security/rbac.py | 20 ++-
2 files changed, 23 insertions(+), 165 deletions(-)
diff --git a/modules/features/trustee/interfaceFeatureTrustee.py b/modules/features/trustee/interfaceFeatureTrustee.py
index 729793b4..bdea38c1 100644
--- a/modules/features/trustee/interfaceFeatureTrustee.py
+++ b/modules/features/trustee/interfaceFeatureTrustee.py
@@ -663,9 +663,6 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId
)
- # Step 2: Feature-level filtering based on trustee.access
- records = self.filterRecordsByTrusteeAccess(records, TrusteeContract)
-
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
@@ -698,10 +695,7 @@ class TrusteeObjects:
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
-
- # Step 2: Feature-level filtering based on trustee.access
- filtered = self.filterRecordsByTrusteeAccess(records, TrusteeContract)
- return [TrusteeContract(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
+ return [TrusteeContract(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records]
def updateContract(self, contractId: str, data: Dict[str, Any]) -> Optional[TrusteeContract]:
"""Update a contract (organisationId is immutable)."""
@@ -808,10 +802,6 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId
)
- # Step 2: Feature-level filtering based on trustee.access
- # This applies userreport filtering (only own records)
- records = self.filterRecordsByTrusteeAccess(records, TrusteeDocument)
-
# Convert dicts to Pydantic objects (remove binary data and internal fields)
pydanticItems = []
for record in records:
@@ -851,11 +841,8 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId
)
- # Step 2: Feature-level filtering based on trustee.access
- filtered = self.filterRecordsByTrusteeAccess(records, TrusteeDocument)
-
result = []
- for record in filtered:
+ for record in records:
cleanedRecord = {k: v for k, v in record.items() if not k.startswith("_") and k != "documentData"}
result.append(TrusteeDocument(**cleanedRecord))
return result
@@ -961,10 +948,6 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId
)
- # Step 2: Feature-level filtering based on trustee.access
- # This applies userreport filtering (only own records)
- records = self.filterRecordsByTrusteeAccess(records, TrusteePosition)
-
# Convert dicts to Pydantic objects (remove internal fields)
pydanticItems = []
for record in records:
@@ -1003,10 +986,7 @@ class TrusteeObjects:
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
-
- # Step 2: Feature-level filtering based on trustee.access
- filtered = self.filterRecordsByTrusteeAccess(records, TrusteePosition)
- return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
+ return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records]
def getPositionsByOrganisation(self, organisationId: str) -> List[TrusteePosition]:
"""Get all positions for a specific organisation."""
@@ -1020,10 +1000,7 @@ class TrusteeObjects:
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
-
- # Step 2: Feature-level filtering based on trustee.access
- filtered = self.filterRecordsByTrusteeAccess(records, TrusteePosition)
- return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
+ return [TrusteePosition(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in records]
def updatePosition(self, positionId: str, data: Dict[str, Any]) -> Optional[TrusteePosition]:
"""Update a position.
@@ -1148,10 +1125,6 @@ class TrusteeObjects:
enrichPermissions=True
)
- # Step 2: Feature-level filtering based on trustee.access
- # This applies userreport filtering (only own records)
- records = self.filterRecordsByTrusteeAccess(records, TrusteePositionDocument)
-
totalItems = len(records)
if params:
pageSize = params.pageSize or 20
@@ -1184,10 +1157,7 @@ class TrusteeObjects:
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
-
- # Step 2: Feature-level filtering based on trustee.access
- filtered = self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument)
- return [TrusteePositionDocument(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
+ return [TrusteePositionDocument(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in links]
def getPositionsForDocument(self, documentId: str) -> List[TrusteePositionDocument]:
"""Get all positions linked to a document."""
@@ -1201,10 +1171,7 @@ class TrusteeObjects:
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId
)
-
- # Step 2: Feature-level filtering based on trustee.access
- filtered = self.filterRecordsByTrusteeAccess(links, TrusteePositionDocument)
- return [TrusteePositionDocument(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in filtered]
+ return [TrusteePositionDocument(**{k: v for k, v in r.items() if not k.startswith("_")}) for r in links]
def deletePositionDocument(self, linkId: str) -> bool:
"""Delete a position-document link.
@@ -1368,126 +1335,3 @@ class TrusteeObjects:
return True
return False
-
- def filterRecordsByTrusteeAccess(
- self,
- records: List[Dict[str, Any]],
- modelClass: type
- ) -> List[Dict[str, Any]]:
- """
- Filter records based on user's trustee.access permissions.
-
- Args:
- records: List of records to filter
- modelClass: The model class for determining filter logic
-
- Returns:
- Filtered list of records
- """
- if not records:
- return records
-
- # Users with ALL access level bypass feature-level filtering
- accessLevel = self.getRbacAccessLevel(modelClass, "read")
- if accessLevel == AccessLevel.ALL:
- return records
-
- # NEW: Feature-instance based access (new system)
- # If featureInstanceId is set, user has access via FeatureAccess system.
- # Data is already filtered by featureInstanceId in getRecordsetWithRBAC.
- # The old TrusteeAccess system (organisation-based) is not used for
- # feature-instance scoped data.
- if self.featureInstanceId:
- return records # User already has access to this feature instance
-
- # LEGACY: TrusteeAccess based filtering (for backwards compatibility)
- # Get all user's access records
- userAccess = self.getAllUserAccess(self.userId)
-
- if not userAccess:
- # No trustee access at all - return empty for trustee tables
- return []
-
- # Build lookup for user's accessible organisations and contracts
- accessByOrg = {} # {orgId: {'roles': [...], 'contracts': [...]}}
- hasFullOrgAccess = {} # {orgId: True} if user has access without contractId restriction
-
- for access in userAccess:
- orgId = access.get("organisationId")
- roleId = access.get("roleId")
- contractIdAccess = access.get("contractId")
-
- if orgId not in accessByOrg:
- accessByOrg[orgId] = {"roles": [], "contracts": []}
-
- if roleId not in accessByOrg[orgId]["roles"]:
- accessByOrg[orgId]["roles"].append(roleId)
-
- if contractIdAccess is None:
- hasFullOrgAccess[orgId] = True
- elif contractIdAccess not in accessByOrg[orgId]["contracts"]:
- accessByOrg[orgId]["contracts"].append(contractIdAccess)
-
- filteredRecords = []
-
- for record in records:
- orgId = record.get("organisationId")
- contractId = record.get("contractId")
- createdBy = record.get("_createdBy")
-
- # For Organisation model, filter by accessible organisations
- if modelClass == TrusteeOrganisation:
- recordOrgId = record.get("id")
- if recordOrgId in accessByOrg:
- filteredRecords.append(record)
- continue
-
- # Check if user has access to this organisation
- if orgId not in accessByOrg:
- continue
-
- roles = accessByOrg[orgId]["roles"]
-
- # admin has full access to organisation
- if "admin" in roles:
- # Check contract filtering
- if contractId:
- if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
- filteredRecords.append(record)
- else:
- filteredRecords.append(record)
- continue
-
- # operate has full access to organisation data
- if "operate" in roles:
- if contractId:
- if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
- filteredRecords.append(record)
- else:
- filteredRecords.append(record)
- continue
-
- # userreport can only see own records for documents/positions
- if "userreport" in roles:
- if modelClass in (TrusteeDocument, TrusteePosition, TrusteePositionDocument):
- # Must be own record
- if createdBy == self.userId:
- # Also check contract access
- if contractId:
- if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
- filteredRecords.append(record)
- else:
- filteredRecords.append(record)
- elif modelClass == TrusteeContract:
- # Can read contracts in their organisation
- if contractId:
- if hasFullOrgAccess.get(orgId) or contractId in accessByOrg[orgId]["contracts"]:
- filteredRecords.append(record)
- else:
- # For contracts table, check if record's id is in accessible contracts
- recordContractId = record.get("id")
- if hasFullOrgAccess.get(orgId) or recordContractId in accessByOrg[orgId]["contracts"]:
- filteredRecords.append(record)
- continue
-
- return filteredRecords
diff --git a/modules/security/rbac.py b/modules/security/rbac.py
index f236852a..d35120cb 100644
--- a/modules/security/rbac.py
+++ b/modules/security/rbac.py
@@ -111,14 +111,22 @@ class RbacClass:
if roleId not in rolePermissions or priority > rolePermissions[roleId][0]:
rolePermissions[roleId] = (priority, rule)
- # Combine permissions across roles using opening (union) logic
+ # Find highest priority among matching rules
+ highestPriority = max((p for p, _ in rolePermissions.values()), default=0)
+
+ # Combine permissions ONLY from rules with highest priority
+ # This ensures instance-specific rules (Priority 3) override global rules (Priority 1)
for roleId, (priority, rule) in rolePermissions.items():
+ # Only use rules with highest priority
+ if priority < highestPriority:
+ continue
+
# View: union logic - if ANY role has view=true, then view=true
if rule.view:
permissions.view = True
if context == AccessRuleContext.DATA:
- # For DATA context, use most permissive access level across roles
+ # For DATA context, use most permissive access level across roles at same priority
if rule.read and self._isMorePermissive(rule.read, permissions.read):
permissions.read = rule.read
if rule.create and self._isMorePermissive(rule.create, permissions.create):
@@ -375,7 +383,8 @@ class RbacClass:
Args:
rule: Access rule to check
- item: Item to match against
+ item: Item to match against (can be short name like "TrusteePosition" or
+ fully qualified like "data.feature.trustee.TrusteePosition")
Returns:
True if rule matches item
@@ -396,6 +405,11 @@ class RbacClass:
if item.startswith(rule.item + "."):
return True
+ # Suffix match: rule.item ends with ".{item}" (e.g., "data.feature.trustee.TrusteePosition" matches "TrusteePosition")
+ # This allows short table names to match fully qualified objectKeys
+ if rule.item.endswith("." + item):
+ return True
+
return False
def findMostSpecificRule(self, rules: List[AccessRule], item: str) -> Optional[AccessRule]: