diff --git a/modules/routes/routeDataUsers.py b/modules/routes/routeDataUsers.py index 81115e51..62a9b344 100644 --- a/modules/routes/routeDataUsers.py +++ b/modules/routes/routeDataUsers.py @@ -27,6 +27,118 @@ from modules.datamodels.datamodelPagination import PaginationParams, PaginatedRe # Configure logger logger = logging.getLogger(__name__) + +def _applyFiltersAndSort(items: List[Dict[str, Any]], paginationParams: Optional[PaginationParams]) -> List[Dict[str, Any]]: + """ + Apply filters and sorting to a list of items. + This is used when we can't do server-side filtering in the database (e.g., SysAdmin view). + + Args: + items: List of dictionaries to filter/sort + paginationParams: Pagination parameters with filters and sort + + Returns: + Filtered and sorted list + """ + if not paginationParams: + return items + + result = items.copy() + + # Apply filters + if paginationParams.filters: + filters = paginationParams.filters + + # Handle general search + searchTerm = filters.get('search', '').lower() if filters.get('search') else None + + if searchTerm: + def matchesSearch(item: Dict[str, Any]) -> bool: + for value in item.values(): + if value is not None and searchTerm in str(value).lower(): + return True + return False + result = [item for item in result if matchesSearch(item)] + + # Handle field-specific filters + for field, filterValue in filters.items(): + if field == 'search': + continue # Already handled + + if isinstance(filterValue, dict) and 'operator' in filterValue: + operator = filterValue.get('operator', 'equals') + value = filterValue.get('value') + else: + operator = 'equals' + value = filterValue + + if value is None or value == '': + continue + + def matchesFilter(item: Dict[str, Any], f: str, op: str, v: Any) -> bool: + itemValue = item.get(f) + if itemValue is None: + return False + + # Convert to string for comparison if needed + itemStr = str(itemValue).lower() + valueStr = str(v).lower() + + if op in ('equals', 'eq'): + return itemStr == valueStr + elif op == 'contains': + return valueStr in itemStr + elif op == 'startsWith': + return itemStr.startswith(valueStr) + elif op == 'endsWith': + return itemStr.endswith(valueStr) + elif op in ('gt', 'gte', 'lt', 'lte'): + try: + itemNum = float(itemValue) + valueNum = float(v) + if op == 'gt': + return itemNum > valueNum + elif op == 'gte': + return itemNum >= valueNum + elif op == 'lt': + return itemNum < valueNum + elif op == 'lte': + return itemNum <= valueNum + except (ValueError, TypeError): + return False + elif op == 'in': + if isinstance(v, list): + return itemStr in [str(x).lower() for x in v] + return False + elif op == 'notIn': + if isinstance(v, list): + return itemStr not in [str(x).lower() for x in v] + return True + return True + + result = [item for item in result if matchesFilter(item, field, operator, value)] + + # Apply sorting + if paginationParams.sort: + for sortField in reversed(paginationParams.sort): + fieldName = sortField.field + ascending = sortField.direction == 'asc' + + def getSortKey(item: Dict[str, Any]): + value = item.get(fieldName) + if value is None: + return (1, '') # Nulls last + if isinstance(value, bool): + return (0, not value if ascending else value) + if isinstance(value, (int, float)): + return (0, value) + return (0, str(value).lower()) + + result = sorted(result, key=getSortKey, reverse=not ascending) + + return result + + router = APIRouter( prefix="/api/users", tags=["Manage Users"], @@ -100,18 +212,24 @@ async def get_users( # Get all users directly from database using UserInDB (the actual database model) from modules.datamodels.datamodelUam import UserInDB allUsers = appInterface.db.getRecordset(UserInDB) - # Convert to User objects, filtering out password hash and database-specific fields - users = [] + # Convert to cleaned dictionaries first for filtering + cleanedUsers = [] for u in allUsers: cleanedUser = {k: v for k, v in u.items() if not k.startswith("_") and k != "hashedPassword" and k != "resetToken" and k != "resetTokenExpires"} # Ensure roleLabels is always a list if cleanedUser.get("roleLabels") is None: cleanedUser["roleLabels"] = [] - users.append(User(**cleanedUser)) + cleanedUsers.append(cleanedUser) + + # Apply server-side filtering and sorting + filteredUsers = _applyFiltersAndSort(cleanedUsers, paginationParams) + + # Convert to User objects + users = [User(**u) for u in filteredUsers] if paginationParams: - totalItems = len(users) import math + totalItems = len(users) totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0 startIdx = (paginationParams.page - 1) * paginationParams.pageSize endIdx = startIdx + paginationParams.pageSize