gateway/modules/routes/routeAdminUserAccessOverview.py
2026-04-10 12:33:27 +02:00

617 lines
25 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Admin User Access Overview routes.
Provides endpoints for viewing complete user access permissions.
MULTI-TENANT: These are SYSTEM-LEVEL operations requiring isSysAdmin=true.
Shows comprehensive view of what a user can see and access.
"""
from fastapi import APIRouter, HTTPException, Depends, Query, Path, Request, status
from typing import List, Dict, Any, Optional, Set
import logging
from modules.auth import limiter
from modules.auth.authentication import getRequestContext, RequestContext
from modules.datamodels.datamodelUam import User, UserInDB
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
from modules.datamodels.datamodelMembership import (
UserMandate,
UserMandateRole,
FeatureAccess,
FeatureAccessRole,
)
from modules.datamodels.datamodelFeatures import FeatureInstance, Feature
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.shared.i18nRegistry import apiRouteContext
routeApiMsg = apiRouteContext("routeAdminUserAccessOverview")
# Configure logger
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/admin/user-access-overview",
tags=["Admin User Access Overview"],
responses={404: {"description": "Not found"}}
)
def _getAccessLevelLabel(level: Optional[str]) -> str:
"""Convert access level code to human-readable label."""
labels = {
"a": "ALL",
"m": "MY",
"g": "GROUP",
"n": "NONE",
None: "-"
}
return labels.get(level, "-")
def _getRoleScope(role) -> str:
"""Determine the scope of a role. Accepts Role object or dict."""
# Support both Pydantic models and dicts
featureInstanceId = getattr(role, 'featureInstanceId', None) or (role.get("featureInstanceId") if isinstance(role, dict) else None)
mandateId = getattr(role, 'mandateId', None) or (role.get("mandateId") if isinstance(role, dict) else None)
if featureInstanceId:
return "instance"
elif mandateId:
return "mandate"
else:
return "global"
def _getRoleScopePriority(scope: str) -> int:
"""Get priority for role scope (higher = more specific)."""
priorities = {"global": 1, "mandate": 2, "instance": 3}
return priorities.get(scope, 0)
def _hasMandateAdminRole(context: RequestContext) -> bool:
"""Check if the user has mandate admin role in ANY mandate.
Loads roles independently from request context (context.roleIds may be empty
when no X-Mandate-Id header is sent, e.g., on admin pages).
"""
if context.hasSysAdminRole:
return True
try:
rootInterface = getRootInterface()
userMandates = rootInterface.getUserMandates(str(context.user.id))
for um in userMandates:
umId = getattr(um, 'id', None)
if not umId:
continue
roleIds = rootInterface.getRoleIdsForUserMandate(str(umId))
for roleId in roleIds:
role = rootInterface.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
return True
return False
except Exception as e:
logger.error(f"Error checking mandate admin role: {e}")
return False
def _isUserInMandate(rootInterface, userId: str, mandateId: str) -> bool:
"""Check if a user belongs to a specific mandate."""
try:
userMandates = rootInterface.db.getRecordset(UserMandate, {"userId": userId, "mandateId": mandateId})
return len(userMandates) > 0
except Exception:
return False
@router.get("/users", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def listUsersForOverview(
request: Request,
context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
"""
Get list of users for selection in the overview.
SysAdmin sees all users. MandateAdmin sees users in their mandate.
Returns:
- List of user dictionaries with basic info
"""
if not _hasMandateAdminRole(context):
raise HTTPException(status_code=403, detail=routeApiMsg("Keine Berechtigung für die Benutzerzugriffsübersicht"))
try:
interface = getRootInterface()
if context.hasSysAdminRole and not context.mandateId:
# SysAdmin without mandate context: all users
allUsers = interface.getAllUsers()
elif context.mandateId:
# With explicit mandate context: users in that mandate
allUsers = interface.getUsersByMandate(str(context.mandateId))
else:
# MandateAdmin without mandate context: aggregate across all admin mandates
userMandates = interface.getUserMandates(str(context.user.id))
adminMandateIds = []
for um in userMandates:
umId = getattr(um, 'id', None)
mid = getattr(um, 'mandateId', None)
if not umId or not mid:
continue
roleIds = interface.getRoleIdsForUserMandate(str(umId))
for roleId in roleIds:
role = interface.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
adminMandateIds.append(str(mid))
break
seenUserIds = set()
allUsers = []
for mid in adminMandateIds:
mandateUsers = interface.getUsersByMandate(mid)
for u in (mandateUsers if isinstance(mandateUsers, list) else mandateUsers.items if hasattr(mandateUsers, 'items') else []):
uid = u.get("id") if isinstance(u, dict) else getattr(u, "id", None)
if uid and uid not in seenUserIds:
seenUserIds.add(uid)
allUsers.append(u)
result = []
for u in allUsers:
userData = u if isinstance(u, dict) else u.model_dump() if hasattr(u, 'model_dump') else vars(u)
result.append({
"id": userData.get("id"),
"username": userData.get("username"),
"email": userData.get("email"),
"fullName": userData.get("fullName"),
"isSysAdmin": userData.get("isSysAdmin", False),
"enabled": userData.get("enabled", True),
})
# Sort by username
result.sort(key=lambda x: (x.get("username") or "").lower())
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error listing users for overview: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to list users: {str(e)}"
)
@router.get("/{userId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def getUserAccessOverview(
request: Request,
userId: str = Path(..., description="User ID to get access overview for"),
mandateId: Optional[str] = Query(None, description="Filter by mandate ID"),
featureInstanceId: Optional[str] = Query(None, description="Filter by feature instance ID"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get comprehensive access overview for a specific user.
SysAdmin sees all users. MandateAdmin sees users in their mandate.
Path Parameters:
- userId: User ID
Query Parameters:
- mandateId: Optional filter by mandate ID
- featureInstanceId: Optional filter by feature instance ID
Returns:
- Comprehensive access overview including:
- User info
- All assigned roles with scope
- UI access (what pages/views the user can see)
- Data access (what tables/fields the user can access)
- Resource access (what resources the user can use)
"""
if not _hasMandateAdminRole(context):
raise HTTPException(status_code=403, detail=routeApiMsg("Keine Berechtigung für die Benutzerzugriffsübersicht"))
try:
interface = getRootInterface()
# MandateAdmin: verify the requested user shares at least one admin mandate
if not context.hasSysAdminRole:
# Get admin's mandate IDs
adminMandateIds = []
userMandates = interface.getUserMandates(str(context.user.id))
for um in userMandates:
umId = getattr(um, 'id', None)
mid = getattr(um, 'mandateId', None)
if not umId or not mid:
continue
roleIds = interface.getRoleIdsForUserMandate(str(umId))
for roleId in roleIds:
role = interface.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
adminMandateIds.append(str(mid))
break
# Check that requested user belongs to at least one of the admin's mandates
userInAdminMandate = False
for mid in adminMandateIds:
if _isUserInMandate(interface, userId, mid):
userInAdminMandate = True
break
if not userInAdminMandate:
raise HTTPException(status_code=403, detail=routeApiMsg("Benutzer gehört nicht zu Ihrem Mandate"))
# Get user
user = interface.getUser(userId)
if not user:
raise HTTPException(
status_code=404,
detail=f"User {userId} not found"
)
# Build user info
userInfo = {
"id": user.id,
"username": user.username,
"email": user.email,
"fullName": user.fullName,
"isSysAdmin": user.isSysAdmin,
"enabled": user.enabled,
}
# Collect all roles for the user
allRoles = []
roleIdToInfo = {} # Map roleId to role info for later reference
# Get mandates for this user using interface method
allUserMandates = interface.getUserMandates(userId)
# Filter by enabled and optionally mandateId
userMandates = [um for um in allUserMandates if um.enabled]
if mandateId:
userMandates = [um for um in userMandates if um.mandateId == mandateId]
mandatesInfo = []
for um in userMandates:
umId = um.id
umMandateId = um.mandateId
# Get mandate name
mandate = interface.getMandate(umMandateId)
mandateName = mandate.name if mandate else umMandateId
mandateLabel = (mandate.label or None) if mandate else None
# Get roles for this UserMandate using interface method
umRoles = interface.getUserMandateRoles(umId)
mandateRoleIds = []
for umr in umRoles:
roleId = umr.roleId
if roleId:
mandateRoleIds.append(roleId)
# Get role details using interface method
role = interface.getRole(roleId)
if role:
scope = _getRoleScope(role)
roleInfo = {
"id": roleId,
"roleLabel": role.roleLabel,
"description": role.description or {},
"scope": scope,
"scopePriority": _getRoleScopePriority(scope),
"mandateId": role.mandateId,
"featureInstanceId": role.featureInstanceId,
"source": "mandate",
"sourceMandateId": umMandateId,
"sourceMandateName": mandateName,
}
allRoles.append(roleInfo)
roleIdToInfo[roleId] = roleInfo
# Get feature instances for this mandate using interface method
allFeatureAccesses = interface.getFeatureAccessesForUser(userId)
featureAccesses = [fa for fa in allFeatureAccesses if fa.enabled]
featureInstancesInfo = []
for fa in featureAccesses:
faId = fa.id
faInstanceId = fa.featureInstanceId
# Check if instance belongs to this mandate using interface method
instance = interface.getFeatureInstance(faInstanceId)
if not instance:
continue
if instance.mandateId != umMandateId:
continue
# Filter by featureInstanceId if specified
if featureInstanceId and faInstanceId != featureInstanceId:
continue
# Get feature info using interface method
featureCode = instance.featureCode
feature = interface.getFeatureByCode(featureCode)
featureLabel = feature.label if feature else {}
# Get roles for this FeatureAccess using interface method
instanceRoleIds = interface.getRoleIdsForFeatureAccess(faId)
for roleId in instanceRoleIds:
# Get role details (if not already added)
if roleId not in roleIdToInfo:
role = interface.getRole(roleId)
if role:
scope = _getRoleScope(role)
roleInfo = {
"id": roleId,
"roleLabel": role.roleLabel,
"description": role.description or {},
"scope": scope,
"scopePriority": _getRoleScopePriority(scope),
"mandateId": role.mandateId,
"featureInstanceId": role.featureInstanceId,
"source": "featureInstance",
"sourceInstanceId": faInstanceId,
"sourceInstanceLabel": instance.label,
}
allRoles.append(roleInfo)
roleIdToInfo[roleId] = roleInfo
featureInstancesInfo.append({
"id": faInstanceId,
"label": instance.label,
"featureCode": featureCode,
"featureLabel": featureLabel,
"roleIds": instanceRoleIds,
})
mandatesInfo.append({
"id": umMandateId,
"name": mandateName,
"label": mandateLabel,
"roleIds": mandateRoleIds,
"featureInstances": featureInstancesInfo,
})
# Remove duplicate roles (keep most specific)
uniqueRoles = {}
for role in allRoles:
roleId = role["id"]
if roleId not in uniqueRoles or role["scopePriority"] > uniqueRoles[roleId]["scopePriority"]:
uniqueRoles[roleId] = role
allRoles = list(uniqueRoles.values())
# Get all AccessRules for all role IDs
allRoleIds = list(roleIdToInfo.keys())
# Collect access by context
uiAccess = []
dataAccess = []
resourceAccess = []
for roleId in allRoleIds:
roleInfo = roleIdToInfo.get(roleId, {})
roleLabel = roleInfo.get("roleLabel", "unknown")
roleScope = roleInfo.get("scope", "unknown")
# Get all rules for this role using interface method
rules = interface.getAccessRulesByRole(roleId)
for rule in rules:
context = rule.context
item = rule.item
accessEntry = {
"item": item or "(all)",
"grantedByRoleId": roleId,
"grantedByRoleLabel": roleLabel,
"roleScope": roleScope,
"scopePriority": roleInfo.get("scopePriority", 0),
}
if context == "UI":
accessEntry["view"] = rule.view if rule.view is not None else False
if accessEntry["view"]:
uiAccess.append(accessEntry)
elif context == "DATA":
accessEntry["view"] = rule.view if rule.view is not None else False
accessEntry["read"] = _getAccessLevelLabel(rule.read)
accessEntry["create"] = _getAccessLevelLabel(rule.create)
accessEntry["update"] = _getAccessLevelLabel(rule.update)
accessEntry["delete"] = _getAccessLevelLabel(rule.delete)
dataAccess.append(accessEntry)
elif context == "RESOURCE":
accessEntry["view"] = rule.view if rule.view is not None else False
if accessEntry["view"]:
resourceAccess.append(accessEntry)
# Merge and deduplicate access entries (keep highest priority)
def _mergeAccessEntries(entries: List[Dict], isDataContext: bool = False) -> List[Dict]:
"""Merge entries for same item, keeping highest priority."""
merged = {}
for entry in entries:
item = entry["item"]
priority = entry.get("scopePriority", 0)
if item not in merged or priority > merged[item].get("scopePriority", 0):
merged[item] = entry
elif item in merged and priority == merged[item].get("scopePriority", 0):
# Same priority - merge grantedBy info
existingRoles = merged[item].get("grantedByRoleLabels", [merged[item].get("grantedByRoleLabel")])
if entry["grantedByRoleLabel"] not in existingRoles:
existingRoles.append(entry["grantedByRoleLabel"])
merged[item]["grantedByRoleLabels"] = existingRoles
# For DATA context, merge to most permissive
if isDataContext:
levelOrder = {"NONE": 0, "-": 0, "MY": 1, "GROUP": 2, "ALL": 3}
for field in ["read", "create", "update", "delete"]:
existingLevel = merged[item].get(field, "-")
newLevel = entry.get(field, "-")
if levelOrder.get(newLevel, 0) > levelOrder.get(existingLevel, 0):
merged[item][field] = newLevel
# Clean up and sort
result = list(merged.values())
for entry in result:
if "grantedByRoleLabels" not in entry:
entry["grantedByRoleLabels"] = [entry.get("grantedByRoleLabel")]
# Remove internal priority field from response
entry.pop("scopePriority", None)
result.sort(key=lambda x: x.get("item", ""))
return result
uiAccess = _mergeAccessEntries(uiAccess)
dataAccess = _mergeAccessEntries(dataAccess, isDataContext=True)
resourceAccess = _mergeAccessEntries(resourceAccess)
# Clean up roles for response
for role in allRoles:
role.pop("scopePriority", None)
# Sort roles by scope (instance > mandate > global) then by label
allRoles.sort(key=lambda r: (-_getRoleScopePriority(r.get("scope", "")), r.get("roleLabel", "").lower()))
return {
"user": userInfo,
"isSysAdmin": False,
"roles": allRoles,
"mandates": mandatesInfo,
"uiAccess": uiAccess,
"dataAccess": dataAccess,
"resourceAccess": resourceAccess,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting user access overview: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get user access overview: {str(e)}"
)
@router.get("/{userId}/effective-permissions", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def getEffectivePermissions(
request: Request,
userId: str = Path(..., description="User ID"),
mandateId: str = Query(..., description="Mandate ID context"),
featureInstanceId: Optional[str] = Query(None, description="Feature instance ID context"),
accessContext: str = Query("DATA", alias="context", description="Context type: DATA, UI, or RESOURCE"),
item: Optional[str] = Query(None, description="Specific item to check permissions for"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""
Get effective (resolved) permissions for a user in a specific context.
This uses the RBAC resolution logic to show what permissions actually apply.
MULTI-TENANT: SysAdmin sees all. MandateAdmin can check users in their own mandates.
Path Parameters:
- userId: User ID
Query Parameters:
- mandateId: Required mandate context
- featureInstanceId: Optional feature instance context
- context: Permission context (DATA, UI, RESOURCE)
- item: Optional specific item to check
Returns:
- Effective permissions after RBAC resolution
"""
if not context.hasSysAdminRole:
# Check if user has admin role in any mandate
if not _hasMandateAdminRole(context):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=routeApiMsg("Admin role required"))
try:
interface = getRootInterface()
# MandateAdmin: verify the requested user shares at least one admin mandate
if not context.hasSysAdminRole:
adminMandateIds = []
adminUserMandates = interface.getUserMandates(str(context.user.id))
for um in adminUserMandates:
umId = getattr(um, 'id', None)
mid = getattr(um, 'mandateId', None)
if not umId or not mid:
continue
roleIds = interface.getRoleIdsForUserMandate(str(umId))
for roleId in roleIds:
role = interface.getRole(roleId)
if role and role.roleLabel == "admin" and not role.featureInstanceId:
adminMandateIds.append(str(mid))
break
if not adminMandateIds:
raise HTTPException(status_code=403, detail=routeApiMsg("Insufficient permissions"))
userInAdminMandate = False
for mid in adminMandateIds:
if _isUserInMandate(interface, userId, mid):
userInAdminMandate = True
break
if not userInAdminMandate:
raise HTTPException(status_code=403, detail=routeApiMsg("Benutzer gehört nicht zu Ihrem Mandate"))
# Get user
user = interface.getUser(userId)
if not user:
raise HTTPException(
status_code=404,
detail=f"User {userId} not found"
)
# Convert context string to enum
try:
contextEnum = AccessRuleContext(accessContext)
except ValueError:
raise HTTPException(
status_code=400,
detail=f"Invalid context: {accessContext}. Must be DATA, UI, or RESOURCE."
)
# Use RBAC interface to get actual permissions
from modules.security.rbac import RbacClass
rbac = RbacClass(interface.db, dbApp=interface.db)
permissions = rbac.getUserPermissions(
user=user,
context=contextEnum,
item=item or "",
mandateId=mandateId,
featureInstanceId=featureInstanceId
)
return {
"userId": userId,
"mandateId": mandateId,
"featureInstanceId": featureInstanceId,
"context": accessContext,
"item": item,
"effectivePermissions": {
"view": permissions.view,
"read": _getAccessLevelLabel(permissions.read.value if permissions.read else None),
"create": _getAccessLevelLabel(permissions.create.value if permissions.create else None),
"update": _getAccessLevelLabel(permissions.update.value if permissions.update else None),
"delete": _getAccessLevelLabel(permissions.delete.value if permissions.delete else None),
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting effective permissions: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to get effective permissions: {str(e)}"
)