proper junction table handling

This commit is contained in:
ValueOn AG 2026-01-21 11:26:19 +01:00
parent b0d89b9080
commit 6c8c703115
2 changed files with 90 additions and 58 deletions

31
app.py
View file

@ -145,21 +145,24 @@ def initLogging():
def filter(self, record): def filter(self, record):
if isinstance(record.msg, str): if isinstance(record.msg, str):
# Remove only emojis, preserve other Unicode characters like quotes # Remove only emojis, preserve other Unicode characters like quotes
# Guard against None characters during shutdown
# Remove emoji characters specifically try:
record.msg = "".join( record.msg = "".join(
char char
for char in record.msg for char in record.msg
if unicodedata.category(char) != "So" if char is not None and unicodedata.category(char) != "So"
or not ( or (char is not None and not (
0x1F600 <= ord(char) <= 0x1F64F 0x1F600 <= ord(char) <= 0x1F64F
or 0x1F300 <= ord(char) <= 0x1F5FF or 0x1F300 <= ord(char) <= 0x1F5FF
or 0x1F680 <= ord(char) <= 0x1F6FF or 0x1F680 <= ord(char) <= 0x1F6FF
or 0x1F1E0 <= ord(char) <= 0x1F1FF or 0x1F1E0 <= ord(char) <= 0x1F1FF
or 0x2600 <= ord(char) <= 0x26FF or 0x2600 <= ord(char) <= 0x26FF
or 0x2700 <= ord(char) <= 0x27BF or 0x2700 <= ord(char) <= 0x27BF
))
) )
) except (TypeError, AttributeError):
# Handle edge cases during shutdown
pass
return True return True
# Add filter to normalize problematic unicode (e.g., arrows) to ASCII for terminals like cp1252 # Add filter to normalize problematic unicode (e.g., arrows) to ASCII for terminals like cp1252

View file

@ -199,19 +199,28 @@ async def getMyFeatureInstances(
def _getUserRoleInInstance(rootInterface, userId: str, instanceId: str) -> str: def _getUserRoleInInstance(rootInterface, userId: str, instanceId: str) -> str:
"""Get the user's primary role label in a feature instance.""" """Get the user's primary role label in a feature instance."""
try: try:
from modules.datamodels.datamodelRbac import UserRole, Role from modules.datamodels.datamodelRbac import Role
from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
# Get user-role assignments for this instance # Get FeatureAccess for this user and instance
userRoles = rootInterface.db.getRecordset( featureAccesses = rootInterface.db.getRecordset(
UserRole, FeatureAccess,
recordFilter={"userId": userId} recordFilter={"userId": userId, "featureInstanceId": instanceId}
) )
for ur in userRoles: if featureAccesses:
roleId = ur.get("roleId") featureAccessId = featureAccesses[0].get("id")
if roleId:
# Get role IDs via FeatureAccessRole junction table
featureAccessRoles = rootInterface.db.getRecordset(
FeatureAccessRole,
recordFilter={"featureAccessId": featureAccessId}
)
if featureAccessRoles:
roleId = featureAccessRoles[0].get("roleId")
roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId}) roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
if roles and str(roles[0].get("featureInstanceId")) == instanceId: if roles:
return roles[0].get("roleLabel", "user") return roles[0].get("roleLabel", "user")
return "user" # Default return "user" # Default
@ -230,52 +239,72 @@ def _getInstancePermissions(rootInterface, userId: str, instanceId: str) -> Dict
} }
try: try:
from modules.datamodels.datamodelRbac import UserRole, Role, RolePermission from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
from modules.datamodels.datamodelMembership import FeatureAccess, FeatureAccessRole
# Get user's roles for this instance # Get FeatureAccess for this user and instance
userRoles = rootInterface.db.getRecordset(UserRole, recordFilter={"userId": userId}) featureAccesses = rootInterface.db.getRecordset(
roleIds = [] FeatureAccess,
recordFilter={"userId": userId, "featureInstanceId": instanceId}
)
for ur in userRoles: if not featureAccesses:
roleId = ur.get("roleId") return permissions
if roleId:
roles = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId}) # Get role IDs via FeatureAccessRole junction table
if roles and str(roles[0].get("featureInstanceId")) == instanceId: featureAccessId = featureAccesses[0].get("id")
roleIds.append(roleId) featureAccessRoles = rootInterface.db.getRecordset(
FeatureAccessRole,
recordFilter={"featureAccessId": featureAccessId}
)
roleIds = [far.get("roleId") for far in featureAccessRoles]
if not roleIds: if not roleIds:
return permissions return permissions
# Get permissions for all roles # Get permissions (AccessRules) for all roles
for roleId in roleIds: for roleId in roleIds:
rolePerms = rootInterface.db.getRecordset( accessRules = rootInterface.db.getRecordset(
RolePermission, AccessRule,
recordFilter={"roleId": roleId} recordFilter={"roleId": roleId}
) )
for perm in rolePerms: for rule in accessRules:
tableName = perm.get("tableName", "") context = rule.get("context", "")
if tableName: item = rule.get("item", "")
if tableName not in permissions["tables"]:
permissions["tables"][tableName] = {
"view": False,
"read": "n",
"create": "n",
"update": "n",
"delete": "n"
}
# Merge permissions (highest wins) # Handle DATA context (tables/fields)
current = permissions["tables"][tableName] if context == "DATA" or context == AccessRuleContext.DATA:
current["view"] = current["view"] or perm.get("canView", False) if item:
current["read"] = _mergeAccessLevel(current["read"], perm.get("readLevel", "n")) # Check if it's a field (table.field) or table
current["create"] = _mergeAccessLevel(current["create"], perm.get("createLevel", "n")) if "." in item:
current["update"] = _mergeAccessLevel(current["update"], perm.get("updateLevel", "n")) tableName, fieldName = item.split(".", 1)
current["delete"] = _mergeAccessLevel(current["delete"], perm.get("deleteLevel", "n")) if fieldName not in permissions["fields"]:
permissions["fields"][fieldName] = {"view": False}
permissions["fields"][fieldName]["view"] = permissions["fields"][fieldName]["view"] or rule.get("view", False)
else:
tableName = item
if tableName not in permissions["tables"]:
permissions["tables"][tableName] = {
"view": False,
"read": "n",
"create": "n",
"update": "n",
"delete": "n"
}
viewName = perm.get("viewName", "") # Merge permissions (highest wins)
if viewName: current = permissions["tables"][tableName]
permissions["views"][viewName] = permissions["views"].get(viewName, False) or perm.get("canAccess", False) current["view"] = current["view"] or rule.get("view", False)
current["read"] = _mergeAccessLevel(current["read"], rule.get("read") or "n")
current["create"] = _mergeAccessLevel(current["create"], rule.get("create") or "n")
current["update"] = _mergeAccessLevel(current["update"], rule.get("update") or "n")
current["delete"] = _mergeAccessLevel(current["delete"], rule.get("delete") or "n")
# Handle UI context (views)
elif context == "UI" or context == AccessRuleContext.UI:
if item:
permissions["views"][item] = permissions["views"].get(item, False) or rule.get("view", False)
return permissions return permissions