gateway/modules/routes/routeAdminUserAccessOverview.py
2026-02-08 14:26:01 +01:00

493 lines
19 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Admin User Access Overview routes.
Provides endpoints for viewing complete user access permissions.
MULTI-TENANT: These are SYSTEM-LEVEL operations requiring isSysAdmin=true.
Shows comprehensive view of what a user can see and access.
"""
from fastapi import APIRouter, HTTPException, Depends, Query, Path, Request
from typing import List, Dict, Any, Optional, Set
import logging
from modules.auth import limiter, requireSysAdmin
from modules.datamodels.datamodelUam import User, UserInDB
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
from modules.datamodels.datamodelMembership import (
UserMandate,
UserMandateRole,
FeatureAccess,
FeatureAccessRole,
)
from modules.datamodels.datamodelFeatures import FeatureInstance, Feature
from modules.interfaces.interfaceDbApp import getRootInterface
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)

# Router for every endpoint in this file; all paths are relative to the prefix
router = APIRouter(
    prefix="/api/admin/user-access-overview",
    tags=["Admin User Access Overview"],
    responses={404: {"description": "Not found"}},
)
def _getAccessLevelLabel(level: Optional[str]) -> str:
    """
    Convert an access level code to its human-readable label.

    Parameters:
    - level: Access level code ("a", "m", "g" or "n"), an Enum member whose
      ``value`` is one of those codes, or None.

    Returns:
    - "ALL", "MY", "GROUP" or "NONE" for the known codes
    - "-" for None and for any value that is not a known code
    """
    # Callers in this file pass both raw codes and enum-backed fields, so
    # unwrap Enum members here. Plain strings and None have no ``value``
    # attribute and pass through unchanged.
    code = getattr(level, "value", level)
    # Anything that is not a string cannot be a known code; returning early
    # also avoids a TypeError from looking up an unhashable value.
    if not isinstance(code, str):
        return "-"
    labels = {
        "a": "ALL",
        "m": "MY",
        "g": "GROUP",
        "n": "NONE",
    }
    return labels.get(code, "-")
def _getRoleScope(role) -> str:
    """
    Classify a role as "instance", "mandate" or "global".

    The role may be an object (fields read as attributes) or a plain dict
    (fields read as keys).

    Returns:
    - "instance" when featureInstanceId is truthy
    - "mandate" when only mandateId is truthy
    - "global" otherwise
    """
    roleIsDict = isinstance(role, dict)

    def _readField(fieldName: str):
        # Attribute access wins; the dict key is consulted only when the
        # attribute is missing or falsy.
        viaAttribute = getattr(role, fieldName, None)
        if viaAttribute:
            return viaAttribute
        return role.get(fieldName) if roleIsDict else None

    instanceRef = _readField("featureInstanceId")
    mandateRef = _readField("mandateId")

    if instanceRef:
        return "instance"
    if mandateRef:
        return "mandate"
    return "global"
def _getRoleScopePriority(scope: str) -> int:
    """
    Rank a role scope by specificity; a larger number is more specific.

    Returns:
    - 3 for "instance", 2 for "mandate", 1 for "global"
    - 0 for any other value
    """
    rankByScope = {
        "instance": 3,
        "mandate": 2,
        "global": 1,
    }
    return rankByScope.get(scope, 0)
@router.get("/users", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def listUsersForOverview(
    request: Request,
    currentUser: User = Depends(requireSysAdmin)
) -> List[Dict[str, Any]]:
    """
    List every user so the caller can pick one for the access overview.
    MULTI-TENANT: guarded by the requireSysAdmin dependency.

    Returns:
    - One dict per user (id, username, email, fullName, isSysAdmin, enabled),
      ordered case-insensitively by username

    Raises:
    - HTTPException 500 when anything fails while loading or shaping the users
    """
    try:
        rootInterface = getRootInterface()
        userRows = [
            {
                "id": account.id,
                "username": account.username,
                "email": account.email,
                "fullName": account.fullName,
                "isSysAdmin": account.isSysAdmin,
                "enabled": account.enabled,
            }
            for account in rootInterface.getAllUsers()
        ]
        # A missing or empty username sorts as "" so it cannot break .lower()
        return sorted(userRows, key=lambda row: (row.get("username") or "").lower())
    except Exception as e:
        logger.error(f"Error listing users for overview: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list users: {str(e)}"
        )
def _describeRoleForOverview(role, roleId: str, sourceInfo: Dict[str, Any]) -> Dict[str, Any]:
    """Build the overview dict for one role and append where it was assigned."""
    scope = _getRoleScope(role)
    roleInfo = {
        "id": roleId,
        "roleLabel": role.roleLabel,
        "description": role.description or {},
        "scope": scope,
        # Internal helper field; stripped before the response is returned
        "scopePriority": _getRoleScopePriority(scope),
        "mandateId": role.mandateId,
        "featureInstanceId": role.featureInstanceId,
    }
    roleInfo.update(sourceInfo)
    return roleInfo


def _mergeAccessEntries(entries: List[Dict[str, Any]], isDataContext: bool = False) -> List[Dict[str, Any]]:
    """Collapse access entries to one per item, keeping the highest scope priority."""
    levelOrder = {"NONE": 0, "-": 0, "MY": 1, "GROUP": 2, "ALL": 3}
    merged: Dict[Any, Dict[str, Any]] = {}
    for entry in entries:
        item = entry["item"]
        priority = entry.get("scopePriority", 0)
        current = merged.get(item)
        if current is None or priority > current.get("scopePriority", 0):
            # First entry for this item, or a more specific scope replaces it
            merged[item] = entry
            continue
        if priority < current.get("scopePriority", 0):
            # Less specific than what is already recorded: ignored
            continue

        # Same priority: record every granting role on the surviving entry
        grantedBy = current.setdefault(
            "grantedByRoleLabels", [current.get("grantedByRoleLabel")]
        )
        if entry["grantedByRoleLabel"] not in grantedBy:
            grantedBy.append(entry["grantedByRoleLabel"])

        if isDataContext:
            # Merge to the most permissive value. ``view`` is included so the
            # result does not depend on the order in which roles were visited.
            current["view"] = current.get("view") or entry.get("view", False)
            for field in ("read", "create", "update", "delete"):
                newLevel = entry.get(field, "-")
                if levelOrder.get(newLevel, 0) > levelOrder.get(current.get(field, "-"), 0):
                    current[field] = newLevel

    result = sorted(merged.values(), key=lambda x: x.get("item", ""))
    for entry in result:
        entry.setdefault("grantedByRoleLabels", [entry.get("grantedByRoleLabel")])
        # Remove internal priority field from response
        entry.pop("scopePriority", None)
    return result


@router.get("/{userId}", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def getUserAccessOverview(
    request: Request,
    userId: str = Path(..., description="User ID to get access overview for"),
    mandateId: Optional[str] = Query(None, description="Filter by mandate ID"),
    featureInstanceId: Optional[str] = Query(None, description="Filter by feature instance ID"),
    currentUser: User = Depends(requireSysAdmin)
) -> Dict[str, Any]:
    """
    Get comprehensive access overview for a specific user.
    MULTI-TENANT: guarded by the requireSysAdmin dependency.

    Path Parameters:
    - userId: User ID

    Query Parameters:
    - mandateId: Optional filter by mandate ID
    - featureInstanceId: Optional filter by feature instance ID

    Returns:
    - Dict with the keys:
        - user: basic user info
        - isSysAdmin: True short-circuits the overview (all lists empty)
        - roles: each assigned role once, with scope and assignment source,
          sorted by scope (instance > mandate > global) then by label
        - mandates: enabled mandates with their role IDs and feature instances
        - uiAccess / resourceAccess: items whose rule grants view
        - dataAccess: per-item view flag and read/create/update/delete levels
      Access entries are merged per item: the most specific role scope wins,
      and roles of equal scope are combined to the most permissive values.

    Raises:
    - HTTPException 404 when the user does not exist
    - HTTPException 500 on any other failure
    """
    try:
        interface = getRootInterface()

        # Get user
        user = interface.getUser(userId)
        if not user:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # Build user info
        userInfo = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "fullName": user.fullName,
            "isSysAdmin": user.isSysAdmin,
            "enabled": user.enabled,
        }

        # SysAdmins are reported without per-role detail
        if user.isSysAdmin:
            return {
                "user": userInfo,
                "isSysAdmin": True,
                "sysAdminNote": "SysAdmin users have full access to all system-level resources without mandate context.",
                "roles": [],
                "mandates": [],
                "uiAccess": [],
                "dataAccess": [],
                "resourceAccess": [],
            }

        # roleId -> role info; insertion order is first-seen order, and each
        # role is fetched and described only once.
        roleIdToInfo: Dict[str, Dict[str, Any]] = {}

        # Enabled mandates of the user, optionally narrowed to one mandate
        userMandates = [um for um in interface.getUserMandates(userId) if um.enabled]
        if mandateId:
            userMandates = [um for um in userMandates if um.mandateId == mandateId]

        # Feature accesses do not depend on the mandate being processed, so
        # they are loaded and resolved once and grouped by owning mandate.
        # Skipped entirely when there is no mandate to attach them to.
        instancesByMandate: Dict[Any, List[Any]] = {}
        if userMandates:
            for fa in interface.getFeatureAccessesForUser(userId):
                if not fa.enabled:
                    continue
                # Filter by featureInstanceId if specified
                if featureInstanceId and fa.featureInstanceId != featureInstanceId:
                    continue
                instance = interface.getFeatureInstance(fa.featureInstanceId)
                if not instance:
                    continue
                instancesByMandate.setdefault(instance.mandateId, []).append((fa, instance))

        mandatesInfo = []
        for um in userMandates:
            umMandateId = um.mandateId

            # Fall back to the ID when the mandate record cannot be loaded
            mandate = interface.getMandate(umMandateId)
            mandateName = mandate.name if mandate else umMandateId

            # Roles assigned directly on the user-mandate membership
            mandateRoleIds = []
            for umr in interface.getUserMandateRoles(um.id):
                roleId = umr.roleId
                if not roleId:
                    continue
                mandateRoleIds.append(roleId)
                if roleId in roleIdToInfo:
                    continue
                role = interface.getRole(roleId)
                if role:
                    roleIdToInfo[roleId] = _describeRoleForOverview(role, roleId, {
                        "source": "mandate",
                        "sourceMandateId": umMandateId,
                        "sourceMandateName": mandateName,
                    })

            # Feature instances of this mandate the user has access to
            featureInstancesInfo = []
            for fa, instance in instancesByMandate.get(umMandateId, []):
                faInstanceId = fa.featureInstanceId
                featureCode = instance.featureCode
                feature = interface.getFeatureByCode(featureCode)
                featureLabel = feature.label if feature else {}

                instanceRoleIds = interface.getRoleIdsForFeatureAccess(fa.id)
                for roleId in instanceRoleIds:
                    if roleId in roleIdToInfo:
                        continue
                    role = interface.getRole(roleId)
                    if role:
                        roleIdToInfo[roleId] = _describeRoleForOverview(role, roleId, {
                            "source": "featureInstance",
                            "sourceInstanceId": faInstanceId,
                            "sourceInstanceLabel": instance.label,
                        })

                featureInstancesInfo.append({
                    "id": faInstanceId,
                    "label": instance.label,
                    "featureCode": featureCode,
                    "featureLabel": featureLabel,
                    "roleIds": instanceRoleIds,
                })

            mandatesInfo.append({
                "id": umMandateId,
                "name": mandateName,
                "roleIds": mandateRoleIds,
                "featureInstances": featureInstancesInfo,
            })

        # Collect access by context
        uiAccess = []
        dataAccess = []
        resourceAccess = []
        for roleId, roleInfo in roleIdToInfo.items():
            for rule in interface.getAccessRulesByRole(roleId):
                # rule.context may be a plain string or an enum member
                # (TODO confirm against AccessRule); compare on the raw value.
                contextName = getattr(rule.context, "value", rule.context)
                if contextName not in ("UI", "DATA", "RESOURCE"):
                    continue

                accessEntry = {
                    "item": rule.item or "(all)",
                    "grantedByRoleId": roleId,
                    "grantedByRoleLabel": roleInfo["roleLabel"],
                    "roleScope": roleInfo["scope"],
                    "scopePriority": roleInfo["scopePriority"],
                    "view": rule.view if rule.view is not None else False,
                }

                if contextName == "DATA":
                    # DATA rules are always listed, even without view
                    accessEntry["read"] = _getAccessLevelLabel(rule.read)
                    accessEntry["create"] = _getAccessLevelLabel(rule.create)
                    accessEntry["update"] = _getAccessLevelLabel(rule.update)
                    accessEntry["delete"] = _getAccessLevelLabel(rule.delete)
                    dataAccess.append(accessEntry)
                elif accessEntry["view"]:
                    # UI and RESOURCE rules are listed only when they grant view
                    if contextName == "UI":
                        uiAccess.append(accessEntry)
                    else:
                        resourceAccess.append(accessEntry)

        # Merge and deduplicate access entries (keep highest priority)
        uiAccess = _mergeAccessEntries(uiAccess)
        dataAccess = _mergeAccessEntries(dataAccess, isDataContext=True)
        resourceAccess = _mergeAccessEntries(resourceAccess)

        # Sort roles by scope (instance > mandate > global) then by label;
        # a missing label sorts as "" instead of raising on .lower()
        allRoles = sorted(
            roleIdToInfo.values(),
            key=lambda r: (-r["scopePriority"], (r.get("roleLabel") or "").lower()),
        )
        # Clean up roles for response
        for role in allRoles:
            role.pop("scopePriority", None)

        return {
            "user": userInfo,
            "isSysAdmin": False,
            "roles": allRoles,
            "mandates": mandatesInfo,
            "uiAccess": uiAccess,
            "dataAccess": dataAccess,
            "resourceAccess": resourceAccess,
        }
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) pass through unchanged
        raise
    except Exception as e:
        logger.error(f"Error getting user access overview: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get user access overview: {str(e)}"
        )
@router.get("/{userId}/effective-permissions", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def getEffectivePermissions(
    request: Request,
    userId: str = Path(..., description="User ID"),
    mandateId: str = Query(..., description="Mandate ID context"),
    featureInstanceId: Optional[str] = Query(None, description="Feature instance ID context"),
    context: str = Query("DATA", description="Context type: DATA, UI, or RESOURCE"),
    item: Optional[str] = Query(None, description="Specific item to check permissions for"),
    currentUser: User = Depends(requireSysAdmin)
) -> Dict[str, Any]:
    """
    Resolve the permissions that actually apply to a user in one context.
    The resolution itself is delegated to RbacClass.getUserPermissions.
    MULTI-TENANT: guarded by the requireSysAdmin dependency.

    Path Parameters:
    - userId: User ID

    Query Parameters:
    - mandateId: Required mandate context
    - featureInstanceId: Optional feature instance context
    - context: Permission context (DATA, UI, RESOURCE)
    - item: Optional specific item to check (passed on as "" when omitted)

    Returns:
    - The request parameters echoed back plus "effectivePermissions" holding
      the view flag and labelled read/create/update/delete levels

    Raises:
    - HTTPException 404 when the user does not exist
    - HTTPException 400 when context is not a valid AccessRuleContext value
    - HTTPException 500 on any other failure
    """
    try:
        rootInterface = getRootInterface()

        # The user must exist before anything is resolved
        targetUser = rootInterface.getUser(userId)
        if not targetUser:
            raise HTTPException(
                status_code=404,
                detail=f"User {userId} not found"
            )

        # Turn the query string into the enum expected by the resolver
        try:
            contextEnum = AccessRuleContext(context)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid context: {context}. Must be DATA, UI, or RESOURCE."
            )

        # Imported at call time rather than at module load
        from modules.security.rbac import RbacClass

        resolver = RbacClass(rootInterface.db, dbApp=rootInterface.db)
        resolved = resolver.getUserPermissions(
            user=targetUser,
            context=contextEnum,
            item=item or "",
            mandateId=mandateId,
            featureInstanceId=featureInstanceId
        )

        def _labelFor(level) -> str:
            # A falsy level is reported as "no level" rather than unwrapped
            return _getAccessLevelLabel(level.value if level else None)

        effective = {"view": resolved.view}
        for action in ("read", "create", "update", "delete"):
            effective[action] = _labelFor(getattr(resolved, action))

        return {
            "userId": userId,
            "mandateId": mandateId,
            "featureInstanceId": featureInstanceId,
            "context": context,
            "item": item,
            "effectivePermissions": effective,
        }
    except HTTPException:
        # Deliberate 404/400 responses pass through unchanged
        raise
    except Exception as e:
        logger.error(f"Error getting effective permissions: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get effective permissions: {str(e)}"
        )