serverside filter and sort for form generic

This commit is contained in:
ValueOn AG 2026-01-21 15:57:16 +01:00
parent 6c8c703115
commit ac88f25526
3 changed files with 343 additions and 22 deletions

View file

@ -62,6 +62,9 @@ def initBootstrap(db: DatabaseConnector) -> None:
# Initialize RBAC rules (uses roleIds from roles)
initRbacRules(db)
# Initialize AccessRules for feature-template roles (idempotent - adds missing rules)
_initFeatureTemplateRoleAccessRules(db)
# Initialize admin user
adminUserId = initAdminUser(db, mandateId)
@ -498,6 +501,190 @@ def _getRoleId(db: DatabaseConnector, roleLabel: str) -> Optional[str]:
return None
def _initFeatureTemplateRoleAccessRules(db: DatabaseConnector) -> None:
"""
Initialize AccessRules for feature-template roles.
This is idempotent - only adds rules that don't exist yet.
Feature-template roles need explicit AccessRules for their respective tables:
- trustee-admin/accountant/client -> TrusteeOrganisation, TrusteeContract, etc.
- chatbot-admin/user -> ChatSession, etc.
- workflow-admin/editor/viewer -> ChatWorkflow, etc.
Args:
db: Database connector instance
"""
logger.info("Checking feature-template role AccessRules")
# Define feature-specific table access
featureTableAccess = {
"trustee": {
"tables": [
"TrusteeOrganisation", "TrusteeRole", "TrusteeAccess",
"TrusteeContract", "TrusteeDocument", "TrusteePosition",
"TrusteePositionDocument"
],
"roles": {
"trustee-admin": {
"view": True,
"read": AccessLevel.ALL,
"create": AccessLevel.ALL,
"update": AccessLevel.ALL,
"delete": AccessLevel.ALL
},
"trustee-accountant": {
"view": True,
"read": AccessLevel.GROUP,
"create": AccessLevel.GROUP,
"update": AccessLevel.GROUP,
"delete": AccessLevel.NONE
},
"trustee-client": {
"view": True,
"read": AccessLevel.MY,
"create": AccessLevel.NONE,
"update": AccessLevel.NONE,
"delete": AccessLevel.NONE
}
}
},
"chatbot": {
"tables": ["ChatSession", "ChatMessage"],
"roles": {
"chatbot-admin": {
"view": True,
"read": AccessLevel.ALL,
"create": AccessLevel.ALL,
"update": AccessLevel.ALL,
"delete": AccessLevel.ALL
},
"chatbot-user": {
"view": True,
"read": AccessLevel.MY,
"create": AccessLevel.MY,
"update": AccessLevel.MY,
"delete": AccessLevel.MY
}
}
},
"chatworkflow": {
"tables": ["ChatWorkflow", "AutomationDefinition"],
"roles": {
"workflow-admin": {
"view": True,
"read": AccessLevel.ALL,
"create": AccessLevel.ALL,
"update": AccessLevel.ALL,
"delete": AccessLevel.ALL
},
"workflow-editor": {
"view": True,
"read": AccessLevel.GROUP,
"create": AccessLevel.GROUP,
"update": AccessLevel.GROUP,
"delete": AccessLevel.NONE
},
"workflow-viewer": {
"view": True,
"read": AccessLevel.GROUP,
"create": AccessLevel.NONE,
"update": AccessLevel.NONE,
"delete": AccessLevel.NONE
}
}
},
"neutralization": {
"tables": ["DataNeutraliserConfig", "DataNeutralizerAttributes"],
"roles": {
"neutralization-admin": {
"view": True,
"read": AccessLevel.ALL,
"create": AccessLevel.ALL,
"update": AccessLevel.ALL,
"delete": AccessLevel.ALL
},
"neutralization-analyst": {
"view": True,
"read": AccessLevel.GROUP,
"create": AccessLevel.NONE,
"update": AccessLevel.NONE,
"delete": AccessLevel.NONE
}
}
},
"realestate": {
"tables": ["Projekt", "Parzelle", "Dokument", "Gemeinde", "Kanton", "Land"],
"roles": {
"realestate-admin": {
"view": True,
"read": AccessLevel.ALL,
"create": AccessLevel.ALL,
"update": AccessLevel.ALL,
"delete": AccessLevel.ALL
},
"realestate-manager": {
"view": True,
"read": AccessLevel.GROUP,
"create": AccessLevel.GROUP,
"update": AccessLevel.GROUP,
"delete": AccessLevel.NONE
},
"realestate-viewer": {
"view": True,
"read": AccessLevel.GROUP,
"create": AccessLevel.NONE,
"update": AccessLevel.NONE,
"delete": AccessLevel.NONE
}
}
}
}
createdCount = 0
for featureCode, featureConfig in featureTableAccess.items():
tables = featureConfig["tables"]
roles = featureConfig["roles"]
for roleLabel, permissions in roles.items():
roleId = _getRoleId(db, roleLabel)
if not roleId:
continue
for tableName in tables:
# Check if rule already exists
existingRules = db.getRecordset(
AccessRule,
recordFilter={
"roleId": roleId,
"context": AccessRuleContext.DATA,
"item": tableName
}
)
if existingRules:
continue # Rule already exists
# Create new rule
rule = AccessRule(
roleId=roleId,
context=AccessRuleContext.DATA,
item=tableName,
view=permissions["view"],
read=permissions["read"],
create=permissions["create"],
update=permissions["update"],
delete=permissions["delete"]
)
db.recordCreate(AccessRule, rule)
createdCount += 1
if createdCount > 0:
logger.info(f"Created {createdCount} feature-template role AccessRules")
else:
logger.debug("All feature-template role AccessRules already exist")
def initRbacRules(db: DatabaseConnector) -> None:
"""
Initialize RBAC rules if they don't exist.

View file

@ -323,17 +323,21 @@ async def delete_mandate(
# User Management within Mandates (Mandate-Admin)
# =============================================================================
@router.get("/{targetMandateId}/users", response_model=List[MandateUserInfo])
@router.get("/{targetMandateId}/users")
@limiter.limit("60/minute")
async def listMandateUsers(
request: Request,
targetMandateId: str = Path(..., description="ID of the mandate"),
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
context: RequestContext = Depends(getRequestContext)
) -> List[MandateUserInfo]:
):
"""
List all users in a mandate.
List all users in a mandate with pagination, search, and sorting support.
Requires Mandate-Admin role or SysAdmin.
Args:
pagination: Optional pagination parameters (page, pageSize, search, filters, sort)
"""
# Check permission
if not _hasMandateAdminRole(context, targetMandateId) and not context.isSysAdmin:
@ -353,6 +357,26 @@ async def listMandateUsers(
detail=f"Mandate {targetMandateId} not found"
)
# Parse pagination parameter
paginationParams = None
if pagination:
try:
paginationDict = json.loads(pagination)
if paginationDict:
# Normalize pagination dict
if 'sort' in paginationDict and paginationDict['sort']:
normalizedSort = []
for item in paginationDict['sort']:
if isinstance(item, dict):
normalizedSort.append(item)
paginationDict['sort'] = normalizedSort if normalizedSort else None
paginationParams = paginationDict
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=f"Invalid pagination parameter: {str(e)}"
)
# Get all UserMandate entries for this mandate
userMandates = rootInterface.db.getRecordset(
UserMandate,
@ -378,17 +402,77 @@ async def listMandateUsers(
else:
roleLabels.append(roleId) # Fallback to ID if not found
result.append(MandateUserInfo(
id=um.get("id"), # UserMandate ID as primary key
userId=str(user.id),
username=user.username,
email=user.email,
fullName=user.fullName,
roleIds=roleIds,
roleLabels=roleLabels,
enabled=um.get("enabled", True)
))
result.append({
"id": um.get("id"), # UserMandate ID as primary key
"userId": str(user.id),
"username": user.username,
"email": user.email,
"fullName": user.fullName,
"roleIds": roleIds,
"roleLabels": roleLabels,
"enabled": um.get("enabled", True)
})
# Apply search, filtering, and sorting if pagination requested
if paginationParams:
# Apply search (if search term provided)
searchTerm = paginationParams.get('search', '').lower() if paginationParams.get('search') else ''
if searchTerm:
searchedResult = []
for item in result:
username = (item.get("username") or "").lower()
email = (item.get("email") or "").lower()
fullName = (item.get("fullName") or "").lower()
roleLabelsStr = " ".join(item.get("roleLabels") or []).lower()
if searchTerm in username or searchTerm in email or searchTerm in fullName or searchTerm in roleLabelsStr:
searchedResult.append(item)
result = searchedResult
# Apply filters (if filters provided)
filters = paginationParams.get('filters')
if filters:
for fieldName, filterValue in filters.items():
if filterValue is not None and filterValue != '':
filterValueLower = str(filterValue).lower()
result = [
item for item in result
if str(item.get(fieldName, '')).lower() == filterValueLower
]
# Apply sorting
sortFields = paginationParams.get('sort')
if sortFields:
for sortItem in reversed(sortFields):
field = sortItem.get('field')
direction = sortItem.get('direction', 'asc')
if field:
result = sorted(
result,
key=lambda x: str(x.get(field, '') or '').lower(),
reverse=(direction == 'desc')
)
# Apply pagination
page = paginationParams.get('page', 1)
pageSize = paginationParams.get('pageSize', 25)
totalItems = len(result)
totalPages = (totalItems + pageSize - 1) // pageSize if totalItems > 0 else 0
startIdx = (page - 1) * pageSize
endIdx = startIdx + pageSize
paginatedResult = result[startIdx:endIdx]
return {
"items": paginatedResult,
"pagination": {
"currentPage": page,
"pageSize": pageSize,
"totalItems": totalItems,
"totalPages": totalPages
}
}
# No pagination - return all users as list
return result
except HTTPException:

View file

@ -566,21 +566,25 @@ async def listRoles(
request: Request,
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
includeTemplates: bool = Query(False, description="Include feature template roles"),
mandateId: Optional[str] = Query(None, description="Include mandate-specific roles for this mandate"),
scopeFilter: Optional[str] = Query(None, description="Filter by scope: 'all', 'mandate', 'global', 'system'"),
currentUser: User = Depends(requireSysAdmin)
) -> PaginatedResponse:
"""
Get list of global/system roles with metadata.
Get list of roles with metadata.
MULTI-TENANT: SysAdmin-only (roles are system resources).
By default, only returns true global roles (mandateId=None, featureInstanceId=None, featureCode=None).
Feature template roles are managed via /api/features/templates/roles.
Args:
pagination: Optional pagination parameters
pagination: Optional pagination parameters (includes search, filters, sort)
includeTemplates: If True, also include feature template roles (featureCode != None)
mandateId: If provided, also include mandate-specific roles for this mandate
scopeFilter: Filter by scope type: 'all', 'mandate', 'global', 'system'
Returns:
- List of role dictionaries with role label, description, and user count
- List of role dictionaries with role label, description, user count, and computed scopeType
"""
try:
interface = getRootInterface()
@ -605,19 +609,45 @@ async def listRoles(
# Count role assignments from UserMandateRole table
roleCounts = interface.countRoleAssignments()
# Helper function to compute scopeType
def _computeScopeType(role) -> str:
if role.isSystemRole:
return "system"
if role.mandateId:
return "mandate"
return "global"
# Convert Role objects to dictionaries and add user counts
# Filter to only global roles (mandateId=None, featureInstanceId=None)
# Unless includeTemplates=True, also exclude feature template roles (featureCode != None)
# Filter logic:
# - Always include global roles (mandateId=None, featureInstanceId=None)
# - If mandateId provided, also include roles for that specific mandate
# - Unless includeTemplates=True, exclude feature template roles (featureCode != None)
result = []
for role in dbRoles:
# Filter: Only global roles (no mandate, no instance)
if role.mandateId is not None or role.featureInstanceId is not None:
# Always exclude feature-instance level roles
if role.featureInstanceId is not None:
continue
# Include global roles (mandateId=None) OR mandate-specific roles if mandateId matches
if role.mandateId is not None and (mandateId is None or role.mandateId != mandateId):
continue
# Filter: Exclude feature template roles unless includeTemplates=True
if not includeTemplates and role.featureCode is not None:
continue
# Compute scopeType (system, global, mandate)
scopeType = _computeScopeType(role)
# Apply scopeFilter if provided
if scopeFilter and scopeFilter != 'all':
if scopeFilter == 'mandate' and scopeType != 'mandate':
continue
if scopeFilter == 'global' and scopeType not in ('global', 'system'):
continue
if scopeFilter == 'system' and scopeType != 'system':
continue
result.append({
"id": role.id,
"roleLabel": role.roleLabel,
@ -626,11 +656,31 @@ async def listRoles(
"featureInstanceId": role.featureInstanceId,
"featureCode": role.featureCode,
"userCount": roleCounts.get(str(role.id), 0),
"isSystemRole": role.isSystemRole
"isSystemRole": role.isSystemRole,
"scopeType": scopeType # Computed field for frontend display
})
# Apply filtering and sorting if pagination requested
# Apply search, filtering and sorting if pagination requested
if paginationParams:
# Apply search (if search term provided in filters)
searchTerm = paginationParams.filters.get("search", "").lower() if paginationParams.filters else ""
if searchTerm:
searchedResult = []
for item in result:
# Search in roleLabel and description
roleLabel = (item.get("roleLabel") or "").lower()
description = item.get("description")
descText = ""
if isinstance(description, dict):
descText = " ".join(str(v) for v in description.values()).lower()
elif description:
descText = str(description).lower()
scopeType = (item.get("scopeType") or "").lower()
if searchTerm in roleLabel or searchTerm in descText or searchTerm in scopeType:
searchedResult.append(item)
result = searchedResult
# Apply filtering (if filters provided)
if paginationParams.filters:
# Use the interface's filter method