From 1b51ee3e1c33651e4c621a158ad5a4672fc68892 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Sun, 12 Apr 2026 10:43:30 +0200
Subject: [PATCH] fixed udb rbac
---
modules/interfaces/interfaceDbManagement.py | 54 +++++++++------------
modules/interfaces/interfaceRbac.py | 8 ++-
modules/routes/routeDataFiles.py | 7 ++-
3 files changed, 35 insertions(+), 34 deletions(-)
diff --git a/modules/interfaces/interfaceDbManagement.py b/modules/interfaces/interfaceDbManagement.py
index 085be891..9f54fc44 100644
--- a/modules/interfaces/interfaceDbManagement.py
+++ b/modules/interfaces/interfaceDbManagement.py
@@ -956,7 +956,7 @@ class ComponentObjects:
mid = file.get("mandateId")
return not mid or (isinstance(mid, str) and not mid.strip())
- def getAllFiles(self, pagination: Optional[PaginationParams] = None) -> Union[List[FileItem], PaginatedResult]:
+ def getAllFiles(self, pagination: Optional[PaginationParams] = None, recordFilter: Dict[str, Any] = None) -> Union[List[FileItem], PaginatedResult]:
"""
Returns files visible to the current user based on RBAC scope rules.
@@ -999,44 +999,31 @@ class ComponentObjects:
continue
return fileItems
- folderFilter = None
- hasFolderFilter = False
- if pagination and pagination.filters and "folderId" in pagination.filters:
- folderFilter = pagination.filters.pop("folderId")
- hasFolderFilter = True
-
- def _applyFolderFilter(files):
- if not hasFolderFilter:
- return files
- if folderFilter is None:
- return [f for f in files if not (f.get("folderId") if isinstance(f, dict) else getattr(f, "folderId", None))]
- return [f for f in files if (f.get("folderId") if isinstance(f, dict) else getattr(f, "folderId", None)) == folderFilter]
-
if pagination is None:
allFiles = getRecordsetWithRBAC(
self.db, FileItem, self.currentUser,
+ recordFilter=recordFilter,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId,
)
- return _convertFileItems(_applyFolderFilter(allFiles))
+ return _convertFileItems(allFiles)
result = getRecordsetPaginatedWithRBAC(
self.db, FileItem, self.currentUser,
pagination=pagination,
+ recordFilter=recordFilter,
mandateId=self.mandateId,
featureInstanceId=self.featureInstanceId,
)
if isinstance(result, PaginatedResult):
- filtered = _applyFolderFilter(result.items)
return PaginatedResult(
- items=_convertFileItems(filtered),
- totalItems=len(filtered),
- totalPages=max(1, -(-len(filtered) // (pagination.pageSize or 20))),
+ items=_convertFileItems(result.items),
+ totalItems=result.totalItems,
+ totalPages=result.totalPages,
)
- raw = result if isinstance(result, list) else []
- return _convertFileItems(_applyFolderFilter(raw))
+ return _convertFileItems(result if isinstance(result, list) else [])
def getFile(self, fileId: str) -> Optional[FileItem]:
"""Returns a file by ID if the current user has RBAC access (scope-based)."""
@@ -1337,13 +1324,14 @@ class ComponentObjects:
return folders[0] if folders else None
def listFolders(self, parentId: Optional[str] = None) -> List[Dict[str, Any]]:
- """List folders for current user, optionally filtered by parentId.
- Each folder is enriched with ``fileCount`` (number of direct files
- visible to this user via RBAC scope rules)."""
- recordFilter = {"sysCreatedBy": self.userId or ""}
+ """List folders visible to the current user.
+ Own folders are always returned. Other users' folders are only
+ returned when they contain files visible to the current user.
+ Each folder is enriched with ``fileCount``."""
+ recordFilter = {}
if parentId is not None:
recordFilter["parentId"] = parentId
- folders = self.db.getRecordset(FileFolder, recordFilter=recordFilter)
+ folders = self.db.getRecordset(FileFolder, recordFilter=recordFilter if recordFilter else None)
if not folders:
return folders
@@ -1351,11 +1339,7 @@ class ComponentObjects:
folderIds = [f["id"] for f in folders if f.get("id")]
fileCounts: Dict[str, int] = {}
try:
- # Count files per folder that the user can see (RBAC scope-aware).
- # Own files are always counted; shared files (global/mandate/featureInstance)
- # that happen to be in one of the user's folders are also counted.
from modules.interfaces.interfaceRbac import _buildFilesScopeWhereClause
- from modules.datamodels.datamodelUam import User as UserModel
scopeClause = _buildFilesScopeWhereClause(
self.currentUser, "FileItem", self.db,
self.mandateId, self.featureInstanceId,
@@ -1382,10 +1366,16 @@ class ComponentObjects:
except Exception as e:
logger.warning(f"Could not count files per folder: {e}")
+ userId = self.userId or ""
+ result = []
for folder in folders:
- folder["fileCount"] = fileCounts.get(folder.get("id", ""), 0)
+ fc = fileCounts.get(folder.get("id", ""), 0)
+ folder["fileCount"] = fc
+ isOwn = folder.get("sysCreatedBy") == userId
+ if isOwn or fc > 0:
+ result.append(folder)
- return folders
+ return result
def createFolder(self, name: str, parentId: Optional[str] = None) -> Dict[str, Any]:
"""Create a new folder with unique name validation."""
diff --git a/modules/interfaces/interfaceRbac.py b/modules/interfaces/interfaceRbac.py
index 885db8f5..b8a87ba9 100644
--- a/modules/interfaces/interfaceRbac.py
+++ b/modules/interfaces/interfaceRbac.py
@@ -740,9 +740,15 @@ def buildRbacWhereClause(
# All records within the feature instance - only featureInstanceId filtering
if readLevel == AccessLevel.ALL:
+ namespaceAll = TABLE_NAMESPACE.get(table, "system")
+ # Files: scope-based context filtering applies even with ALL access
+ if namespaceAll == "files":
+ return _buildFilesScopeWhereClause(
+ currentUser, table, connector, mandateId, featureInstanceId,
+ baseConditions, baseValues,
+ )
# Chat / AI Workspace: even DATA read ALL must not list other users' rows in a
# shared featureInstance (stale RBAC rules or merged roles). Same as MY.
- namespaceAll = TABLE_NAMESPACE.get(table, "system")
if featureInstanceId and namespaceAll == "chat":
userIdFieldAll = "sysCreatedBy"
if table == "UserInDB":
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index fc9b347e..907f0a20 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -207,12 +207,17 @@ def get_files(
detail=f"Invalid pagination parameter: {str(e)}"
)
+ recordFilter = None
+ if paginationParams and paginationParams.filters and "folderId" in paginationParams.filters:
+ fVal = paginationParams.filters.pop("folderId")
+ recordFilter = {"folderId": fVal}
+
managementInterface = interfaceDbManagement.getInterface(
currentUser,
mandateId=str(context.mandateId) if context.mandateId else None,
featureInstanceId=str(context.featureInstanceId) if context.featureInstanceId else None
)
- result = managementInterface.getAllFiles(pagination=paginationParams)
+ result = managementInterface.getAllFiles(pagination=paginationParams, recordFilter=recordFilter)
# If pagination was requested, result is PaginatedResult
# If no pagination, result is List[FileItem]