From f15ed2e380071c2e060b10eb8d25fbc324755b2f Mon Sep 17 00:00:00 2001
From: patrick-motsch
Date: Sun, 8 Feb 2026 14:00:08 +0100
Subject: [PATCH] fixes
---
.../automation/interfaceFeatureAutomation.py | 48 +++++----
modules/features/automation/mainAutomation.py | 3 +-
.../chatplayground/mainChatplayground.py | 3 +-
modules/features/realEstate/mainRealEstate.py | 3 +-
modules/features/trustee/mainTrustee.py | 3 +-
modules/routes/routeAdminRbacRules.py | 98 +++++++++++++++++++
6 files changed, 135 insertions(+), 23 deletions(-)
diff --git a/modules/features/automation/interfaceFeatureAutomation.py b/modules/features/automation/interfaceFeatureAutomation.py
index 2bbf56e0..f88c3973 100644
--- a/modules/features/automation/interfaceFeatureAutomation.py
+++ b/modules/features/automation/interfaceFeatureAutomation.py
@@ -119,13 +119,12 @@ class AutomationObjects:
def _enrichAutomationsWithUserAndMandate(self, automations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Batch enrich automations with user names and mandate names for display.
- Uses AppObjects interface to fetch users and mandates with proper access control.
+ Uses direct DB lookup (no RBAC) because this is purely cosmetic enrichment —
+ the user already has RBAC-verified access to the automations themselves.
"""
if not automations:
return automations
- from modules.interfaces.interfaceDbApp import getInterface as getAppInterface
-
# Collect all unique user IDs and mandate IDs
userIds = set()
mandateIds = set()
@@ -139,22 +138,33 @@ class AutomationObjects:
if mandateId:
mandateIds.add(mandateId)
- # Use AppObjects interface to fetch users (respects access control)
- appInterface = getAppInterface(self.currentUser)
- usersMap = {}
- if userIds:
- for userId in userIds:
- user = appInterface.getUser(userId)
- if user:
- usersMap[userId] = user.username or user.email or userId
-
- # Use AppObjects interface to fetch mandates (respects access control)
- mandatesMap = {}
- if mandateIds:
- for mandateId in mandateIds:
- mandate = appInterface.getMandate(mandateId)
- if mandate:
- mandatesMap[mandateId] = mandate.name or mandateId
+ # Use root DB connector for display-only lookups (no RBAC needed)
+ try:
+ from modules.datamodels.datamodelUam import UserInDB, Mandate
+ from modules.security.rootAccess import getRootDbAppConnector
+ dbAppConn = getRootDbAppConnector()
+
+ # Batch fetch user display names
+ usersMap = {}
+ if userIds:
+ for userId in userIds:
+ users = dbAppConn.getRecordset(UserInDB, {"id": userId})
+ if users:
+ user = users[0]
+ fullName = f"{user.get('firstName', '')} {user.get('lastName', '')}".strip()
+ usersMap[userId] = fullName or user.get("email") or user.get("username") or userId
+
+ # Batch fetch mandate display names
+ mandatesMap = {}
+ if mandateIds:
+ for mandateId in mandateIds:
+ mandates = dbAppConn.getRecordset(Mandate, {"id": mandateId})
+ if mandates:
+ mandatesMap[mandateId] = mandates[0].get("name") or mandateId
+ except Exception as e:
+ logger.warning(f"Could not enrich automations with user/mandate names: {e}")
+ usersMap = {}
+ mandatesMap = {}
# Enrich each automation with the fetched data
for automation in automations:
diff --git a/modules/features/automation/mainAutomation.py b/modules/features/automation/mainAutomation.py
index 2b8443a9..924f5bc9 100644
--- a/modules/features/automation/mainAutomation.py
+++ b/modules/features/automation/mainAutomation.py
@@ -266,9 +266,10 @@ def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Di
existingRules = rootInterface.getAccessRulesByRole(roleId)
# Create a set of existing rule signatures to avoid duplicates
+ # IMPORTANT: Use .value for enum comparison, not str() which gives "AccessRuleContext.DATA" in Python 3.11+
existingSignatures = set()
for rule in existingRules:
- sig = (str(rule.context) if rule.context else None, rule.item)
+ sig = (rule.context.value if rule.context else None, rule.item)
existingSignatures.add(sig)
createdCount = 0
diff --git a/modules/features/chatplayground/mainChatplayground.py b/modules/features/chatplayground/mainChatplayground.py
index ed0e2868..268ee467 100644
--- a/modules/features/chatplayground/mainChatplayground.py
+++ b/modules/features/chatplayground/mainChatplayground.py
@@ -230,9 +230,10 @@ def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Di
existingRules = rootInterface.getAccessRulesByRole(roleId)
# Create a set of existing rule signatures to avoid duplicates
+ # IMPORTANT: Use .value for enum comparison, not str() which gives "AccessRuleContext.DATA" in Python 3.11+
existingSignatures = set()
for rule in existingRules:
- sig = (str(rule.context) if rule.context else None, rule.item)
+ sig = (rule.context.value if rule.context else None, rule.item)
existingSignatures.add(sig)
createdCount = 0
diff --git a/modules/features/realEstate/mainRealEstate.py b/modules/features/realEstate/mainRealEstate.py
index 8562f5b8..0483218d 100644
--- a/modules/features/realEstate/mainRealEstate.py
+++ b/modules/features/realEstate/mainRealEstate.py
@@ -245,7 +245,8 @@ def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: list) -
from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
existingRules = rootInterface.getAccessRulesByRole(roleId)
- existingSignatures = {(str(r.context) if r.context else None, r.item) for r in existingRules}
+ # IMPORTANT: Use .value for enum comparison, not str() which gives "AccessRuleContext.DATA" in Python 3.11+
+ existingSignatures = {(r.context.value if r.context else None, r.item) for r in existingRules}
createdCount = 0
for template in ruleTemplates or []:
diff --git a/modules/features/trustee/mainTrustee.py b/modules/features/trustee/mainTrustee.py
index ad449d8f..b917f9ad 100644
--- a/modules/features/trustee/mainTrustee.py
+++ b/modules/features/trustee/mainTrustee.py
@@ -394,9 +394,10 @@ def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Di
existingRules = rootInterface.getAccessRulesByRole(roleId)
# Create a set of existing rule signatures to avoid duplicates
+ # IMPORTANT: Use .value for enum comparison, not str() which gives "AccessRuleContext.DATA" in Python 3.11+
existingSignatures = set()
for rule in existingRules:
- sig = (str(rule.context) if rule.context else None, rule.item)
+ sig = (rule.context.value if rule.context else None, rule.item)
existingSignatures.add(sig)
createdCount = 0
diff --git a/modules/routes/routeAdminRbacRules.py b/modules/routes/routeAdminRbacRules.py
index fc9b315e..82cc13d7 100644
--- a/modules/routes/routeAdminRbacRules.py
+++ b/modules/routes/routeAdminRbacRules.py
@@ -1192,3 +1192,101 @@ async def getCatalogStats(
status_code=500,
detail=f"Failed to get catalog stats: {str(e)}"
)
+
+
+# =============================================================================
+# CLEANUP: Remove duplicate AccessRules
+# =============================================================================
+
+@router.post("/cleanup/duplicate-rules", response_model=dict)
+@limiter.limit("5/minute")
+async def cleanup_duplicate_access_rules(
+ request: Request,
+ dryRun: bool = Query(True, description="If true, only report duplicates without deleting"),
+ currentUser: User = Depends(requireSysAdmin)
+) -> dict:
+ """
+ Find and remove duplicate AccessRules.
+
+ Duplicates are rules with the same (roleId, context, item) signature.
+ Only the first rule (oldest) is kept, all others are deleted.
+
+ Query Parameters:
+ - dryRun: If true (default), only report what would be deleted. Set to false to actually delete.
+
+ Returns:
+ - Summary with counts and details of duplicates found/removed
+ """
+ try:
+ rootInterface = getRootInterface()
+
+ # Get ALL AccessRules from DB
+ allRules = rootInterface.db.getRecordset(AccessRule)
+
+ # Group by signature (roleId, context, item)
+ rulesBySignature: Dict[tuple, list] = {}
+ for rule in allRules:
+ context = rule.get("context", "")
+ # Normalize context enum value
+ if hasattr(context, 'value'):
+ context = context.value
+ sig = (rule.get("roleId"), str(context), rule.get("item"))
+ if sig not in rulesBySignature:
+ rulesBySignature[sig] = []
+ rulesBySignature[sig].append(rule)
+
+ # Find duplicates and collect IDs to delete
+ duplicateGroups = []
+ idsToDelete = []
+
+ for sig, rules in rulesBySignature.items():
+ if len(rules) > 1:
+ # Sort by creation time (keep oldest)
+ rules.sort(key=lambda r: r.get("_createdAt", 0))
+ keepRule = rules[0]
+ deleteRules = rules[1:]
+
+ duplicateGroups.append({
+ "roleId": sig[0],
+ "context": sig[1],
+ "item": sig[2] or "(global)",
+ "totalCount": len(rules),
+ "keepId": keepRule.get("id"),
+ "deleteCount": len(deleteRules),
+ "deleteIds": [r.get("id") for r in deleteRules]
+ })
+
+ idsToDelete.extend([r.get("id") for r in deleteRules])
+
+ # Perform deletion if not dry run
+ deletedCount = 0
+ if not dryRun and idsToDelete:
+ for ruleId in idsToDelete:
+ try:
+ rootInterface.db.recordDelete(AccessRule, ruleId)
+ deletedCount += 1
+ except Exception as e:
+ logger.warning(f"Failed to delete rule {ruleId}: {e}")
+
+ result = {
+ "dryRun": dryRun,
+ "totalRules": len(allRules),
+ "uniqueSignatures": len(rulesBySignature),
+ "duplicateGroups": len(duplicateGroups),
+ "duplicateRulesToDelete": len(idsToDelete),
+ "deletedCount": deletedCount,
+ "details": duplicateGroups[:50] # Limit details to 50 groups
+ }
+
+ logger.info(f"AccessRule cleanup: dryRun={dryRun}, total={len(allRules)}, "
+ f"duplicateGroups={len(duplicateGroups)}, toDelete={len(idsToDelete)}, "
+ f"deleted={deletedCount}")
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Error during AccessRule cleanup: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Failed to cleanup duplicate rules: {str(e)}"
+ )