From 7ca957f664d7c0d4574a9c014b2e881b5b71cc1b Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 26 Jan 2026 23:40:12 +0100
Subject: [PATCH] fixed filter/sort for rtustee
---
.../trustee/interfaceFeatureTrustee.py | 223 +++++++++++++++++-
1 file changed, 217 insertions(+), 6 deletions(-)
diff --git a/modules/features/trustee/interfaceFeatureTrustee.py b/modules/features/trustee/interfaceFeatureTrustee.py
index c3400752..99553108 100644
--- a/modules/features/trustee/interfaceFeatureTrustee.py
+++ b/modules/features/trustee/interfaceFeatureTrustee.py
@@ -212,6 +212,197 @@ class TrusteeObjects:
return getattr(permissions, operation, AccessLevel.NONE)
+ # ===== Pagination Helper Functions =====
+
+ def _applyFilters(self, records: List[Dict[str, Any]], params: Optional[PaginationParams]) -> List[Dict[str, Any]]:
+ """
+ Apply filter criteria to records.
+
+ Supports:
+ - General search: params.filters["search"] - searches across all text fields
+ - Field-specific filters: params.filters
+ - Simple: {"status": "running"} - equals match
+ - With operator: {"status": {"operator": "equals", "value": "running"}}
+ - Operators: equals, contains, gt, gte, lt, lte, in, notIn, startsWith, endsWith
+
+ Args:
+ records: List of record dictionaries to filter
+ params: PaginationParams with filters (search is inside filters)
+
+ Returns:
+ Filtered list of records
+ """
+ if not params or not records:
+ return records
+
+ # Get filters safely (may be None)
+ filters = getattr(params, 'filters', None)
+ if not filters:
+ return records
+
+ filtered = records
+
+ # Handle general search across text fields (search is inside filters)
+ searchTerm = filters.get("search") if isinstance(filters, dict) else None
+ if searchTerm:
+ searchTerm = str(searchTerm).lower()
+ if searchTerm:
+ searchFiltered = []
+ for record in filtered:
+ found = False
+ for key, value in record.items():
+ if isinstance(value, str) and searchTerm in value.lower():
+ found = True
+ break
+ elif isinstance(value, (int, float)) and searchTerm in str(value):
+ found = True
+ break
+ if found:
+ searchFiltered.append(record)
+ filtered = searchFiltered
+
+ # Handle field-specific filters
+ if filters:
+ for fieldName, filterValue in filters.items():
+ if fieldName == "search":
+ continue # Already handled above
+
+ fieldFiltered = []
+ for record in filtered:
+ if fieldName not in record:
+ continue
+
+ recordValue = record.get(fieldName)
+
+ # Handle simple value (equals operator)
+ if not isinstance(filterValue, dict):
+ if recordValue == filterValue:
+ fieldFiltered.append(record)
+ continue
+
+ # Handle filter with operator
+ operator = filterValue.get("operator", "equals")
+ filterVal = filterValue.get("value")
+
+ matches = False
+ if operator in ["equals", "eq"]:
+ matches = recordValue == filterVal
+
+ elif operator == "contains":
+ recordStr = str(recordValue).lower() if recordValue is not None else ""
+ filterStr = str(filterVal).lower() if filterVal is not None else ""
+ matches = filterStr in recordStr
+
+ elif operator == "startsWith":
+ recordStr = str(recordValue).lower() if recordValue is not None else ""
+ filterStr = str(filterVal).lower() if filterVal is not None else ""
+ matches = recordStr.startswith(filterStr)
+
+ elif operator == "endsWith":
+ recordStr = str(recordValue).lower() if recordValue is not None else ""
+ filterStr = str(filterVal).lower() if filterVal is not None else ""
+ matches = recordStr.endswith(filterStr)
+
+ elif operator == "gt":
+ try:
+ recordNum = float(recordValue) if recordValue is not None else float('-inf')
+ filterNum = float(filterVal) if filterVal is not None else float('-inf')
+ matches = recordNum > filterNum
+ except (ValueError, TypeError):
+ matches = False
+
+ elif operator == "gte":
+ try:
+ recordNum = float(recordValue) if recordValue is not None else float('-inf')
+ filterNum = float(filterVal) if filterVal is not None else float('-inf')
+ matches = recordNum >= filterNum
+ except (ValueError, TypeError):
+ matches = False
+
+ elif operator == "lt":
+ try:
+ recordNum = float(recordValue) if recordValue is not None else float('inf')
+ filterNum = float(filterVal) if filterVal is not None else float('inf')
+ matches = recordNum < filterNum
+ except (ValueError, TypeError):
+ matches = False
+
+ elif operator == "lte":
+ try:
+ recordNum = float(recordValue) if recordValue is not None else float('inf')
+ filterNum = float(filterVal) if filterVal is not None else float('inf')
+ matches = recordNum <= filterNum
+ except (ValueError, TypeError):
+ matches = False
+
+ elif operator == "in":
+ if isinstance(filterVal, list):
+ matches = recordValue in filterVal
+ else:
+ matches = False
+
+ elif operator == "notIn":
+ if isinstance(filterVal, list):
+ matches = recordValue not in filterVal
+ else:
+ matches = False
+
+ if matches:
+ fieldFiltered.append(record)
+
+ filtered = fieldFiltered
+
+ return filtered
+
+ def _applySorting(self, records: List[Dict[str, Any]], params: Optional[PaginationParams]) -> List[Dict[str, Any]]:
+ """Apply multi-level sorting to records using stable sort."""
+ if not params:
+ return records
+
+ # Get sort safely (may be None or empty list)
+ sortFields = getattr(params, 'sort', None)
+ if not sortFields:
+ return records
+
+ sortedRecords = list(records)
+
+ # Sort from least significant to most significant field (reverse order)
+ # Python's sort is stable, so this creates proper multi-level sorting
+ for sortField in reversed(sortFields):
+ # Handle both dict and object formats
+ if isinstance(sortField, dict):
+ fieldName = sortField.get("field")
+ direction = sortField.get("direction", "asc")
+ else:
+ fieldName = getattr(sortField, "field", None)
+ direction = getattr(sortField, "direction", "asc")
+
+ if not fieldName:
+ continue
+
+ isDesc = (direction == "desc")
+
+ def makeSortKey(fName):
+ def sortKey(record):
+ value = record.get(fName)
+ # Handle None values - place them at the end for both directions
+ if value is None:
+ return (1, "") # sorts after (0, ...)
+ else:
+ if isinstance(value, (int, float)):
+ return (0, value)
+ elif isinstance(value, str):
+ return (0, value.lower())
+ elif isinstance(value, bool):
+ return (0, value)
+ else:
+ return (0, str(value))
+ return sortKey
+
+ sortedRecords.sort(key=makeSortKey(fieldName), reverse=isDesc)
+
+ return sortedRecords
+
# ===== Organisation CRUD =====
def createOrganisation(self, data: Dict[str, Any]) -> Optional[TrusteeOrganisation]:
@@ -819,12 +1010,22 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId
)
- # Convert dicts to Pydantic objects (remove binary data and internal fields)
- pydanticItems = []
+ # Clean records (remove binary data and internal fields) - keep as dicts for filtering/sorting
+ cleanedRecords = []
for record in records:
cleanedRecord = {k: v for k, v in record.items() if not k.startswith("_") and k != "documentData"}
- pydanticItems.append(TrusteeDocument(**cleanedRecord))
+ cleanedRecords.append(cleanedRecord)
+ # Step 2: Apply filters (search and field filters)
+ filteredRecords = self._applyFilters(cleanedRecords, params)
+
+ # Step 3: Apply sorting
+ sortedRecords = self._applySorting(filteredRecords, params)
+
+ # Step 4: Convert to Pydantic objects
+ pydanticItems = [TrusteeDocument(**r) for r in sortedRecords]
+
+ # Step 5: Apply pagination
totalItems = len(pydanticItems)
if params:
pageSize = params.pageSize or 20
@@ -965,12 +1166,22 @@ class TrusteeObjects:
featureInstanceId=self.featureInstanceId
)
- # Convert dicts to Pydantic objects (remove internal fields)
- pydanticItems = []
+ # Clean records (remove internal fields) - keep as dicts for filtering/sorting
+ cleanedRecords = []
for record in records:
cleanedRecord = {k: v for k, v in record.items() if not k.startswith("_")}
- pydanticItems.append(TrusteePosition(**cleanedRecord))
+ cleanedRecords.append(cleanedRecord)
+ # Step 2: Apply filters (search and field filters)
+ filteredRecords = self._applyFilters(cleanedRecords, params)
+
+ # Step 3: Apply sorting
+ sortedRecords = self._applySorting(filteredRecords, params)
+
+ # Step 4: Convert to Pydantic objects
+ pydanticItems = [TrusteePosition(**r) for r in sortedRecords]
+
+ # Step 5: Apply pagination
totalItems = len(pydanticItems)
if params:
pageSize = params.pageSize or 20