# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Admin User Access Overview routes. Provides endpoints for viewing complete user access permissions. MULTI-TENANT: These are SYSTEM-LEVEL operations requiring isSysAdmin=true. Shows comprehensive view of what a user can see and access. """ from fastapi import APIRouter, HTTPException, Depends, Query, Path, Request, status from typing import List, Dict, Any, Optional, Set import logging from modules.auth import limiter from modules.auth.authentication import getRequestContext, RequestContext from modules.datamodels.datamodelUam import User, UserInDB from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext from modules.datamodels.datamodelMembership import ( UserMandate, UserMandateRole, FeatureAccess, FeatureAccessRole, ) from modules.datamodels.datamodelFeatures import FeatureInstance, Feature from modules.interfaces.interfaceDbApp import getRootInterface from modules.shared.i18nRegistry import apiRouteContext, t, _getLanguage routeApiMsg = apiRouteContext("routeAdminUserAccessOverview") # Configure logger logger = logging.getLogger(__name__) def _resolveTextMultilingual(value) -> str: """Resolve a TextMultilingual dict to a single string for the current request language. Falls back to xx (source text), then any available value.""" if isinstance(value, str): return value if isinstance(value, dict): lang = _getLanguage() return value.get(lang) or value.get("xx") or next(iter(value.values()), "") return str(value) if value else "" router = APIRouter( prefix="/api/admin/user-access-overview", tags=["Admin User Access Overview"], responses={404: {"description": "Not found"}} ) def _getAccessLevelLabel(level: Optional[str]) -> str: """Convert access level code to human-readable label.""" labels = { "a": "ALL", "m": "MY", "g": "GROUP", "n": "NONE", None: "-" } return labels.get(level, "-") def _getRoleScope(role) -> str: """Determine the scope of a role. Accepts Role object or dict.""" # Support both Pydantic models and dicts featureInstanceId = getattr(role, 'featureInstanceId', None) or (role.get("featureInstanceId") if isinstance(role, dict) else None) mandateId = getattr(role, 'mandateId', None) or (role.get("mandateId") if isinstance(role, dict) else None) if featureInstanceId: return "instance" elif mandateId: return "mandate" else: return "global" def _getRoleScopePriority(scope: str) -> int: """Get priority for role scope (higher = more specific).""" priorities = {"global": 1, "mandate": 2, "instance": 3} return priorities.get(scope, 0) def _hasMandateAdminRole(context: RequestContext) -> bool: """Check if the user has mandate admin role in ANY mandate. Loads roles independently from request context (context.roleIds may be empty when no X-Mandate-Id header is sent, e.g., on admin pages). """ if context.hasSysAdminRole: return True try: rootInterface = getRootInterface() userMandates = rootInterface.getUserMandates(str(context.user.id)) for um in userMandates: umId = getattr(um, 'id', None) if not umId: continue roleIds = rootInterface.getRoleIdsForUserMandate(str(umId)) for roleId in roleIds: role = rootInterface.getRole(roleId) if role and role.roleLabel == "admin" and not role.featureInstanceId: return True return False except Exception as e: logger.error(f"Error checking mandate admin role: {e}") return False def _isUserInMandate(rootInterface, userId: str, mandateId: str) -> bool: """Check if a user belongs to a specific mandate.""" try: userMandates = rootInterface.db.getRecordset(UserMandate, {"userId": userId, "mandateId": mandateId}) return len(userMandates) > 0 except Exception: return False @router.get("/users", response_model=List[Dict[str, Any]]) @limiter.limit("60/minute") def listUsersForOverview( request: Request, context: RequestContext = Depends(getRequestContext) ) -> List[Dict[str, Any]]: """ Get list of users for selection in the overview. SysAdmin sees all users. MandateAdmin sees users in their mandate. Returns: - List of user dictionaries with basic info """ if not _hasMandateAdminRole(context): raise HTTPException(status_code=403, detail=routeApiMsg("Keine Berechtigung für die Benutzerzugriffsübersicht")) try: interface = getRootInterface() if context.hasSysAdminRole and not context.mandateId: # SysAdmin without mandate context: all users allUsers = interface.getAllUsers() elif context.mandateId: # With explicit mandate context: users in that mandate allUsers = interface.getUsersByMandate(str(context.mandateId)) else: # MandateAdmin without mandate context: aggregate across all admin mandates userMandates = interface.getUserMandates(str(context.user.id)) adminMandateIds = [] for um in userMandates: umId = getattr(um, 'id', None) mid = getattr(um, 'mandateId', None) if not umId or not mid: continue roleIds = interface.getRoleIdsForUserMandate(str(umId)) for roleId in roleIds: role = interface.getRole(roleId) if role and role.roleLabel == "admin" and not role.featureInstanceId: adminMandateIds.append(str(mid)) break seenUserIds = set() allUsers = [] for mid in adminMandateIds: mandateUsers = interface.getUsersByMandate(mid) for u in (mandateUsers if isinstance(mandateUsers, list) else mandateUsers.items if hasattr(mandateUsers, 'items') else []): uid = u.get("id") if isinstance(u, dict) else getattr(u, "id", None) if uid and uid not in seenUserIds: seenUserIds.add(uid) allUsers.append(u) result = [] for u in allUsers: userData = u if isinstance(u, dict) else u.model_dump() if hasattr(u, 'model_dump') else vars(u) result.append({ "id": userData.get("id"), "username": userData.get("username"), "email": userData.get("email"), "fullName": userData.get("fullName"), "isSysAdmin": userData.get("isSysAdmin", False), "enabled": userData.get("enabled", True), }) # Sort by username result.sort(key=lambda x: (x.get("username") or "").lower()) return result except HTTPException: raise except Exception as e: logger.error(f"Error listing users for overview: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to list users: {str(e)}" ) @router.get("/{userId}", response_model=Dict[str, Any]) @limiter.limit("60/minute") def getUserAccessOverview( request: Request, userId: str = Path(..., description="User ID to get access overview for"), mandateId: Optional[str] = Query(None, description="Filter by mandate ID"), featureInstanceId: Optional[str] = Query(None, description="Filter by feature instance ID"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Get comprehensive access overview for a specific user. SysAdmin sees all users. MandateAdmin sees users in their mandate. Path Parameters: - userId: User ID Query Parameters: - mandateId: Optional filter by mandate ID - featureInstanceId: Optional filter by feature instance ID Returns: - Comprehensive access overview including: - User info - All assigned roles with scope - UI access (what pages/views the user can see) - Data access (what tables/fields the user can access) - Resource access (what resources the user can use) """ if not _hasMandateAdminRole(context): raise HTTPException(status_code=403, detail=routeApiMsg("Keine Berechtigung für die Benutzerzugriffsübersicht")) try: interface = getRootInterface() # MandateAdmin: verify the requested user shares at least one admin mandate if not context.hasSysAdminRole: # Get admin's mandate IDs adminMandateIds = [] userMandates = interface.getUserMandates(str(context.user.id)) for um in userMandates: umId = getattr(um, 'id', None) mid = getattr(um, 'mandateId', None) if not umId or not mid: continue roleIds = interface.getRoleIdsForUserMandate(str(umId)) for roleId in roleIds: role = interface.getRole(roleId) if role and role.roleLabel == "admin" and not role.featureInstanceId: adminMandateIds.append(str(mid)) break # Check that requested user belongs to at least one of the admin's mandates userInAdminMandate = False for mid in adminMandateIds: if _isUserInMandate(interface, userId, mid): userInAdminMandate = True break if not userInAdminMandate: raise HTTPException(status_code=403, detail=routeApiMsg("Benutzer gehört nicht zu Ihrem Mandate")) # Get user user = interface.getUser(userId) if not user: raise HTTPException( status_code=404, detail=f"User {userId} not found" ) # Build user info userInfo = { "id": user.id, "username": user.username, "email": user.email, "fullName": user.fullName, "isSysAdmin": user.isSysAdmin, "enabled": user.enabled, } # Collect all roles for the user allRoles = [] roleIdToInfo = {} # Map roleId to role info for later reference # Get mandates for this user using interface method allUserMandates = interface.getUserMandates(userId) # Filter by enabled and optionally mandateId userMandates = [um for um in allUserMandates if um.enabled] if mandateId: userMandates = [um for um in userMandates if um.mandateId == mandateId] mandatesInfo = [] for um in userMandates: umId = um.id umMandateId = um.mandateId # Get mandate name mandate = interface.getMandate(umMandateId) mandateName = mandate.name if mandate else umMandateId mandateLabel = (mandate.label or None) if mandate else None # Get roles for this UserMandate using interface method umRoles = interface.getUserMandateRoles(umId) mandateRoleIds = [] for umr in umRoles: roleId = umr.roleId if roleId: mandateRoleIds.append(roleId) # Get role details using interface method role = interface.getRole(roleId) if role: scope = _getRoleScope(role) roleInfo = { "id": roleId, "roleLabel": role.roleLabel, "description": _resolveTextMultilingual(role.description), "scope": scope, "scopePriority": _getRoleScopePriority(scope), "mandateId": role.mandateId, "featureInstanceId": role.featureInstanceId, "source": "mandate", "sourceMandateId": umMandateId, "sourceMandateName": mandateName, } allRoles.append(roleInfo) roleIdToInfo[roleId] = roleInfo # Get feature instances for this mandate using interface method allFeatureAccesses = interface.getFeatureAccessesForUser(userId) featureAccesses = [fa for fa in allFeatureAccesses if fa.enabled] featureInstancesInfo = [] for fa in featureAccesses: faId = fa.id faInstanceId = fa.featureInstanceId # Check if instance belongs to this mandate using interface method instance = interface.getFeatureInstance(faInstanceId) if not instance: continue if instance.mandateId != umMandateId: continue # Filter by featureInstanceId if specified if featureInstanceId and faInstanceId != featureInstanceId: continue # Get feature info using interface method featureCode = instance.featureCode feature = interface.getFeatureByCode(featureCode) featureLabel = t(feature.label) if feature and feature.label else "" # Get roles for this FeatureAccess using interface method instanceRoleIds = interface.getRoleIdsForFeatureAccess(faId) for roleId in instanceRoleIds: # Get role details (if not already added) if roleId not in roleIdToInfo: role = interface.getRole(roleId) if role: scope = _getRoleScope(role) roleInfo = { "id": roleId, "roleLabel": role.roleLabel, "description": _resolveTextMultilingual(role.description), "scope": scope, "scopePriority": _getRoleScopePriority(scope), "mandateId": role.mandateId, "featureInstanceId": role.featureInstanceId, "source": "featureInstance", "sourceInstanceId": faInstanceId, "sourceInstanceLabel": instance.label, } allRoles.append(roleInfo) roleIdToInfo[roleId] = roleInfo featureInstancesInfo.append({ "id": faInstanceId, "label": instance.label, "featureCode": featureCode, "featureLabel": featureLabel, "roleIds": instanceRoleIds, }) mandatesInfo.append({ "id": umMandateId, "name": mandateName, "label": mandateLabel, "roleIds": mandateRoleIds, "featureInstances": featureInstancesInfo, }) # Remove duplicate roles (keep most specific) uniqueRoles = {} for role in allRoles: roleId = role["id"] if roleId not in uniqueRoles or role["scopePriority"] > uniqueRoles[roleId]["scopePriority"]: uniqueRoles[roleId] = role allRoles = list(uniqueRoles.values()) # Get all AccessRules for all role IDs allRoleIds = list(roleIdToInfo.keys()) # Collect access by context uiAccess = [] dataAccess = [] resourceAccess = [] for roleId in allRoleIds: roleInfo = roleIdToInfo.get(roleId, {}) roleLabel = roleInfo.get("roleLabel", "unknown") roleScope = roleInfo.get("scope", "unknown") # Get all rules for this role using interface method rules = interface.getAccessRulesByRole(roleId) for rule in rules: context = rule.context item = rule.item accessEntry = { "item": item or "(all)", "grantedByRoleId": roleId, "grantedByRoleLabel": roleLabel, "roleScope": roleScope, "scopePriority": roleInfo.get("scopePriority", 0), } if context == "UI": accessEntry["view"] = rule.view if rule.view is not None else False if accessEntry["view"]: uiAccess.append(accessEntry) elif context == "DATA": accessEntry["view"] = rule.view if rule.view is not None else False accessEntry["read"] = _getAccessLevelLabel(rule.read) accessEntry["create"] = _getAccessLevelLabel(rule.create) accessEntry["update"] = _getAccessLevelLabel(rule.update) accessEntry["delete"] = _getAccessLevelLabel(rule.delete) dataAccess.append(accessEntry) elif context == "RESOURCE": accessEntry["view"] = rule.view if rule.view is not None else False if accessEntry["view"]: resourceAccess.append(accessEntry) # Merge and deduplicate access entries (keep highest priority) def _mergeAccessEntries(entries: List[Dict], isDataContext: bool = False) -> List[Dict]: """Merge entries for same item, keeping highest priority.""" merged = {} for entry in entries: item = entry["item"] priority = entry.get("scopePriority", 0) if item not in merged or priority > merged[item].get("scopePriority", 0): merged[item] = entry elif item in merged and priority == merged[item].get("scopePriority", 0): # Same priority - merge grantedBy info existingRoles = merged[item].get("grantedByRoleLabels", [merged[item].get("grantedByRoleLabel")]) if entry["grantedByRoleLabel"] not in existingRoles: existingRoles.append(entry["grantedByRoleLabel"]) merged[item]["grantedByRoleLabels"] = existingRoles # For DATA context, merge to most permissive if isDataContext: levelOrder = {"NONE": 0, "-": 0, "MY": 1, "GROUP": 2, "ALL": 3} for field in ["read", "create", "update", "delete"]: existingLevel = merged[item].get(field, "-") newLevel = entry.get(field, "-") if levelOrder.get(newLevel, 0) > levelOrder.get(existingLevel, 0): merged[item][field] = newLevel # Clean up and sort result = list(merged.values()) for entry in result: if "grantedByRoleLabels" not in entry: entry["grantedByRoleLabels"] = [entry.get("grantedByRoleLabel")] # Remove internal priority field from response entry.pop("scopePriority", None) result.sort(key=lambda x: x.get("item", "")) return result uiAccess = _mergeAccessEntries(uiAccess) dataAccess = _mergeAccessEntries(dataAccess, isDataContext=True) resourceAccess = _mergeAccessEntries(resourceAccess) # Clean up roles for response for role in allRoles: role.pop("scopePriority", None) # Sort roles by scope (instance > mandate > global) then by label allRoles.sort(key=lambda r: (-_getRoleScopePriority(r.get("scope", "")), r.get("roleLabel", "").lower())) return { "user": userInfo, "isSysAdmin": False, "roles": allRoles, "mandates": mandatesInfo, "uiAccess": uiAccess, "dataAccess": dataAccess, "resourceAccess": resourceAccess, } except HTTPException: raise except Exception as e: logger.error(f"Error getting user access overview: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to get user access overview: {str(e)}" ) @router.get("/{userId}/effective-permissions", response_model=Dict[str, Any]) @limiter.limit("60/minute") def getEffectivePermissions( request: Request, userId: str = Path(..., description="User ID"), mandateId: str = Query(..., description="Mandate ID context"), featureInstanceId: Optional[str] = Query(None, description="Feature instance ID context"), accessContext: str = Query("DATA", alias="context", description="Context type: DATA, UI, or RESOURCE"), item: Optional[str] = Query(None, description="Specific item to check permissions for"), context: RequestContext = Depends(getRequestContext) ) -> Dict[str, Any]: """ Get effective (resolved) permissions for a user in a specific context. This uses the RBAC resolution logic to show what permissions actually apply. MULTI-TENANT: SysAdmin sees all. MandateAdmin can check users in their own mandates. Path Parameters: - userId: User ID Query Parameters: - mandateId: Required mandate context - featureInstanceId: Optional feature instance context - context: Permission context (DATA, UI, RESOURCE) - item: Optional specific item to check Returns: - Effective permissions after RBAC resolution """ if not context.hasSysAdminRole: # Check if user has admin role in any mandate if not _hasMandateAdminRole(context): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=routeApiMsg("Admin role required")) try: interface = getRootInterface() # MandateAdmin: verify the requested user shares at least one admin mandate if not context.hasSysAdminRole: adminMandateIds = [] adminUserMandates = interface.getUserMandates(str(context.user.id)) for um in adminUserMandates: umId = getattr(um, 'id', None) mid = getattr(um, 'mandateId', None) if not umId or not mid: continue roleIds = interface.getRoleIdsForUserMandate(str(umId)) for roleId in roleIds: role = interface.getRole(roleId) if role and role.roleLabel == "admin" and not role.featureInstanceId: adminMandateIds.append(str(mid)) break if not adminMandateIds: raise HTTPException(status_code=403, detail=routeApiMsg("Insufficient permissions")) userInAdminMandate = False for mid in adminMandateIds: if _isUserInMandate(interface, userId, mid): userInAdminMandate = True break if not userInAdminMandate: raise HTTPException(status_code=403, detail=routeApiMsg("Benutzer gehört nicht zu Ihrem Mandate")) # Get user user = interface.getUser(userId) if not user: raise HTTPException( status_code=404, detail=f"User {userId} not found" ) # Convert context string to enum try: contextEnum = AccessRuleContext(accessContext) except ValueError: raise HTTPException( status_code=400, detail=f"Invalid context: {accessContext}. Must be DATA, UI, or RESOURCE." ) # Use RBAC interface to get actual permissions from modules.security.rbac import RbacClass rbac = RbacClass(interface.db, dbApp=interface.db) permissions = rbac.getUserPermissions( user=user, context=contextEnum, item=item or "", mandateId=mandateId, featureInstanceId=featureInstanceId ) return { "userId": userId, "mandateId": mandateId, "featureInstanceId": featureInstanceId, "context": accessContext, "item": item, "effectivePermissions": { "view": permissions.view, "read": _getAccessLevelLabel(permissions.read.value if permissions.read else None), "create": _getAccessLevelLabel(permissions.create.value if permissions.create else None), "update": _getAccessLevelLabel(permissions.update.value if permissions.update else None), "delete": _getAccessLevelLabel(permissions.delete.value if permissions.delete else None), } } except HTTPException: raise except Exception as e: logger.error(f"Error getting effective permissions: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to get effective permissions: {str(e)}" )