From ac88f25526338d88699a5fa3ad9758332bf216c6 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Wed, 21 Jan 2026 15:57:16 +0100
Subject: [PATCH] serverside filter and sort for form generic
---
modules/interfaces/interfaceBootstrap.py | 187 +++++++++++++++++++++++
modules/routes/routeDataMandates.py | 110 +++++++++++--
modules/routes/routeRbac.py | 68 +++++++--
3 files changed, 343 insertions(+), 22 deletions(-)
diff --git a/modules/interfaces/interfaceBootstrap.py b/modules/interfaces/interfaceBootstrap.py
index 545b0040..cbf8295a 100644
--- a/modules/interfaces/interfaceBootstrap.py
+++ b/modules/interfaces/interfaceBootstrap.py
@@ -62,6 +62,9 @@ def initBootstrap(db: DatabaseConnector) -> None:
# Initialize RBAC rules (uses roleIds from roles)
initRbacRules(db)
+ # Initialize AccessRules for feature-template roles (idempotent - adds missing rules)
+ _initFeatureTemplateRoleAccessRules(db)
+
# Initialize admin user
adminUserId = initAdminUser(db, mandateId)
@@ -498,6 +501,190 @@ def _getRoleId(db: DatabaseConnector, roleLabel: str) -> Optional[str]:
return None
+def _initFeatureTemplateRoleAccessRules(db: DatabaseConnector) -> None:
+ """
+ Initialize AccessRules for feature-template roles.
+ This is idempotent - only adds rules that don't exist yet.
+
+ Feature-template roles need explicit AccessRules for their respective tables:
+ - trustee-admin/accountant/client -> TrusteeOrganisation, TrusteeContract, etc.
+ - chatbot-admin/user -> ChatSession, etc.
+ - workflow-admin/editor/viewer -> ChatWorkflow, etc.
+
+ Args:
+ db: Database connector instance
+ """
+ logger.info("Checking feature-template role AccessRules")
+
+ # Define feature-specific table access
+ featureTableAccess = {
+ "trustee": {
+ "tables": [
+ "TrusteeOrganisation", "TrusteeRole", "TrusteeAccess",
+ "TrusteeContract", "TrusteeDocument", "TrusteePosition",
+ "TrusteePositionDocument"
+ ],
+ "roles": {
+ "trustee-admin": {
+ "view": True,
+ "read": AccessLevel.ALL,
+ "create": AccessLevel.ALL,
+ "update": AccessLevel.ALL,
+ "delete": AccessLevel.ALL
+ },
+ "trustee-accountant": {
+ "view": True,
+ "read": AccessLevel.GROUP,
+ "create": AccessLevel.GROUP,
+ "update": AccessLevel.GROUP,
+ "delete": AccessLevel.NONE
+ },
+ "trustee-client": {
+ "view": True,
+ "read": AccessLevel.MY,
+ "create": AccessLevel.NONE,
+ "update": AccessLevel.NONE,
+ "delete": AccessLevel.NONE
+ }
+ }
+ },
+ "chatbot": {
+ "tables": ["ChatSession", "ChatMessage"],
+ "roles": {
+ "chatbot-admin": {
+ "view": True,
+ "read": AccessLevel.ALL,
+ "create": AccessLevel.ALL,
+ "update": AccessLevel.ALL,
+ "delete": AccessLevel.ALL
+ },
+ "chatbot-user": {
+ "view": True,
+ "read": AccessLevel.MY,
+ "create": AccessLevel.MY,
+ "update": AccessLevel.MY,
+ "delete": AccessLevel.MY
+ }
+ }
+ },
+ "chatworkflow": {
+ "tables": ["ChatWorkflow", "AutomationDefinition"],
+ "roles": {
+ "workflow-admin": {
+ "view": True,
+ "read": AccessLevel.ALL,
+ "create": AccessLevel.ALL,
+ "update": AccessLevel.ALL,
+ "delete": AccessLevel.ALL
+ },
+ "workflow-editor": {
+ "view": True,
+ "read": AccessLevel.GROUP,
+ "create": AccessLevel.GROUP,
+ "update": AccessLevel.GROUP,
+ "delete": AccessLevel.NONE
+ },
+ "workflow-viewer": {
+ "view": True,
+ "read": AccessLevel.GROUP,
+ "create": AccessLevel.NONE,
+ "update": AccessLevel.NONE,
+ "delete": AccessLevel.NONE
+ }
+ }
+ },
+ "neutralization": {
+ "tables": ["DataNeutraliserConfig", "DataNeutralizerAttributes"],
+ "roles": {
+ "neutralization-admin": {
+ "view": True,
+ "read": AccessLevel.ALL,
+ "create": AccessLevel.ALL,
+ "update": AccessLevel.ALL,
+ "delete": AccessLevel.ALL
+ },
+ "neutralization-analyst": {
+ "view": True,
+ "read": AccessLevel.GROUP,
+ "create": AccessLevel.NONE,
+ "update": AccessLevel.NONE,
+ "delete": AccessLevel.NONE
+ }
+ }
+ },
+ "realestate": {
+ "tables": ["Projekt", "Parzelle", "Dokument", "Gemeinde", "Kanton", "Land"],
+ "roles": {
+ "realestate-admin": {
+ "view": True,
+ "read": AccessLevel.ALL,
+ "create": AccessLevel.ALL,
+ "update": AccessLevel.ALL,
+ "delete": AccessLevel.ALL
+ },
+ "realestate-manager": {
+ "view": True,
+ "read": AccessLevel.GROUP,
+ "create": AccessLevel.GROUP,
+ "update": AccessLevel.GROUP,
+ "delete": AccessLevel.NONE
+ },
+ "realestate-viewer": {
+ "view": True,
+ "read": AccessLevel.GROUP,
+ "create": AccessLevel.NONE,
+ "update": AccessLevel.NONE,
+ "delete": AccessLevel.NONE
+ }
+ }
+ }
+ }
+
+ createdCount = 0
+
+ for featureCode, featureConfig in featureTableAccess.items():
+ tables = featureConfig["tables"]
+ roles = featureConfig["roles"]
+
+ for roleLabel, permissions in roles.items():
+ roleId = _getRoleId(db, roleLabel)
+ if not roleId:
+ continue
+
+ for tableName in tables:
+ # Check if rule already exists
+ existingRules = db.getRecordset(
+ AccessRule,
+ recordFilter={
+ "roleId": roleId,
+ "context": AccessRuleContext.DATA,
+ "item": tableName
+ }
+ )
+
+ if existingRules:
+ continue # Rule already exists
+
+ # Create new rule
+ rule = AccessRule(
+ roleId=roleId,
+ context=AccessRuleContext.DATA,
+ item=tableName,
+ view=permissions["view"],
+ read=permissions["read"],
+ create=permissions["create"],
+ update=permissions["update"],
+ delete=permissions["delete"]
+ )
+ db.recordCreate(AccessRule, rule)
+ createdCount += 1
+
+ if createdCount > 0:
+ logger.info(f"Created {createdCount} feature-template role AccessRules")
+ else:
+ logger.debug("All feature-template role AccessRules already exist")
+
+
def initRbacRules(db: DatabaseConnector) -> None:
"""
Initialize RBAC rules if they don't exist.
diff --git a/modules/routes/routeDataMandates.py b/modules/routes/routeDataMandates.py
index 90948b8a..82ed3ad6 100644
--- a/modules/routes/routeDataMandates.py
+++ b/modules/routes/routeDataMandates.py
@@ -323,17 +323,21 @@ async def delete_mandate(
# User Management within Mandates (Mandate-Admin)
# =============================================================================
-@router.get("/{targetMandateId}/users", response_model=List[MandateUserInfo])
+@router.get("/{targetMandateId}/users")
@limiter.limit("60/minute")
async def listMandateUsers(
request: Request,
targetMandateId: str = Path(..., description="ID of the mandate"),
+ pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
context: RequestContext = Depends(getRequestContext)
-) -> List[MandateUserInfo]:
+):
"""
- List all users in a mandate.
+ List all users in a mandate with pagination, search, and sorting support.
Requires Mandate-Admin role or SysAdmin.
+
+ Args:
+ pagination: Optional pagination parameters (page, pageSize, search, filters, sort)
"""
# Check permission
if not _hasMandateAdminRole(context, targetMandateId) and not context.isSysAdmin:
@@ -353,6 +357,26 @@ async def listMandateUsers(
detail=f"Mandate {targetMandateId} not found"
)
+ # Parse pagination parameter
+ paginationParams = None
+ if pagination:
+ try:
+ paginationDict = json.loads(pagination)
+ if paginationDict:
+ # Normalize pagination dict
+ if 'sort' in paginationDict and paginationDict['sort']:
+ normalizedSort = []
+ for item in paginationDict['sort']:
+ if isinstance(item, dict):
+ normalizedSort.append(item)
+ paginationDict['sort'] = normalizedSort if normalizedSort else None
+ paginationParams = paginationDict
+ except (json.JSONDecodeError, ValueError) as e:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Invalid pagination parameter: {str(e)}"
+ )
+
# Get all UserMandate entries for this mandate
userMandates = rootInterface.db.getRecordset(
UserMandate,
@@ -378,17 +402,77 @@ async def listMandateUsers(
else:
roleLabels.append(roleId) # Fallback to ID if not found
- result.append(MandateUserInfo(
- id=um.get("id"), # UserMandate ID as primary key
- userId=str(user.id),
- username=user.username,
- email=user.email,
- fullName=user.fullName,
- roleIds=roleIds,
- roleLabels=roleLabels,
- enabled=um.get("enabled", True)
- ))
+ result.append({
+ "id": um.get("id"), # UserMandate ID as primary key
+ "userId": str(user.id),
+ "username": user.username,
+ "email": user.email,
+ "fullName": user.fullName,
+ "roleIds": roleIds,
+ "roleLabels": roleLabels,
+ "enabled": um.get("enabled", True)
+ })
+ # Apply search, filtering, and sorting if pagination requested
+ if paginationParams:
+ # Apply search (if search term provided)
+ searchTerm = paginationParams.get('search', '').lower() if paginationParams.get('search') else ''
+ if searchTerm:
+ searchedResult = []
+ for item in result:
+ username = (item.get("username") or "").lower()
+ email = (item.get("email") or "").lower()
+ fullName = (item.get("fullName") or "").lower()
+ roleLabelsStr = " ".join(item.get("roleLabels") or []).lower()
+
+ if searchTerm in username or searchTerm in email or searchTerm in fullName or searchTerm in roleLabelsStr:
+ searchedResult.append(item)
+ result = searchedResult
+
+ # Apply filters (if filters provided)
+ filters = paginationParams.get('filters')
+ if filters:
+ for fieldName, filterValue in filters.items():
+ if filterValue is not None and filterValue != '':
+ filterValueLower = str(filterValue).lower()
+ result = [
+ item for item in result
+ if str(item.get(fieldName, '')).lower() == filterValueLower
+ ]
+
+ # Apply sorting
+ sortFields = paginationParams.get('sort')
+ if sortFields:
+ for sortItem in reversed(sortFields):
+ field = sortItem.get('field')
+ direction = sortItem.get('direction', 'asc')
+ if field:
+ result = sorted(
+ result,
+ key=lambda x: str(x.get(field, '') or '').lower(),
+ reverse=(direction == 'desc')
+ )
+
+ # Apply pagination
+ page = paginationParams.get('page', 1)
+ pageSize = paginationParams.get('pageSize', 25)
+ totalItems = len(result)
+ totalPages = (totalItems + pageSize - 1) // pageSize if totalItems > 0 else 0
+ startIdx = (page - 1) * pageSize
+ endIdx = startIdx + pageSize
+ paginatedResult = result[startIdx:endIdx]
+
+ return {
+ "items": paginatedResult,
+ "pagination": {
+ "currentPage": page,
+ "pageSize": pageSize,
+ "totalItems": totalItems,
+ "totalPages": totalPages
+ }
+ }
+
+ # No pagination - return all users as list
return result
except HTTPException:
diff --git a/modules/routes/routeRbac.py b/modules/routes/routeRbac.py
index 5592bfa1..7ab9b229 100644
--- a/modules/routes/routeRbac.py
+++ b/modules/routes/routeRbac.py
@@ -566,21 +566,25 @@ async def listRoles(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
includeTemplates: bool = Query(False, description="Include feature template roles"),
+ mandateId: Optional[str] = Query(None, description="Include mandate-specific roles for this mandate"),
+ scopeFilter: Optional[str] = Query(None, description="Filter by scope: 'all', 'mandate', 'global', 'system'"),
currentUser: User = Depends(requireSysAdmin)
) -> PaginatedResponse:
"""
- Get list of global/system roles with metadata.
+ Get list of roles with metadata.
MULTI-TENANT: SysAdmin-only (roles are system resources).
By default, only returns true global roles (mandateId=None, featureInstanceId=None, featureCode=None).
Feature template roles are managed via /api/features/templates/roles.
Args:
- pagination: Optional pagination parameters
+ pagination: Optional pagination parameters (includes search, filters, sort)
includeTemplates: If True, also include feature template roles (featureCode != None)
+ mandateId: If provided, also include mandate-specific roles for this mandate
+ scopeFilter: Filter by scope type: 'all', 'mandate', 'global', 'system'
Returns:
- - List of role dictionaries with role label, description, and user count
+ - List of role dictionaries with role label, description, user count, and computed scopeType
"""
try:
interface = getRootInterface()
@@ -605,19 +609,45 @@ async def listRoles(
# Count role assignments from UserMandateRole table
roleCounts = interface.countRoleAssignments()
+ # Helper function to compute scopeType
+ def _computeScopeType(role) -> str:
+ if role.isSystemRole:
+ return "system"
+ if role.mandateId:
+ return "mandate"
+ return "global"
+
# Convert Role objects to dictionaries and add user counts
- # Filter to only global roles (mandateId=None, featureInstanceId=None)
- # Unless includeTemplates=True, also exclude feature template roles (featureCode != None)
+ # Filter logic:
+ # - Always include global roles (mandateId=None, featureInstanceId=None)
+ # - If mandateId provided, also include roles for that specific mandate
+ # - Unless includeTemplates=True, exclude feature template roles (featureCode != None)
result = []
for role in dbRoles:
- # Filter: Only global roles (no mandate, no instance)
- if role.mandateId is not None or role.featureInstanceId is not None:
+ # Always exclude feature-instance level roles
+ if role.featureInstanceId is not None:
+ continue
+
+ # Include global roles (mandateId=None) OR mandate-specific roles if mandateId matches
+ if role.mandateId is not None and (mandateId is None or role.mandateId != mandateId):
continue
# Filter: Exclude feature template roles unless includeTemplates=True
if not includeTemplates and role.featureCode is not None:
continue
+ # Compute scopeType (system, global, mandate)
+ scopeType = _computeScopeType(role)
+
+ # Apply scopeFilter if provided
+ if scopeFilter and scopeFilter != 'all':
+ if scopeFilter == 'mandate' and scopeType != 'mandate':
+ continue
+ if scopeFilter == 'global' and scopeType not in ('global', 'system'):
+ continue
+ if scopeFilter == 'system' and scopeType != 'system':
+ continue
+
result.append({
"id": role.id,
"roleLabel": role.roleLabel,
@@ -626,11 +656,31 @@ async def listRoles(
"featureInstanceId": role.featureInstanceId,
"featureCode": role.featureCode,
"userCount": roleCounts.get(str(role.id), 0),
- "isSystemRole": role.isSystemRole
+ "isSystemRole": role.isSystemRole,
+ "scopeType": scopeType # Computed field for frontend display
})
- # Apply filtering and sorting if pagination requested
+ # Apply search, filtering and sorting if pagination requested
if paginationParams:
+ # Apply search (if search term provided in filters)
+ searchTerm = paginationParams.filters.get("search", "").lower() if paginationParams.filters else ""
+ if searchTerm:
+ searchedResult = []
+ for item in result:
+ # Search in roleLabel and description
+ roleLabel = (item.get("roleLabel") or "").lower()
+ description = item.get("description")
+ descText = ""
+ if isinstance(description, dict):
+ descText = " ".join(str(v) for v in description.values()).lower()
+ elif description:
+ descText = str(description).lower()
+ scopeType = (item.get("scopeType") or "").lower()
+
+ if searchTerm in roleLabel or searchTerm in descText or searchTerm in scopeType:
+ searchedResult.append(item)
+ result = searchedResult
+
# Apply filtering (if filters provided)
if paginationParams.filters:
# Use the interface's filter method