503 lines
19 KiB
Python
503 lines
19 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Admin User Access Overview routes.
|
|
Provides endpoints for viewing complete user access permissions.
|
|
|
|
MULTI-TENANT: These are SYSTEM-LEVEL operations requiring isSysAdmin=true.
|
|
Shows comprehensive view of what a user can see and access.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Query, Path, Request
|
|
from typing import List, Dict, Any, Optional, Set
|
|
import logging
|
|
|
|
from modules.auth import limiter, requireSysAdmin
|
|
from modules.datamodels.datamodelUam import User, UserInDB
|
|
from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
|
|
from modules.datamodels.datamodelMembership import (
|
|
UserMandate,
|
|
UserMandateRole,
|
|
FeatureAccess,
|
|
FeatureAccessRole,
|
|
)
|
|
from modules.datamodels.datamodelFeatures import FeatureInstance, Feature
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
router = APIRouter(
|
|
prefix="/api/admin/user-access-overview",
|
|
tags=["Admin User Access Overview"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
|
|
def _getAccessLevelLabel(level: Optional[str]) -> str:
|
|
"""Convert access level code to human-readable label."""
|
|
labels = {
|
|
"a": "ALL",
|
|
"m": "MY",
|
|
"g": "GROUP",
|
|
"n": "NONE",
|
|
None: "-"
|
|
}
|
|
return labels.get(level, "-")
|
|
|
|
|
|
def _getRoleScope(role: Dict[str, Any]) -> str:
|
|
"""Determine the scope of a role."""
|
|
if role.get("featureInstanceId"):
|
|
return "instance"
|
|
elif role.get("mandateId"):
|
|
return "mandate"
|
|
else:
|
|
return "global"
|
|
|
|
|
|
def _getRoleScopePriority(scope: str) -> int:
|
|
"""Get priority for role scope (higher = more specific)."""
|
|
priorities = {"global": 1, "mandate": 2, "instance": 3}
|
|
return priorities.get(scope, 0)
|
|
|
|
|
|
@router.get("/users", response_model=List[Dict[str, Any]])
|
|
@limiter.limit("60/minute")
|
|
async def listUsersForOverview(
|
|
request: Request,
|
|
currentUser: User = Depends(requireSysAdmin)
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get list of all users for selection in the overview.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
|
|
Returns:
|
|
- List of user dictionaries with basic info
|
|
"""
|
|
try:
|
|
interface = getRootInterface()
|
|
|
|
# Get all users
|
|
allUsersData = interface.db.getRecordset(UserInDB)
|
|
|
|
result = []
|
|
for u in allUsersData:
|
|
result.append({
|
|
"id": u.get("id"),
|
|
"username": u.get("username"),
|
|
"email": u.get("email"),
|
|
"fullName": u.get("fullName"),
|
|
"isSysAdmin": u.get("isSysAdmin", False),
|
|
"enabled": u.get("enabled", True),
|
|
})
|
|
|
|
# Sort by username
|
|
result.sort(key=lambda x: (x.get("username") or "").lower())
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error listing users for overview: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to list users: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/{userId}", response_model=Dict[str, Any])
|
|
@limiter.limit("60/minute")
|
|
async def getUserAccessOverview(
|
|
request: Request,
|
|
userId: str = Path(..., description="User ID to get access overview for"),
|
|
mandateId: Optional[str] = Query(None, description="Filter by mandate ID"),
|
|
featureInstanceId: Optional[str] = Query(None, description="Filter by feature instance ID"),
|
|
currentUser: User = Depends(requireSysAdmin)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get comprehensive access overview for a specific user.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
|
|
Path Parameters:
|
|
- userId: User ID
|
|
|
|
Query Parameters:
|
|
- mandateId: Optional filter by mandate ID
|
|
- featureInstanceId: Optional filter by feature instance ID
|
|
|
|
Returns:
|
|
- Comprehensive access overview including:
|
|
- User info
|
|
- All assigned roles with scope
|
|
- UI access (what pages/views the user can see)
|
|
- Data access (what tables/fields the user can access)
|
|
- Resource access (what resources the user can use)
|
|
"""
|
|
try:
|
|
interface = getRootInterface()
|
|
|
|
# Get user
|
|
user = interface.getUser(userId)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"User {userId} not found"
|
|
)
|
|
|
|
# Build user info
|
|
userInfo = {
|
|
"id": user.id,
|
|
"username": user.username,
|
|
"email": user.email,
|
|
"fullName": user.fullName,
|
|
"isSysAdmin": user.isSysAdmin,
|
|
"enabled": user.enabled,
|
|
}
|
|
|
|
# If user is SysAdmin, they have full access to everything
|
|
if user.isSysAdmin:
|
|
return {
|
|
"user": userInfo,
|
|
"isSysAdmin": True,
|
|
"sysAdminNote": "SysAdmin users have full access to all system-level resources without mandate context.",
|
|
"roles": [],
|
|
"mandates": [],
|
|
"uiAccess": [],
|
|
"dataAccess": [],
|
|
"resourceAccess": [],
|
|
}
|
|
|
|
# Collect all roles for the user
|
|
allRoles = []
|
|
roleIdToInfo = {} # Map roleId to role info for later reference
|
|
|
|
# Get mandates for this user
|
|
mandateFilter = {"userId": userId, "enabled": True}
|
|
if mandateId:
|
|
mandateFilter["mandateId"] = mandateId
|
|
|
|
userMandates = interface.db.getRecordset(UserMandate, recordFilter=mandateFilter)
|
|
|
|
mandatesInfo = []
|
|
for um in userMandates:
|
|
umId = um.get("id")
|
|
umMandateId = um.get("mandateId")
|
|
|
|
# Get mandate name
|
|
mandate = interface.getMandate(umMandateId)
|
|
mandateName = mandate.name if mandate else umMandateId
|
|
|
|
# Get roles for this UserMandate
|
|
umRoles = interface.db.getRecordset(
|
|
UserMandateRole,
|
|
recordFilter={"userMandateId": umId}
|
|
)
|
|
|
|
mandateRoleIds = []
|
|
for umr in umRoles:
|
|
roleId = umr.get("roleId")
|
|
if roleId:
|
|
mandateRoleIds.append(roleId)
|
|
|
|
# Get role details
|
|
roleRecords = interface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if roleRecords:
|
|
role = roleRecords[0]
|
|
scope = _getRoleScope(role)
|
|
roleInfo = {
|
|
"id": roleId,
|
|
"roleLabel": role.get("roleLabel"),
|
|
"description": role.get("description", {}),
|
|
"scope": scope,
|
|
"scopePriority": _getRoleScopePriority(scope),
|
|
"mandateId": role.get("mandateId"),
|
|
"featureInstanceId": role.get("featureInstanceId"),
|
|
"source": "mandate",
|
|
"sourceMandateId": umMandateId,
|
|
"sourceMandateName": mandateName,
|
|
}
|
|
allRoles.append(roleInfo)
|
|
roleIdToInfo[roleId] = roleInfo
|
|
|
|
# Get feature instances for this mandate
|
|
featureInstanceFilter = {"userId": userId, "enabled": True}
|
|
featureAccesses = interface.db.getRecordset(FeatureAccess, recordFilter=featureInstanceFilter)
|
|
|
|
featureInstancesInfo = []
|
|
for fa in featureAccesses:
|
|
faId = fa.get("id")
|
|
faInstanceId = fa.get("featureInstanceId")
|
|
|
|
# Check if instance belongs to this mandate
|
|
instance = interface.db.getRecordset(FeatureInstance, recordFilter={"id": faInstanceId})
|
|
if not instance:
|
|
continue
|
|
instance = instance[0]
|
|
|
|
if instance.get("mandateId") != umMandateId:
|
|
continue
|
|
|
|
# Filter by featureInstanceId if specified
|
|
if featureInstanceId and faInstanceId != featureInstanceId:
|
|
continue
|
|
|
|
# Get feature info
|
|
featureCode = instance.get("featureCode")
|
|
featureRecords = interface.db.getRecordset(Feature, recordFilter={"code": featureCode})
|
|
featureLabel = featureRecords[0].get("label", {}) if featureRecords else {}
|
|
|
|
# Get roles for this FeatureAccess
|
|
faRoles = interface.db.getRecordset(
|
|
FeatureAccessRole,
|
|
recordFilter={"featureAccessId": faId}
|
|
)
|
|
|
|
instanceRoleIds = []
|
|
for far in faRoles:
|
|
roleId = far.get("roleId")
|
|
if roleId:
|
|
instanceRoleIds.append(roleId)
|
|
|
|
# Get role details (if not already added)
|
|
if roleId not in roleIdToInfo:
|
|
roleRecords = interface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if roleRecords:
|
|
role = roleRecords[0]
|
|
scope = _getRoleScope(role)
|
|
roleInfo = {
|
|
"id": roleId,
|
|
"roleLabel": role.get("roleLabel"),
|
|
"description": role.get("description", {}),
|
|
"scope": scope,
|
|
"scopePriority": _getRoleScopePriority(scope),
|
|
"mandateId": role.get("mandateId"),
|
|
"featureInstanceId": role.get("featureInstanceId"),
|
|
"source": "featureInstance",
|
|
"sourceInstanceId": faInstanceId,
|
|
"sourceInstanceLabel": instance.get("label"),
|
|
}
|
|
allRoles.append(roleInfo)
|
|
roleIdToInfo[roleId] = roleInfo
|
|
|
|
featureInstancesInfo.append({
|
|
"id": faInstanceId,
|
|
"label": instance.get("label"),
|
|
"featureCode": featureCode,
|
|
"featureLabel": featureLabel,
|
|
"roleIds": instanceRoleIds,
|
|
})
|
|
|
|
mandatesInfo.append({
|
|
"id": umMandateId,
|
|
"name": mandateName,
|
|
"roleIds": mandateRoleIds,
|
|
"featureInstances": featureInstancesInfo,
|
|
})
|
|
|
|
# Remove duplicate roles (keep most specific)
|
|
uniqueRoles = {}
|
|
for role in allRoles:
|
|
roleId = role["id"]
|
|
if roleId not in uniqueRoles or role["scopePriority"] > uniqueRoles[roleId]["scopePriority"]:
|
|
uniqueRoles[roleId] = role
|
|
|
|
allRoles = list(uniqueRoles.values())
|
|
|
|
# Get all AccessRules for all role IDs
|
|
allRoleIds = list(roleIdToInfo.keys())
|
|
|
|
# Collect access by context
|
|
uiAccess = []
|
|
dataAccess = []
|
|
resourceAccess = []
|
|
|
|
for roleId in allRoleIds:
|
|
roleInfo = roleIdToInfo.get(roleId, {})
|
|
roleLabel = roleInfo.get("roleLabel", "unknown")
|
|
roleScope = roleInfo.get("scope", "unknown")
|
|
|
|
# Get all rules for this role
|
|
rules = interface.db.getRecordset(AccessRule, recordFilter={"roleId": roleId})
|
|
|
|
for rule in rules:
|
|
context = rule.get("context")
|
|
item = rule.get("item")
|
|
|
|
accessEntry = {
|
|
"item": item or "(all)",
|
|
"grantedByRoleId": roleId,
|
|
"grantedByRoleLabel": roleLabel,
|
|
"roleScope": roleScope,
|
|
"scopePriority": roleInfo.get("scopePriority", 0),
|
|
}
|
|
|
|
if context == "UI":
|
|
accessEntry["view"] = rule.get("view", False)
|
|
if accessEntry["view"]:
|
|
uiAccess.append(accessEntry)
|
|
|
|
elif context == "DATA":
|
|
accessEntry["view"] = rule.get("view", False)
|
|
accessEntry["read"] = _getAccessLevelLabel(rule.get("read"))
|
|
accessEntry["create"] = _getAccessLevelLabel(rule.get("create"))
|
|
accessEntry["update"] = _getAccessLevelLabel(rule.get("update"))
|
|
accessEntry["delete"] = _getAccessLevelLabel(rule.get("delete"))
|
|
dataAccess.append(accessEntry)
|
|
|
|
elif context == "RESOURCE":
|
|
accessEntry["view"] = rule.get("view", False)
|
|
if accessEntry["view"]:
|
|
resourceAccess.append(accessEntry)
|
|
|
|
# Merge and deduplicate access entries (keep highest priority)
|
|
def _mergeAccessEntries(entries: List[Dict], isDataContext: bool = False) -> List[Dict]:
|
|
"""Merge entries for same item, keeping highest priority."""
|
|
merged = {}
|
|
for entry in entries:
|
|
item = entry["item"]
|
|
priority = entry.get("scopePriority", 0)
|
|
|
|
if item not in merged or priority > merged[item].get("scopePriority", 0):
|
|
merged[item] = entry
|
|
elif item in merged and priority == merged[item].get("scopePriority", 0):
|
|
# Same priority - merge grantedBy info
|
|
existingRoles = merged[item].get("grantedByRoleLabels", [merged[item].get("grantedByRoleLabel")])
|
|
if entry["grantedByRoleLabel"] not in existingRoles:
|
|
existingRoles.append(entry["grantedByRoleLabel"])
|
|
merged[item]["grantedByRoleLabels"] = existingRoles
|
|
|
|
# For DATA context, merge to most permissive
|
|
if isDataContext:
|
|
levelOrder = {"NONE": 0, "-": 0, "MY": 1, "GROUP": 2, "ALL": 3}
|
|
for field in ["read", "create", "update", "delete"]:
|
|
existingLevel = merged[item].get(field, "-")
|
|
newLevel = entry.get(field, "-")
|
|
if levelOrder.get(newLevel, 0) > levelOrder.get(existingLevel, 0):
|
|
merged[item][field] = newLevel
|
|
|
|
# Clean up and sort
|
|
result = list(merged.values())
|
|
for entry in result:
|
|
if "grantedByRoleLabels" not in entry:
|
|
entry["grantedByRoleLabels"] = [entry.get("grantedByRoleLabel")]
|
|
# Remove internal priority field from response
|
|
entry.pop("scopePriority", None)
|
|
|
|
result.sort(key=lambda x: x.get("item", ""))
|
|
return result
|
|
|
|
uiAccess = _mergeAccessEntries(uiAccess)
|
|
dataAccess = _mergeAccessEntries(dataAccess, isDataContext=True)
|
|
resourceAccess = _mergeAccessEntries(resourceAccess)
|
|
|
|
# Clean up roles for response
|
|
for role in allRoles:
|
|
role.pop("scopePriority", None)
|
|
|
|
# Sort roles by scope (instance > mandate > global) then by label
|
|
allRoles.sort(key=lambda r: (-_getRoleScopePriority(r.get("scope", "")), r.get("roleLabel", "").lower()))
|
|
|
|
return {
|
|
"user": userInfo,
|
|
"isSysAdmin": False,
|
|
"roles": allRoles,
|
|
"mandates": mandatesInfo,
|
|
"uiAccess": uiAccess,
|
|
"dataAccess": dataAccess,
|
|
"resourceAccess": resourceAccess,
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting user access overview: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to get user access overview: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/{userId}/effective-permissions", response_model=Dict[str, Any])
|
|
@limiter.limit("60/minute")
|
|
async def getEffectivePermissions(
|
|
request: Request,
|
|
userId: str = Path(..., description="User ID"),
|
|
mandateId: str = Query(..., description="Mandate ID context"),
|
|
featureInstanceId: Optional[str] = Query(None, description="Feature instance ID context"),
|
|
context: str = Query("DATA", description="Context type: DATA, UI, or RESOURCE"),
|
|
item: Optional[str] = Query(None, description="Specific item to check permissions for"),
|
|
currentUser: User = Depends(requireSysAdmin)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get effective (resolved) permissions for a user in a specific context.
|
|
This uses the RBAC resolution logic to show what permissions actually apply.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
|
|
Path Parameters:
|
|
- userId: User ID
|
|
|
|
Query Parameters:
|
|
- mandateId: Required mandate context
|
|
- featureInstanceId: Optional feature instance context
|
|
- context: Permission context (DATA, UI, RESOURCE)
|
|
- item: Optional specific item to check
|
|
|
|
Returns:
|
|
- Effective permissions after RBAC resolution
|
|
"""
|
|
try:
|
|
interface = getRootInterface()
|
|
|
|
# Get user
|
|
user = interface.getUser(userId)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=f"User {userId} not found"
|
|
)
|
|
|
|
# Convert context string to enum
|
|
try:
|
|
contextEnum = AccessRuleContext(context)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid context: {context}. Must be DATA, UI, or RESOURCE."
|
|
)
|
|
|
|
# Use RBAC interface to get actual permissions
|
|
from modules.security.rbac import RbacClass
|
|
rbac = RbacClass(interface.db, dbApp=interface.db)
|
|
|
|
permissions = rbac.getUserPermissions(
|
|
user=user,
|
|
context=contextEnum,
|
|
item=item or "",
|
|
mandateId=mandateId,
|
|
featureInstanceId=featureInstanceId
|
|
)
|
|
|
|
return {
|
|
"userId": userId,
|
|
"mandateId": mandateId,
|
|
"featureInstanceId": featureInstanceId,
|
|
"context": context,
|
|
"item": item,
|
|
"effectivePermissions": {
|
|
"view": permissions.view,
|
|
"read": _getAccessLevelLabel(permissions.read.value if permissions.read else None),
|
|
"create": _getAccessLevelLabel(permissions.create.value if permissions.create else None),
|
|
"update": _getAccessLevelLabel(permissions.update.value if permissions.update else None),
|
|
"delete": _getAccessLevelLabel(permissions.delete.value if permissions.delete else None),
|
|
}
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting effective permissions: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to get effective permissions: {str(e)}"
|
|
)
|