saas multi mandate tested

This commit is contained in:
ValueOn AG 2026-01-21 21:19:52 +01:00
parent ac88f25526
commit 5fcbc6acd3

View file

@ -27,6 +27,118 @@ from modules.datamodels.datamodelPagination import PaginationParams, PaginatedRe
# Configure logger
logger = logging.getLogger(__name__)
def _applyFiltersAndSort(items: List[Dict[str, Any]], paginationParams: Optional[PaginationParams]) -> List[Dict[str, Any]]:
"""
Apply filters and sorting to a list of items.
This is used when we can't do server-side filtering in the database (e.g., SysAdmin view).
Args:
items: List of dictionaries to filter/sort
paginationParams: Pagination parameters with filters and sort
Returns:
Filtered and sorted list
"""
if not paginationParams:
return items
result = items.copy()
# Apply filters
if paginationParams.filters:
filters = paginationParams.filters
# Handle general search
searchTerm = filters.get('search', '').lower() if filters.get('search') else None
if searchTerm:
def matchesSearch(item: Dict[str, Any]) -> bool:
for value in item.values():
if value is not None and searchTerm in str(value).lower():
return True
return False
result = [item for item in result if matchesSearch(item)]
# Handle field-specific filters
for field, filterValue in filters.items():
if field == 'search':
continue # Already handled
if isinstance(filterValue, dict) and 'operator' in filterValue:
operator = filterValue.get('operator', 'equals')
value = filterValue.get('value')
else:
operator = 'equals'
value = filterValue
if value is None or value == '':
continue
def matchesFilter(item: Dict[str, Any], f: str, op: str, v: Any) -> bool:
itemValue = item.get(f)
if itemValue is None:
return False
# Convert to string for comparison if needed
itemStr = str(itemValue).lower()
valueStr = str(v).lower()
if op in ('equals', 'eq'):
return itemStr == valueStr
elif op == 'contains':
return valueStr in itemStr
elif op == 'startsWith':
return itemStr.startswith(valueStr)
elif op == 'endsWith':
return itemStr.endswith(valueStr)
elif op in ('gt', 'gte', 'lt', 'lte'):
try:
itemNum = float(itemValue)
valueNum = float(v)
if op == 'gt':
return itemNum > valueNum
elif op == 'gte':
return itemNum >= valueNum
elif op == 'lt':
return itemNum < valueNum
elif op == 'lte':
return itemNum <= valueNum
except (ValueError, TypeError):
return False
elif op == 'in':
if isinstance(v, list):
return itemStr in [str(x).lower() for x in v]
return False
elif op == 'notIn':
if isinstance(v, list):
return itemStr not in [str(x).lower() for x in v]
return True
return True
result = [item for item in result if matchesFilter(item, field, operator, value)]
# Apply sorting
if paginationParams.sort:
for sortField in reversed(paginationParams.sort):
fieldName = sortField.field
ascending = sortField.direction == 'asc'
def getSortKey(item: Dict[str, Any]):
value = item.get(fieldName)
if value is None:
return (1, '') # Nulls last
if isinstance(value, bool):
return (0, not value if ascending else value)
if isinstance(value, (int, float)):
return (0, value)
return (0, str(value).lower())
result = sorted(result, key=getSortKey, reverse=not ascending)
return result
router = APIRouter(
prefix="/api/users",
tags=["Manage Users"],
@ -100,18 +212,24 @@ async def get_users(
# Get all users directly from database using UserInDB (the actual database model)
from modules.datamodels.datamodelUam import UserInDB
allUsers = appInterface.db.getRecordset(UserInDB)
# Convert to User objects, filtering out password hash and database-specific fields
users = []
# Convert to cleaned dictionaries first for filtering
cleanedUsers = []
for u in allUsers:
cleanedUser = {k: v for k, v in u.items() if not k.startswith("_") and k != "hashedPassword" and k != "resetToken" and k != "resetTokenExpires"}
# Ensure roleLabels is always a list
if cleanedUser.get("roleLabels") is None:
cleanedUser["roleLabels"] = []
users.append(User(**cleanedUser))
cleanedUsers.append(cleanedUser)
# Apply server-side filtering and sorting
filteredUsers = _applyFiltersAndSort(cleanedUsers, paginationParams)
# Convert to User objects
users = [User(**u) for u in filteredUsers]
if paginationParams:
totalItems = len(users)
import math
totalItems = len(users)
totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
startIdx = (paginationParams.page - 1) * paginationParams.pageSize
endIdx = startIdx + paginationParams.pageSize