778 lines
28 KiB
Python
778 lines
28 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Mandate routes for the backend API.
|
|
Implements the endpoints for mandate management.
|
|
|
|
MULTI-TENANT:
|
|
- Mandate CRUD is SysAdmin-only (mandates are system resources)
|
|
- User management within mandates is Mandate-Admin (add/remove users)
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import status
|
|
import logging
|
|
import json
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Import auth module
|
|
from modules.auth import limiter, requireSysAdmin, getRequestContext, RequestContext
|
|
|
|
# Import interfaces
|
|
import modules.interfaces.interfaceDbAppObjects as interfaceDbAppObjects
|
|
from modules.shared.attributeUtils import getModelAttributeDefinitions
|
|
from modules.shared.auditLogger import audit_logger
|
|
|
|
# Import the model classes
|
|
from modules.datamodels.datamodelUam import Mandate, User
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
|
|
from modules.datamodels.datamodelRbac import Role
|
|
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
|
|
|
|
|
|
# =============================================================================
|
|
# Request/Response Models for User Management
|
|
# =============================================================================
|
|
|
|
class UserMandateCreate(BaseModel):
|
|
"""Request model for adding a user to a mandate"""
|
|
targetUserId: str = Field(..., description="User ID to add to the mandate")
|
|
roleIds: List[str] = Field(..., description="Role IDs to assign to the user")
|
|
|
|
|
|
class UserMandateResponse(BaseModel):
|
|
"""Response model for user mandate membership"""
|
|
userMandateId: str
|
|
userId: str
|
|
mandateId: str
|
|
roleIds: List[str]
|
|
enabled: bool
|
|
|
|
|
|
class MandateUserInfo(BaseModel):
|
|
"""User info within a mandate context"""
|
|
userId: str
|
|
username: str
|
|
email: Optional[str]
|
|
firstname: Optional[str]
|
|
lastname: Optional[str]
|
|
userMandateId: str
|
|
roleIds: List[str]
|
|
enabled: bool
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Model attributes for Mandate
|
|
mandateAttributes = getModelAttributeDefinitions(Mandate)
|
|
|
|
# Create a router for the mandate endpoints
|
|
router = APIRouter(
|
|
prefix="/api/mandates",
|
|
tags=["Manage Mandates"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
@router.get("/", response_model=PaginatedResponse[Mandate])
|
|
@limiter.limit("30/minute")
|
|
async def get_mandates(
|
|
request: Request,
|
|
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
|
|
context: RequestContext = Depends(requireSysAdmin)
|
|
) -> PaginatedResponse[Mandate]:
|
|
"""
|
|
Get mandates with optional pagination, sorting, and filtering.
|
|
MULTI-TENANT: SysAdmin-only (mandates are system resources).
|
|
|
|
Query Parameters:
|
|
- pagination: JSON-encoded PaginationParams object, or None for no pagination
|
|
|
|
Examples:
|
|
- GET /api/mandates/ (no pagination - returns all items)
|
|
- GET /api/mandates/?pagination={"page":1,"pageSize":10,"sort":[]}
|
|
"""
|
|
try:
|
|
# Parse pagination parameter
|
|
paginationParams = None
|
|
if pagination:
|
|
try:
|
|
paginationDict = json.loads(pagination)
|
|
if paginationDict:
|
|
paginationDict = normalize_pagination_dict(paginationDict)
|
|
paginationParams = PaginationParams(**paginationDict)
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid pagination parameter: {str(e)}"
|
|
)
|
|
|
|
appInterface = interfaceDbAppObjects.getRootInterface()
|
|
result = appInterface.getAllMandates(pagination=paginationParams)
|
|
|
|
# If pagination was requested, result is PaginatedResult
|
|
# If no pagination, result is List[Mandate]
|
|
if paginationParams:
|
|
return PaginatedResponse(
|
|
items=result.items,
|
|
pagination=PaginationMetadata(
|
|
currentPage=paginationParams.page,
|
|
pageSize=paginationParams.pageSize,
|
|
totalItems=result.totalItems,
|
|
totalPages=result.totalPages,
|
|
sort=paginationParams.sort,
|
|
filters=paginationParams.filters
|
|
)
|
|
)
|
|
else:
|
|
return PaginatedResponse(
|
|
items=result,
|
|
pagination=None
|
|
)
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting mandates: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to get mandates: {str(e)}"
|
|
)
|
|
|
|
@router.get("/{mandateId}", response_model=Mandate)
|
|
@limiter.limit("30/minute")
|
|
async def get_mandate(
|
|
request: Request,
|
|
mandateId: str = Path(..., description="ID of the mandate"),
|
|
context: RequestContext = Depends(requireSysAdmin)
|
|
) -> Mandate:
|
|
"""
|
|
Get a specific mandate by ID.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
"""
|
|
try:
|
|
appInterface = interfaceDbAppObjects.getRootInterface()
|
|
mandate = appInterface.getMandate(mandateId)
|
|
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate with ID {mandateId} not found"
|
|
)
|
|
|
|
return mandate
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting mandate {mandateId}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to get mandate: {str(e)}"
|
|
)
|
|
|
|
@router.post("/", response_model=Mandate)
|
|
@limiter.limit("10/minute")
|
|
async def create_mandate(
|
|
request: Request,
|
|
mandateData: dict = Body(..., description="Mandate data with at least 'name' field"),
|
|
context: RequestContext = Depends(requireSysAdmin)
|
|
) -> Mandate:
|
|
"""
|
|
Create a new mandate.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
"""
|
|
try:
|
|
logger.debug(f"Creating mandate with data: {mandateData}")
|
|
|
|
# Validate required fields
|
|
name = mandateData.get('name')
|
|
if not name or (isinstance(name, str) and name.strip() == ''):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Mandate name is required"
|
|
)
|
|
|
|
# Get optional fields with defaults
|
|
language = mandateData.get('language', 'en')
|
|
|
|
appInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
# Create mandate
|
|
newMandate = appInterface.createMandate(
|
|
name=name,
|
|
language=language
|
|
)
|
|
|
|
if not newMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to create mandate"
|
|
)
|
|
|
|
logger.info(f"Mandate {newMandate.id} created by SysAdmin {context.user.id}")
|
|
|
|
return newMandate
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating mandate: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to create mandate: {str(e)}"
|
|
)
|
|
|
|
@router.put("/{mandateId}", response_model=Mandate)
|
|
@limiter.limit("10/minute")
|
|
async def update_mandate(
|
|
request: Request,
|
|
mandateId: str = Path(..., description="ID of the mandate to update"),
|
|
mandateData: dict = Body(..., description="Mandate update data"),
|
|
context: RequestContext = Depends(requireSysAdmin)
|
|
) -> Mandate:
|
|
"""
|
|
Update an existing mandate.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
"""
|
|
try:
|
|
logger.debug(f"Updating mandate {mandateId} with data: {mandateData}")
|
|
|
|
appInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
# Check if mandate exists
|
|
existingMandate = appInterface.getMandate(mandateId)
|
|
if not existingMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate with ID {mandateId} not found"
|
|
)
|
|
|
|
# Update mandate - mandateData is already a dict
|
|
updatedMandate = appInterface.updateMandate(mandateId, mandateData)
|
|
|
|
if not updatedMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to update mandate"
|
|
)
|
|
|
|
logger.info(f"Mandate {mandateId} updated by SysAdmin {context.user.id}")
|
|
|
|
return updatedMandate
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating mandate {mandateId}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to update mandate: {str(e)}"
|
|
)
|
|
|
|
@router.delete("/{mandateId}", response_model=Dict[str, Any])
|
|
@limiter.limit("10/minute")
|
|
async def delete_mandate(
|
|
request: Request,
|
|
mandateId: str = Path(..., description="ID of the mandate to delete"),
|
|
context: RequestContext = Depends(requireSysAdmin)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Delete a mandate.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
"""
|
|
try:
|
|
appInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
# Check if mandate exists
|
|
existingMandate = appInterface.getMandate(mandateId)
|
|
if not existingMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {mandateId} not found"
|
|
)
|
|
|
|
# MULTI-TENANT: Delete all UserMandate entries for this mandate first
|
|
from modules.datamodels.datamodelMembership import UserMandate
|
|
userMandates = appInterface.db.getRecordset(UserMandate, recordFilter={"mandateId": mandateId})
|
|
for um in userMandates:
|
|
appInterface.db.deleteRecord(UserMandate, um["id"])
|
|
logger.info(f"Deleted {len(userMandates)} UserMandate entries for mandate {mandateId}")
|
|
|
|
# Delete mandate
|
|
try:
|
|
appInterface.deleteMandate(mandateId)
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=str(e)
|
|
)
|
|
|
|
logger.info(f"Mandate {mandateId} deleted by SysAdmin {context.user.id}")
|
|
|
|
return {"message": f"Mandate {mandateId} deleted successfully"}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error deleting mandate {mandateId}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to delete mandate: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# User Management within Mandates (Mandate-Admin)
|
|
# =============================================================================
|
|
|
|
@router.get("/{targetMandateId}/users", response_model=List[MandateUserInfo])
|
|
@limiter.limit("60/minute")
|
|
async def listMandateUsers(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> List[MandateUserInfo]:
|
|
"""
|
|
List all users in a mandate.
|
|
|
|
Requires Mandate-Admin role or SysAdmin.
|
|
"""
|
|
# Check permission
|
|
if not _hasMandateAdminRole(context, targetMandateId) and not context.isSysAdmin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
# Verify mandate exists
|
|
mandate = rootInterface.getMandate(targetMandateId)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {targetMandateId} not found"
|
|
)
|
|
|
|
# Get all UserMandate entries for this mandate
|
|
userMandates = rootInterface.db.getRecordset(
|
|
UserMandate,
|
|
recordFilter={"mandateId": targetMandateId}
|
|
)
|
|
|
|
result = []
|
|
for um in userMandates:
|
|
# Get user info
|
|
user = rootInterface.getUserById(um.get("userId"))
|
|
if not user:
|
|
continue
|
|
|
|
# Get roles for this membership
|
|
roleIds = rootInterface.getRoleIdsForUserMandate(um.get("id"))
|
|
|
|
result.append(MandateUserInfo(
|
|
userId=str(user.id),
|
|
username=user.username,
|
|
email=user.email,
|
|
firstname=user.firstname,
|
|
lastname=user.lastname,
|
|
userMandateId=um.get("id"),
|
|
roleIds=roleIds,
|
|
enabled=um.get("enabled", True)
|
|
))
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error listing users for mandate {targetMandateId}: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to list users: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/{targetMandateId}/users", response_model=UserMandateResponse)
|
|
@limiter.limit("30/minute")
|
|
async def addUserToMandate(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
data: UserMandateCreate = Body(...),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> UserMandateResponse:
|
|
"""
|
|
Add a user to a mandate with specified roles.
|
|
|
|
Requires Mandate-Admin role.
|
|
SysAdmin cannot add themselves (Self-Eskalation Prevention).
|
|
|
|
Args:
|
|
targetMandateId: Target mandate ID
|
|
data: User ID and role IDs to assign
|
|
"""
|
|
# 1. SysAdmin Self-Eskalation Prevention
|
|
if context.isSysAdmin and data.targetUserId == str(context.user.id):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="SysAdmin cannot add themselves to a mandate. A Mandate-Admin must grant access."
|
|
)
|
|
|
|
# 2. Check Mandate-Admin permission
|
|
if not _hasMandateAdminRole(context, targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required to add users"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
# 3. Verify mandate exists
|
|
mandate = rootInterface.getMandate(targetMandateId)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {targetMandateId} not found"
|
|
)
|
|
|
|
# 4. Verify target user exists
|
|
targetUser = rootInterface.getUserById(data.targetUserId)
|
|
if not targetUser:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"User {data.targetUserId} not found"
|
|
)
|
|
|
|
# 5. Check if user is already a member
|
|
existingMembership = rootInterface.getUserMandate(data.targetUserId, targetMandateId)
|
|
if existingMembership:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail=f"User {data.targetUserId} is already a member of this mandate"
|
|
)
|
|
|
|
# 6. Validate roles (must exist and belong to this mandate or be global)
|
|
for roleId in data.roleIds:
|
|
roleRecords = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if not roleRecords:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role {roleId} not found"
|
|
)
|
|
role = roleRecords[0]
|
|
roleMandateId = role.get("mandateId")
|
|
if roleMandateId and str(roleMandateId) != str(targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Role {roleId} belongs to a different mandate"
|
|
)
|
|
|
|
# 7. Create UserMandate
|
|
userMandate = rootInterface.createUserMandate(
|
|
userId=data.targetUserId,
|
|
mandateId=targetMandateId,
|
|
roleIds=data.roleIds
|
|
)
|
|
|
|
# 8. Audit - Log permission change with IP address
|
|
audit_logger.logPermissionChange(
|
|
userId=str(context.user.id),
|
|
mandateId=targetMandateId,
|
|
action="user_added_to_mandate",
|
|
targetUserId=data.targetUserId,
|
|
details=f"Roles assigned: {data.roleIds}",
|
|
resourceType="UserMandate",
|
|
resourceId=str(userMandate.id)
|
|
)
|
|
|
|
logger.info(
|
|
f"User {context.user.id} added user {data.targetUserId} to mandate {targetMandateId} "
|
|
f"with roles {data.roleIds}"
|
|
)
|
|
|
|
return UserMandateResponse(
|
|
userMandateId=str(userMandate.id),
|
|
userId=data.targetUserId,
|
|
mandateId=targetMandateId,
|
|
roleIds=data.roleIds,
|
|
enabled=True
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error adding user to mandate: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to add user to mandate: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.delete("/{targetMandateId}/users/{targetUserId}", response_model=Dict[str, str])
|
|
@limiter.limit("30/minute")
|
|
async def removeUserFromMandate(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
targetUserId: str = Path(..., description="ID of the user to remove"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Remove a user from a mandate.
|
|
|
|
Requires Mandate-Admin role.
|
|
Cannot remove the last admin from a mandate (orphan prevention).
|
|
|
|
Args:
|
|
targetMandateId: Target mandate ID
|
|
targetUserId: User ID to remove
|
|
"""
|
|
# Check Mandate-Admin permission
|
|
if not _hasMandateAdminRole(context, targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
# Verify mandate exists
|
|
mandate = rootInterface.getMandate(targetMandateId)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {targetMandateId} not found"
|
|
)
|
|
|
|
# Get user's membership
|
|
membership = rootInterface.getUserMandate(targetUserId, targetMandateId)
|
|
if not membership:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"User {targetUserId} is not a member of this mandate"
|
|
)
|
|
|
|
# Check if this is the last admin (orphan prevention)
|
|
if _isLastMandateAdmin(rootInterface, targetMandateId, targetUserId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot remove the last admin from a mandate. Assign another admin first."
|
|
)
|
|
|
|
# Delete UserMandate (CASCADE will delete UserMandateRole entries)
|
|
rootInterface.deleteUserMandate(targetUserId, targetMandateId)
|
|
|
|
# Audit - Log permission change
|
|
audit_logger.logPermissionChange(
|
|
userId=str(context.user.id),
|
|
mandateId=targetMandateId,
|
|
action="user_removed_from_mandate",
|
|
targetUserId=targetUserId,
|
|
details="User removed from mandate",
|
|
resourceType="UserMandate"
|
|
)
|
|
|
|
logger.info(f"User {context.user.id} removed user {targetUserId} from mandate {targetMandateId}")
|
|
|
|
return {"message": "User removed from mandate", "userId": targetUserId}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error removing user from mandate: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to remove user from mandate: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.put("/{targetMandateId}/users/{targetUserId}/roles", response_model=UserMandateResponse)
|
|
@limiter.limit("30/minute")
|
|
async def updateUserRolesInMandate(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
targetUserId: str = Path(..., description="ID of the user"),
|
|
roleIds: List[str] = Body(..., description="New role IDs to assign"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> UserMandateResponse:
|
|
"""
|
|
Update a user's roles within a mandate.
|
|
|
|
Replaces all existing roles with the new set.
|
|
Requires Mandate-Admin role.
|
|
|
|
Args:
|
|
targetMandateId: Target mandate ID
|
|
targetUserId: User ID to update
|
|
roleIds: New set of role IDs
|
|
"""
|
|
# Check Mandate-Admin permission
|
|
if not _hasMandateAdminRole(context, targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
# Get user's membership
|
|
membership = rootInterface.getUserMandate(targetUserId, targetMandateId)
|
|
if not membership:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"User {targetUserId} is not a member of this mandate"
|
|
)
|
|
|
|
# Validate new roles
|
|
for roleId in roleIds:
|
|
roleRecords = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if not roleRecords:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role {roleId} not found"
|
|
)
|
|
role = roleRecords[0]
|
|
roleMandateId = role.get("mandateId")
|
|
if roleMandateId and str(roleMandateId) != str(targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Role {roleId} belongs to a different mandate"
|
|
)
|
|
|
|
# Check if removing admin role would leave mandate without admins
|
|
currentRoleIds = rootInterface.getRoleIdsForUserMandate(str(membership.id))
|
|
isCurrentlyAdmin = _hasAdminRoleInList(rootInterface, currentRoleIds, targetMandateId)
|
|
willBeAdmin = _hasAdminRoleInList(rootInterface, roleIds, targetMandateId)
|
|
|
|
if isCurrentlyAdmin and not willBeAdmin:
|
|
if _isLastMandateAdmin(rootInterface, targetMandateId, targetUserId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot remove admin role from the last admin. Assign another admin first."
|
|
)
|
|
|
|
# Remove existing role assignments
|
|
existingRoles = rootInterface.db.getRecordset(
|
|
UserMandateRole,
|
|
recordFilter={"userMandateId": str(membership.id)}
|
|
)
|
|
for er in existingRoles:
|
|
rootInterface.db.recordDelete(UserMandateRole, er.get("id"))
|
|
|
|
# Add new role assignments
|
|
for roleId in roleIds:
|
|
rootInterface.addRoleToUserMandate(str(membership.id), roleId)
|
|
|
|
# Audit - Log role assignment change
|
|
audit_logger.logPermissionChange(
|
|
userId=str(context.user.id),
|
|
mandateId=targetMandateId,
|
|
action="role_assigned",
|
|
targetUserId=targetUserId,
|
|
details=f"New roles: {roleIds}",
|
|
resourceType="UserMandateRole",
|
|
resourceId=str(membership.id)
|
|
)
|
|
|
|
logger.info(
|
|
f"User {context.user.id} updated roles for user {targetUserId} "
|
|
f"in mandate {targetMandateId} to {roleIds}"
|
|
)
|
|
|
|
return UserMandateResponse(
|
|
userMandateId=str(membership.id),
|
|
userId=targetUserId,
|
|
mandateId=targetMandateId,
|
|
roleIds=roleIds,
|
|
enabled=membership.enabled
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating user roles in mandate: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to update user roles: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
def _hasMandateAdminRole(context: RequestContext, mandateId: str) -> bool:
|
|
"""
|
|
Check if the user has mandate admin role for the specified mandate.
|
|
"""
|
|
if context.isSysAdmin:
|
|
return True
|
|
|
|
# Must be in the same mandate context
|
|
if str(context.mandateId) != str(mandateId):
|
|
return False
|
|
|
|
if not context.roleIds:
|
|
return False
|
|
|
|
try:
|
|
rootInterface = interfaceDbAppObjects.getRootInterface()
|
|
|
|
for roleId in context.roleIds:
|
|
roleRecords = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if roleRecords:
|
|
role = roleRecords[0]
|
|
roleLabel = role.get("roleLabel", "")
|
|
# Admin role at mandate level (not feature-instance level)
|
|
if roleLabel == "admin" and role.get("mandateId") and not role.get("featureInstanceId"):
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking mandate admin role: {e}")
|
|
return False
|
|
|
|
|
|
def _isLastMandateAdmin(interface, mandateId: str, excludeUserId: str) -> bool:
|
|
"""
|
|
Check if excluding this user would leave the mandate without any admins.
|
|
"""
|
|
try:
|
|
# Get all UserMandates for this mandate
|
|
userMandates = interface.db.getRecordset(
|
|
UserMandate,
|
|
recordFilter={"mandateId": mandateId, "enabled": True}
|
|
)
|
|
|
|
adminCount = 0
|
|
for um in userMandates:
|
|
if str(um.get("userId")) == str(excludeUserId):
|
|
continue
|
|
|
|
# Check if this user has admin role
|
|
roleIds = interface.getRoleIdsForUserMandate(um.get("id"))
|
|
if _hasAdminRoleInList(interface, roleIds, mandateId):
|
|
adminCount += 1
|
|
|
|
return adminCount == 0
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking last admin: {e}")
|
|
return True # Fail-safe: assume they're the last admin
|
|
|
|
|
|
def _hasAdminRoleInList(interface, roleIds: List[str], mandateId: str) -> bool:
|
|
"""
|
|
Check if any of the role IDs is an admin role for the mandate.
|
|
"""
|
|
for roleId in roleIds:
|
|
roleRecords = interface.db.getRecordset(Role, recordFilter={"id": roleId})
|
|
if roleRecords:
|
|
role = roleRecords[0]
|
|
roleLabel = role.get("roleLabel", "")
|
|
roleMandateId = role.get("mandateId")
|
|
# Admin role at mandate level
|
|
if roleLabel == "admin" and (not roleMandateId or str(roleMandateId) == str(mandateId)):
|
|
if not role.get("featureInstanceId"):
|
|
return True
|
|
return False
|