From 6c8c703115ec4fdeabe969cbeacfb690e7c9bfb3 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Wed, 21 Jan 2026 11:26:19 +0100
Subject: [PATCH] proper junction table handling
---
app.py | 31 +++++----
modules/routes/routeFeatures.py | 117 ++++++++++++++++++++------------
2 files changed, 90 insertions(+), 58 deletions(-)
diff --git a/app.py b/app.py
index b489194d..472de2a1 100644
--- a/app.py
+++ b/app.py
@@ -145,21 +145,24 @@ def initLogging():
def filter(self, record):
if isinstance(record.msg, str):
# Remove only emojis, preserve other Unicode characters like quotes
-
- # Remove emoji characters specifically
- record.msg = "".join(
- char
- for char in record.msg
- if unicodedata.category(char) != "So"
- or not (
- 0x1F600 <= ord(char) <= 0x1F64F
- or 0x1F300 <= ord(char) <= 0x1F5FF
- or 0x1F680 <= ord(char) <= 0x1F6FF
- or 0x1F1E0 <= ord(char) <= 0x1F1FF
- or 0x2600 <= ord(char) <= 0x26FF
- or 0x2700 <= ord(char) <= 0x27BF
+ # Guard against None characters during shutdown
+ try:
+ record.msg = "".join(
+ char
+ for char in record.msg
+ if char is not None and unicodedata.category(char) != "So"
+ or (char is not None and not (
+ 0x1F600 <= ord(char) <= 0x1F64F
+ or 0x1F300 <= ord(char) <= 0x1F5FF
+ or 0x1F680 <= ord(char) <= 0x1F6FF
+ or 0x1F1E0 <= ord(char) <= 0x1F1FF
+ or 0x2600 <= ord(char) <= 0x26FF
+ or 0x2700 <= ord(char) <= 0x27BF
+ ))
)
- )
+ except (TypeError, AttributeError):
+ # Handle edge cases during shutdown
+ pass
return True
# Add filter to normalize problematic unicode (e.g., arrows) to ASCII for terminals like cp1252
diff --git a/modules/routes/routeFeatures.py b/modules/routes/routeFeatures.py
index 43d120f1..0659d919 100644
--- a/modules/routes/routeFeatures.py
+++ b/modules/routes/routeFeatures.py
@@ -199,19 +199,28 @@ async def getMyFeatureInstances(
def _getUserRoleInInstance(rootInterface, userId: str, instanceId: str) -> str:
"""Get the user's primary role label in a feature instance."""
try:
- from modules.datamodels.datamodelRbac import UserRole, Role
+ from modules.datamodels.datamodelRbac import Role
+ from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
- # Get user-role assignments for this instance
- userRoles = rootInterface.db.getRecordset(
- UserRole,
- recordFilter={"userId": userId}
+ # Get FeatureAccess for this user and instance
+ featureAccesses = rootInterface.db.getRecordset(
+ FeatureAccess,
+ recordFilter={"userId": userId, "featureInstanceId": instanceId}
)
- for ur in userRoles:
- roleId = ur.get("roleId")
- if roleId:
+ if featureAccesses:
+ featureAccessId = featureAccesses[0].get("id")
+
+ # Get role IDs via FeatureAccessRole junction table
+ featureAccessRoles = rootInterface.db.getRecordset(
+ FeatureAccessRole,
+ recordFilter={"featureAccessId": featureAccessId}
+ )
+
+ if featureAccessRoles:
+ roleId = featureAccessRoles[0].get("roleId")
roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
- if roles and str(roles[0].get("featureInstanceId")) == instanceId:
+ if roles:
return roles[0].get("roleLabel", "user")
return "user" # Default
@@ -230,52 +239,72 @@ def _getInstancePermissions(rootInterface, userId: str, instanceId: str) -> Dict
}
try:
- from modules.datamodels.datamodelRbac import UserRole, Role, RolePermission
+ from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
+ from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
- # Get user's roles for this instance
- userRoles = rootInterface.db.getRecordset(UserRole, recordFilter={"userId": userId})
- roleIds = []
+ # Get FeatureAccess for this user and instance
+ featureAccesses = rootInterface.db.getRecordset(
+ FeatureAccess,
+ recordFilter={"userId": userId, "featureInstanceId": instanceId}
+ )
- for ur in userRoles:
- roleId = ur.get("roleId")
- if roleId:
- roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
- if roles and str(roles[0].get("featureInstanceId")) == instanceId:
- roleIds.append(roleId)
+ if not featureAccesses:
+ return permissions
+
+ # Get role IDs via FeatureAccessRole junction table
+ featureAccessId = featureAccesses[0].get("id")
+ featureAccessRoles = rootInterface.db.getRecordset(
+ FeatureAccessRole,
+ recordFilter={"featureAccessId": featureAccessId}
+ )
+ roleIds = [far.get("roleId") for far in featureAccessRoles]
if not roleIds:
return permissions
- # Get permissions for all roles
+ # Get permissions (AccessRules) for all roles
for roleId in roleIds:
- rolePerms = rootInterface.db.getRecordset(
- RolePermission,
+ accessRules = rootInterface.db.getRecordset(
+ AccessRule,
recordFilter={"roleId": roleId}
)
- for perm in rolePerms:
- tableName = perm.get("tableName", "")
- if tableName:
- if tableName not in permissions["tables"]:
- permissions["tables"][tableName] = {
- "view": False,
- "read": "n",
- "create": "n",
- "update": "n",
- "delete": "n"
- }
-
- # Merge permissions (highest wins)
- current = permissions["tables"][tableName]
- current["view"] = current["view"] or perm.get("canView", False)
- current["read"] = _mergeAccessLevel(current["read"], perm.get("readLevel", "n"))
- current["create"] = _mergeAccessLevel(current["create"], perm.get("createLevel", "n"))
- current["update"] = _mergeAccessLevel(current["update"], perm.get("updateLevel", "n"))
- current["delete"] = _mergeAccessLevel(current["delete"], perm.get("deleteLevel", "n"))
+ for rule in accessRules:
+ context = rule.get("context", "")
+ item = rule.get("item", "")
- viewName = perm.get("viewName", "")
- if viewName:
- permissions["views"][viewName] = permissions["views"].get(viewName, False) or perm.get("canAccess", False)
+ # Handle DATA context (tables/fields)
+ if context == "DATA" or context == AccessRuleContext.DATA:
+ if item:
+ # Check if it's a field (table.field) or table
+ if "." in item:
+ tableName, fieldName = item.split(".", 1)
+ if fieldName not in permissions["fields"]:
+ permissions["fields"][fieldName] = {"view": False}
+ permissions["fields"][fieldName]["view"] = permissions["fields"][fieldName]["view"] or rule.get("view", False)
+ else:
+ tableName = item
+ if tableName not in permissions["tables"]:
+ permissions["tables"][tableName] = {
+ "view": False,
+ "read": "n",
+ "create": "n",
+ "update": "n",
+ "delete": "n"
+ }
+
+ # Merge permissions (highest wins)
+ current = permissions["tables"][tableName]
+ current["view"] = current["view"] or rule.get("view", False)
+ current["read"] = _mergeAccessLevel(current["read"], rule.get("read") or "n")
+ current["create"] = _mergeAccessLevel(current["create"], rule.get("create") or "n")
+ current["update"] = _mergeAccessLevel(current["update"], rule.get("update") or "n")
+ current["delete"] = _mergeAccessLevel(current["delete"], rule.get("delete") or "n")
+
+ # Handle UI context (views)
+ elif context == "UI" or context == AccessRuleContext.UI:
+ if item:
+ permissions["views"][item] = permissions["views"].get(item, False) or rule.get("view", False)
return permissions