# gateway/modules/routes/routeDataMandates.py
# 2026-04-10 22:44:08 +02:00
#
# 1129 lines
# 44 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Mandate routes for the backend API.
Implements the endpoints for mandate management.
MULTI-TENANT:
- Mandate CRUD is SysAdmin-only (mandates are system resources)
- User management within mandates is Mandate-Admin (add/remove users)
"""
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
import json
from pydantic import BaseModel, Field
# Import auth module
from modules.auth import limiter, requireSysAdminRole, getRequestContext, RequestContext
# Import interfaces
import modules.interfaces.interfaceDbApp as interfaceDbApp
from modules.interfaces.interfaceDbBilling import _getRootInterface as _getBillingRootInterface
from modules.shared.attributeUtils import getModelAttributeDefinitions
from modules.shared.auditLogger import audit_logger
# Import the model classes
from modules.datamodels.datamodelUam import Mandate, User
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
from modules.datamodels.datamodelRbac import Role
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
from modules.routes.routeNotifications import create_access_change_notification
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionCapacityException
from modules.shared.i18nRegistry import apiRouteContext
# Callable obtained from the i18n registry for this route module; every call
# site below passes it the English text of an HTTP error detail.
# NOTE(review): presumably returns the localized message - confirm in i18nRegistry.
routeApiMsg = apiRouteContext("routeDataMandates")
# =============================================================================
# Request/Response Models for User Management
# =============================================================================
class UserMandateCreate(BaseModel):
    """Request model for adding a user to a mandate.

    Used as the request body of ``POST /api/mandates/{targetMandateId}/users``.
    Both fields are required (declared with ``Field(...)``).
    """
    # ID of the user that should become a member of the mandate.
    targetUserId: str = Field(..., description="User ID to add to the mandate")
    # Role IDs granted to the new membership; the route validates each one
    # against the target mandate before creating the membership.
    roleIds: List[str] = Field(..., description="Role IDs to assign to the user")
class UserMandateResponse(BaseModel):
    """Response model for user mandate membership.

    Returned by the add-user and update-roles routes of this module.
    """
    id: str  # UserMandate ID as primary key
    userId: str  # ID of the member user
    mandateId: str  # ID of the mandate the membership belongs to
    roleIds: List[str]  # Role IDs assigned to this membership
    enabled: bool  # Whether the membership is enabled
class MandateUserInfo(BaseModel):
    """User info within a mandate context.

    Mirrors the keys of the row dicts built by ``list_mandate_users``.
    NOTE(review): not referenced by any route in the visible part of this file;
    confirm whether it is used elsewhere before changing it.
    """
    id: str  # UserMandate ID as primary key
    userId: str  # ID of the user
    username: str
    email: Optional[str]  # May be None
    fullName: Optional[str]  # May be None
    roleIds: List[str]  # Role IDs of the membership
    roleLabels: List[str]  # Resolved role labels for display
    enabled: bool  # Whether the membership is enabled
# Configure logger
# Module-level logger named after this module; all routes and helpers below
# log through it.
logger = logging.getLogger(__name__)
# Model attributes for Mandate
# Computed once at import time from the Mandate model; not referenced again in
# the visible part of this file.
mandateAttributes = getModelAttributeDefinitions(Mandate)
# Create a router for the mandate endpoints
# Every route registered on it is served under the /api/mandates prefix and
# documents a generic 404 response.
router = APIRouter(
    prefix="/api/mandates",
    tags=["Manage Mandates"],
    responses={404: {"description": "Not found"}}
)
@router.get("/", response_model=PaginatedResponse[Mandate])
@limiter.limit("30/minute")
def get_mandates(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[Mandate]:
    """
    Get mandates with optional pagination, sorting, and filtering.

    Access:
    - SysAdmin: all mandates (server-side pagination when requested)
    - MandateAdmin: only enabled mandates where the user has the admin role;
      the full list is returned and pagination is left to the client
    - Other: 403

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination

    Raises:
        HTTPException 400: ``pagination`` is not valid JSON, is not a JSON
            object, or does not validate as PaginationParams.
        HTTPException 403: caller is neither SysAdmin nor admin of any mandate.
        HTTPException 500: any unexpected error.
    """
    try:
        # Check admin access
        isSysAdmin = context.hasSysAdminRole
        adminMandateIds: List[str] = []
        if not isSysAdmin:
            adminMandateIds = _getAdminMandateIds(context)
            if not adminMandateIds:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=routeApiMsg("Admin role required")
                )

        # Parse pagination parameter
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                if paginationDict:
                    # A truthy non-object (e.g. "[1]") would otherwise blow up
                    # with a TypeError further down and surface as a 500.
                    if not isinstance(paginationDict, dict):
                        raise ValueError("pagination must be a JSON object")
                    paginationDict = normalize_pagination_dict(paginationDict)
                    paginationParams = PaginationParams(**paginationDict)
            except (json.JSONDecodeError, ValueError) as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid pagination parameter: {str(e)}"
                )

        appInterface = interfaceDbApp.getRootInterface()
        if isSysAdmin:
            # SysAdmin: all mandates
            result = appInterface.getAllMandates(pagination=paginationParams)
        else:
            # MandateAdmin: only their enabled mandates
            allMandates = []
            for mandateId in adminMandateIds:
                mandate = appInterface.getMandate(mandateId)
                if not mandate:
                    continue
                # getattr() never sees a dict key, so read "enabled" according
                # to the actual shape of the record.
                if isinstance(mandate, dict):
                    isEnabled = mandate.get("enabled", True)
                else:
                    isEnabled = getattr(mandate, "enabled", True)
                if not isEnabled:
                    continue
                if isinstance(mandate, dict):
                    mandateDict = mandate
                elif hasattr(mandate, 'model_dump'):
                    mandateDict = mandate.model_dump()
                else:
                    mandateDict = vars(mandate)
                allMandates.append(mandateDict)
            result = allMandates
            paginationParams = None  # Client-side pagination for filtered results

        # If pagination was requested, result is PaginatedResult
        # If no pagination, result is List[Mandate]
        if paginationParams and hasattr(result, 'items'):
            return PaginatedResponse(
                items=result.items,
                pagination=PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=result.totalItems,
                    totalPages=result.totalPages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters
                )
            )
        items = result if isinstance(result, list) else (result.items if hasattr(result, 'items') else result)
        return PaginatedResponse(
            items=items,
            pagination=None
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting mandates: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get mandates: {str(e)}"
        )
@router.get("/filter-values")
@limiter.limit("60/minute")
def get_mandate_filter_values(
    request: Request,
    column: str = Query(..., description="Column key"),
    pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
    context: RequestContext = Depends(getRequestContext)
) -> list:
    """Return distinct filter values for a column in mandates.

    Access:
    - SysAdmin: values over all mandates. A DB-side distinct query is tried
      first (with the caller's other filters applied, the requested column's
      own filter and the sort removed); on any failure the values are derived
      in memory from all mandates.
    - MandateAdmin: values derived in memory from the enabled mandates the
      caller administers (the same set ``get_mandates`` lists).
    - Other: 403

    An unparsable ``pagination`` value is ignored for the DB-side cross filter
    (best effort) rather than rejected.

    Raises:
        HTTPException 403: caller is neither SysAdmin nor admin of any mandate.
        HTTPException 500: any unexpected error.
    """
    try:
        from modules.routes.routeDataUsers import _handleFilterValuesRequest
        isSysAdmin = context.hasSysAdminRole
        adminMandateIds: List[str] = []
        if not isSysAdmin:
            adminMandateIds = _getAdminMandateIds(context)
            if not adminMandateIds:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=routeApiMsg("Admin role required"))
        appInterface = interfaceDbApp.getRootInterface()

        if isSysAdmin:
            # SysAdmin: try SQL DISTINCT for DB columns
            crossFilterPagination = None
            if pagination:
                try:
                    paginationDict = json.loads(pagination)
                    if paginationDict:
                        if not isinstance(paginationDict, dict):
                            raise ValueError("pagination must be a JSON object")
                        paginationDict = normalize_pagination_dict(paginationDict)
                        # "filters" may be present but null; treat that as empty
                        # instead of failing on None.pop().
                        filters = paginationDict.get("filters") or {}
                        if not isinstance(filters, dict):
                            raise ValueError("filters must be a JSON object")
                        # The requested column must not restrict its own value list.
                        filters.pop(column, None)
                        paginationDict["filters"] = filters
                        paginationDict.pop("sort", None)
                        crossFilterPagination = PaginationParams(**paginationDict)
                except (json.JSONDecodeError, ValueError) as parseErr:
                    # Best effort: fall back to unfiltered distinct values.
                    logger.debug(f"Ignoring invalid pagination for mandate filter values: {parseErr}")
            try:
                values = appInterface.db.getDistinctColumnValues(
                    Mandate, column, crossFilterPagination
                )
                return sorted(values, key=lambda v: str(v).lower())
            except Exception as distinctErr:
                # Deliberate fallback: compute the values in memory instead.
                logger.debug(f"Distinct query for mandate column '{column}' failed, using in-memory fallback: {distinctErr}")
                result = appInterface.getAllMandates(pagination=None)
                items = result if isinstance(result, list) else (result.items if hasattr(result, 'items') else result)
                items = [i.model_dump() if hasattr(i, 'model_dump') else i for i in items]
                return _handleFilterValuesRequest(items, column, pagination)

        # MandateAdmin: in-memory (small set of individual mandate lookups)
        items = []
        for mid in adminMandateIds:
            mandate = appInterface.getMandate(mid)
            if not mandate:
                continue
            if isinstance(mandate, dict):
                isEnabled = mandate.get("enabled", True)
                row = mandate
            else:
                isEnabled = getattr(mandate, "enabled", True)
                row = mandate.model_dump() if hasattr(mandate, 'model_dump') else vars(mandate)
            # Keep the value list consistent with the mandates get_mandates shows.
            if isEnabled:
                items.append(row)
        return _handleFilterValuesRequest(items, column, pagination)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting filter values for mandates: {str(e)}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.get("/{targetMandateId}", response_model=Mandate)
@limiter.limit("30/minute")
def get_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    context: RequestContext = Depends(getRequestContext)
) -> Mandate:
    """
    Fetch a single mandate by its ID.

    Access:
    - SysAdmin: any mandate
    - MandateAdmin: only mandates where the caller holds the admin role
    - Other: 403

    Raises:
        HTTPException 403: caller may not read this mandate.
        HTTPException 404: no mandate with this ID exists.
        HTTPException 500: any unexpected error.
    """
    try:
        mandateId = targetMandateId

        # Authorization comes first, so an unauthorized caller gets 403 even
        # for IDs that do not exist.
        callerIsSysAdmin = context.hasSysAdminRole
        if not callerIsSysAdmin:
            permittedIds = _getAdminMandateIds(context)
            if mandateId not in permittedIds:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=routeApiMsg("Admin role required for this mandate")
                )

        found = interfaceDbApp.getRootInterface().getMandate(mandateId)
        if found:
            return found
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Mandate with ID {mandateId} not found"
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting mandate {mandateId}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get mandate: {str(e)}"
        )
@router.post("/", response_model=Mandate)
@limiter.limit("10/minute")
def create_mandate(
    request: Request,
    mandateData: dict = Body(..., description="Mandate data with at least 'name' field"),
    currentUser: User = Depends(requireSysAdminRole)
) -> Mandate:
    """
    Create a new mandate.

    MULTI-TENANT: SysAdmin-only.

    After the mandate itself is created, two follow-up steps run on a
    best-effort basis (their failure is logged and does not fail the request):
    default billing settings are ensured, and an initial subscription is
    created for the plan named by ``planKey`` (default ``TRIAL_14D``). Plans
    with trial days start as TRIALING with the trial end as period end; other
    plans start as ACTIVE.

    Args:
        mandateData: Must contain a non-empty ``name``; may contain ``label``,
            ``enabled`` (default True) and ``planKey``.

    Raises:
        HTTPException 400: ``name`` is missing or blank.
        HTTPException 500: the mandate could not be created, or any
            unexpected error.
    """
    try:
        logger.debug(f"Creating mandate with data: {mandateData}")

        # Validate required fields
        name = mandateData.get('name')
        if not name or (isinstance(name, str) and name.strip() == ''):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("Mandate name is required")
            )

        # Get optional fields with defaults
        label = mandateData.get('label')
        enabled = mandateData.get('enabled', True)

        appInterface = interfaceDbApp.getRootInterface()

        # Create mandate
        newMandate = appInterface.createMandate(
            name=name,
            label=label,
            enabled=enabled
        )
        if not newMandate:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=routeApiMsg("Failed to create mandate")
            )

        # Best effort: billing settings must not block mandate creation.
        try:
            billingInterface = _getBillingRootInterface()
            billingInterface.getOrCreateSettings(str(newMandate.id))
            logger.debug(f"Ensured billing settings for new mandate {newMandate.id}")
        except Exception as billingErr:
            logger.warning(
                f"Could not create default billing settings for mandate {newMandate.id}: {billingErr}"
            )

        # Best effort: initial subscription. Imports stay inside the try so an
        # import problem is logged like any other subscription failure.
        try:
            from modules.datamodels.datamodelSubscription import (
                MandateSubscription, SubscriptionStatusEnum, BUILTIN_PLANS,
            )
            from modules.interfaces.interfaceDbSubscription import _getRootInterface as _getSubRoot
            from datetime import datetime, timezone, timedelta

            planKey = mandateData.get("planKey", "TRIAL_14D")
            plan = BUILTIN_PLANS.get(planKey)
            if plan:
                now = datetime.now(timezone.utc)
                targetStatus = SubscriptionStatusEnum.TRIALING if plan.trialDays else SubscriptionStatusEnum.ACTIVE
                sub = MandateSubscription(
                    mandateId=str(newMandate.id),
                    planKey=planKey,
                    status=targetStatus,
                    recurring=plan.autoRenew and not plan.trialDays,
                    startedAt=now,
                    currentPeriodStart=now,
                )
                if plan.trialDays:
                    sub.trialEndsAt = now + timedelta(days=plan.trialDays)
                    sub.currentPeriodEnd = now + timedelta(days=plan.trialDays)
                subInterface = _getSubRoot()
                subInterface.createSubscription(sub)
                logger.info(f"Created {targetStatus.value} subscription ({planKey}) for mandate {newMandate.id}")
            else:
                # Previously an unknown plan key was skipped without a trace,
                # leaving the mandate without any subscription.
                logger.warning(
                    f"Unknown planKey {planKey!r} for mandate {newMandate.id}; no subscription created"
                )
        except Exception as subErr:
            logger.error(f"Failed to create subscription for mandate {newMandate.id}: {subErr}")

        logger.info(f"Mandate {newMandate.id} created by SysAdmin {currentUser.id}")
        return newMandate
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating mandate: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create mandate: {str(e)}"
        )
@router.put("/{mandateId}", response_model=Mandate)
@limiter.limit("10/minute")
def update_mandate(
    request: Request,
    mandateId: str = Path(..., description="ID of the mandate to update"),
    mandateData: dict = Body(..., description="Mandate update data"),
    currentUser: User = Depends(requireSysAdminRole)
) -> Mandate:
    """
    Apply the given field values to an existing mandate.

    MULTI-TENANT: SysAdmin-only.

    Raises:
        HTTPException 404: no mandate with this ID exists.
        HTTPException 500: the update returned nothing, or any unexpected error.
    """
    try:
        logger.debug(f"Updating mandate {mandateId} with data: {mandateData}")
        db = interfaceDbApp.getRootInterface()

        # The mandate has to exist before it can be updated.
        if not db.getMandate(mandateId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate with ID {mandateId} not found"
            )

        # The request body is already a plain dict and is passed through as is.
        saved = db.updateMandate(mandateId, mandateData)
        if saved:
            logger.info(f"Mandate {mandateId} updated by SysAdmin {currentUser.id}")
            return saved
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=routeApiMsg("Failed to update mandate")
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating mandate {mandateId}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update mandate: {str(e)}"
        )
@router.delete("/{mandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def delete_mandate(
    request: Request,
    mandateId: str = Path(..., description="ID of the mandate to delete"),
    force: bool = Query(False, description="Hard-delete with full cascade (irreversible)"),
    currentUser: User = Depends(requireSysAdminRole)
) -> Dict[str, Any]:
    """
    Delete a mandate.

    Default: soft-delete (sets enabled=False, 30-day retention).
    With ?force=true: hard-delete with full cascade (irreversible).
    Requires X-Confirm-Name header matching the mandate name for hard-delete.
    The header must actually be sent: a missing header never confirms, even
    when the stored name is empty.

    MULTI-TENANT: SysAdmin-only.

    Raises:
        HTTPException 400: hard-delete confirmation missing or wrong, or the
            interface rejected the deletion with a ValueError.
        HTTPException 404: no mandate with this ID exists.
        HTTPException 500: any unexpected error.
    """
    try:
        appInterface = interfaceDbApp.getRootInterface()
        existingMandate = appInterface.getMandate(mandateId)
        if not existingMandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {mandateId} not found"
            )

        if force:
            # No default here: an absent header must be distinguishable from
            # an empty one, otherwise an empty name confirms itself.
            confirmName = request.headers.get("X-Confirm-Name")
            # Other routes in this module treat mandates as dict-or-model, so
            # read the name according to the record's shape.
            if isinstance(existingMandate, dict):
                mandateName = existingMandate.get("name") or ""
            else:
                mandateName = getattr(existingMandate, "name", "") or ""
            if confirmName is None or confirmName != mandateName:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=routeApiMsg("Hard-delete requires X-Confirm-Name header matching the mandate name")
                )

        try:
            appInterface.deleteMandate(mandateId, force=force)
        except ValueError as e:
            # The interface signals a refused deletion with ValueError.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=str(e)
            )

        mode = "hard-deleted" if force else "soft-deleted"
        logger.info(f"Mandate {mandateId} {mode} by SysAdmin {currentUser.id}")
        return {"message": f"Mandate {mandateId} {mode} successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting mandate {mandateId}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete mandate: {str(e)}"
        )
# =============================================================================
# User Management within Mandates (Mandate-Admin)
# =============================================================================
def _positivePaginationInt(value: Any, default: int, fieldName: str) -> int:
    """Return ``value`` as a positive int, ``default`` when it is None; raise ValueError otherwise."""
    if value is None:
        return default
    # bool is a subclass of int but is never a meaningful page number.
    if isinstance(value, bool) or not isinstance(value, int) or value < 1:
        raise ValueError(f"{fieldName} must be a positive integer")
    return value


def _parseMandateUserPagination(pagination: Optional[str]) -> Optional[Dict[str, Any]]:
    """Decode and validate the raw ``pagination`` query value; raise HTTP 400 when it is malformed."""
    if not pagination:
        return None
    try:
        params = json.loads(pagination)
        if not params:
            return None
        if not isinstance(params, dict):
            raise ValueError("pagination must be a JSON object")

        # Keep only well-formed (dict) sort entries; anything else is dropped.
        rawSort = params.get('sort')
        if rawSort:
            sortEntries = [entry for entry in rawSort if isinstance(entry, dict)] if isinstance(rawSort, list) else []
            params['sort'] = sortEntries or None

        filters = params.get('filters')
        if filters and not isinstance(filters, dict):
            raise ValueError("filters must be a JSON object")

        # pageSize 0 used to end in a ZeroDivisionError (HTTP 500).
        params['page'] = _positivePaginationInt(params.get('page'), 1, 'page')
        params['pageSize'] = _positivePaginationInt(params.get('pageSize'), 25, 'pageSize')
        return params
    except (json.JSONDecodeError, ValueError) as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid pagination parameter: {str(e)}"
        )


def _collectMandateUserRows(rootInterface, mandateId: str) -> List[Dict[str, Any]]:
    """Build one display row per membership of the mandate whose user still exists."""
    rows = []
    for um in rootInterface.getUserMandatesByMandate(mandateId):
        user = rootInterface.getUser(str(um.userId))
        if not user:
            continue
        # Resolve role labels for display (only mandate-level roles, deduplicated)
        roleLabels = []
        filteredRoleIds = []
        seenLabels = set()
        for roleId in rootInterface.getRoleIdsForUserMandate(str(um.id)):
            role = rootInterface.getRole(roleId)
            if not role:
                # Role not found - fail-safe: skip (no access)
                logger.warning(f"Role {roleId} not found, skipping")
                continue
            # Skip feature-instance roles - they don't belong in mandate membership
            if role.featureInstanceId:
                continue
            filteredRoleIds.append(roleId)
            if role.roleLabel not in seenLabels:
                roleLabels.append(role.roleLabel)
                seenLabels.add(role.roleLabel)
        rows.append({
            "id": str(um.id),  # UserMandate ID as primary key
            "userId": str(user.id),
            "username": user.username,
            "email": user.email,
            "fullName": user.fullName,
            "roleIds": filteredRoleIds,
            "roleLabels": roleLabels,
            "enabled": um.enabled
        })
    return rows


def _applyMandateUserQuery(rows: List[Dict[str, Any]], params: Dict[str, Any]) -> Dict[str, Any]:
    """Apply search, filters, sorting and paging (in that order) to the user rows."""
    # Search: case-insensitive substring match over the display columns.
    search = params.get('search')
    searchTerm = str(search).lower() if search else ''
    if searchTerm:
        def _matches(item: Dict[str, Any]) -> bool:
            haystacks = (
                item.get("username") or "",
                item.get("email") or "",
                item.get("fullName") or "",
                " ".join(item.get("roleLabels") or []),
            )
            return any(searchTerm in text.lower() for text in haystacks)
        rows = [item for item in rows if _matches(item)]

    # Filters: case-insensitive equality on the stringified field value.
    filters = params.get('filters')
    if filters:
        for fieldName, filterValue in filters.items():
            if filterValue is None or filterValue == '':
                continue
            wanted = str(filterValue).lower()
            rows = [item for item in rows if str(item.get(fieldName, '')).lower() == wanted]

    # Sorting: apply keys last-to-first so the stable sort makes the first
    # sort entry the primary key.
    for sortItem in reversed(params.get('sort') or []):
        field = sortItem.get('field')
        if not (isinstance(field, str) and field):
            continue
        descending = sortItem.get('direction', 'asc') == 'desc'
        rows = sorted(
            rows,
            key=lambda row, sortField=field: str(row.get(sortField, '') or '').lower(),
            reverse=descending
        )

    # Paging (page and pageSize were validated as positive ints).
    page = params.get('page', 1)
    pageSize = params.get('pageSize', 25)
    totalItems = len(rows)
    totalPages = (totalItems + pageSize - 1) // pageSize if totalItems > 0 else 0
    startIdx = (page - 1) * pageSize
    return {
        "items": rows[startIdx:startIdx + pageSize],
        "pagination": {
            "currentPage": page,
            "pageSize": pageSize,
            "totalItems": totalItems,
            "totalPages": totalPages
        }
    }


@router.get("/{targetMandateId}/users")
@limiter.limit("60/minute")
def list_mandate_users(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    context: RequestContext = Depends(getRequestContext)
):
    """
    List all users in a mandate with pagination, search, and sorting support.

    Requires Mandate-Admin role or SysAdmin.

    Args:
        pagination: Optional JSON object with page, pageSize, search, filters, sort.
            ``page`` and ``pageSize`` must be positive integers (defaults 1 and 25).

    Returns:
        Without ``pagination``: a plain list of user rows.
        With ``pagination``: ``{"items": [...], "pagination": {...}}``.

    Raises:
        HTTPException 400: ``pagination`` is malformed.
        HTTPException 403: caller lacks the required role.
        HTTPException 404: the mandate does not exist.
        HTTPException 500: any unexpected error.
    """
    # Check permission
    if not _hasMandateAdminRole(context, targetMandateId) and not context.hasSysAdminRole:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required")
        )
    try:
        rootInterface = interfaceDbApp.getRootInterface()

        # Verify mandate exists
        mandate = rootInterface.getMandate(targetMandateId)
        if not mandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {targetMandateId} not found"
            )

        # Validate the query before doing the per-user lookups.
        paginationParams = _parseMandateUserPagination(pagination)

        result = _collectMandateUserRows(rootInterface, targetMandateId)
        if paginationParams:
            return _applyMandateUserQuery(result, paginationParams)

        # No pagination - return all users as list
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing users for mandate {targetMandateId}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list users: {str(e)}"
        )
@router.get("/{targetMandateId}/users/filter-values")
@limiter.limit("60/minute")
def get_mandate_users_filter_values(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    column: str = Query(..., description="Column key"),
    pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
    context: RequestContext = Depends(getRequestContext)
) -> list:
    """Return the distinct values of one column across the users of a mandate.

    Requires Mandate-Admin role or SysAdmin. The rows are assembled the same
    way as in the user listing (one per membership whose user exists, with
    mandate-level roles only) and handed to the shared filter-value helper.

    Raises:
        HTTPException 403: caller lacks the required role.
        HTTPException 404: the mandate does not exist.
        HTTPException 500: any unexpected error.
    """
    callerAllowed = _hasMandateAdminRole(context, targetMandateId) or context.hasSysAdminRole
    if not callerAllowed:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=routeApiMsg("Mandate-Admin role required"))
    try:
        from modules.routes.routeDataUsers import _handleFilterValuesRequest
        db = interfaceDbApp.getRootInterface()
        if not db.getMandate(targetMandateId):
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Mandate {targetMandateId} not found")

        rows = []
        for membership in db.getUserMandatesByMandate(targetMandateId):
            member = db.getUser(str(membership.userId))
            if not member:
                continue

            # Collect mandate-level roles; unknown roles and feature-instance
            # roles are left out, labels are deduplicated in first-seen order.
            mandateRoleIds = []
            labels = []
            knownLabels = set()
            for roleId in db.getRoleIdsForUserMandate(str(membership.id)):
                role = db.getRole(roleId)
                if not role or role.featureInstanceId:
                    continue
                mandateRoleIds.append(roleId)
                if role.roleLabel in knownLabels:
                    continue
                labels.append(role.roleLabel)
                knownLabels.add(role.roleLabel)

            row = {
                "id": str(membership.id),
                "userId": str(member.id),
                "username": member.username,
                "email": member.email,
                "fullName": member.fullName,
                "roleIds": mandateRoleIds,
                "roleLabels": labels,
                "enabled": membership.enabled
            }
            rows.append(row)
        return _handleFilterValuesRequest(rows, column, pagination)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting filter values for mandate users: {str(e)}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.post("/{targetMandateId}/users", response_model=UserMandateResponse)
@limiter.limit("30/minute")
def add_user_to_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    data: UserMandateCreate = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> UserMandateResponse:
    """
    Make a user a member of a mandate and assign the given roles.

    Requires Mandate-Admin role (SysAdmin passes automatically).

    Args:
        targetMandateId: Target mandate ID
        data: User ID and role IDs to assign

    Raises:
        HTTPException 400: a role is not valid for this mandate.
        HTTPException 403: caller lacks the role, or the subscription
            capacity is exceeded.
        HTTPException 404: mandate or user does not exist.
        HTTPException 409: the user is already a member.
        HTTPException 500: any unexpected error.
    """
    # Permission gate
    if not _hasMandateAdminRole(context, targetMandateId):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required to add users")
        )
    try:
        db = interfaceDbApp.getRootInterface()
        newUserId = data.targetUserId
        requestedRoleIds = data.roleIds

        # The mandate must exist
        mandate = db.getMandate(targetMandateId)
        if not mandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {targetMandateId} not found"
            )

        # The target user must exist
        if not db.getUser(newUserId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User {newUserId} not found"
            )

        # A user can only be a member once
        if db.getUserMandate(newUserId, targetMandateId):
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"User {newUserId} is already a member of this mandate"
            )

        # Every role must exist and belong to this mandate or be global
        for roleId in requestedRoleIds:
            try:
                db.validateRoleForMandate(roleId, targetMandateId)
            except ValueError as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e)
                )

        # Create the membership
        membership = db.createUserMandate(
            userId=newUserId,
            mandateId=targetMandateId,
            roleIds=requestedRoleIds
        )
        membershipId = str(membership.id)

        # Audit trail for the permission change
        audit_logger.logPermissionChange(
            userId=str(context.user.id),
            mandateId=targetMandateId,
            action="user_added_to_mandate",
            targetUserId=newUserId,
            details=f"Roles assigned: {requestedRoleIds}",
            resourceType="UserMandate",
            resourceId=membershipId
        )
        logger.info(
            f"User {context.user.id} added user {newUserId} to mandate {targetMandateId} "
            f"with roles {requestedRoleIds}"
        )

        # Tell the affected user about the new access
        mname = _mandate_display_name(mandate)
        create_access_change_notification(
            newUserId,
            "Mandantenzugriff",
            f"Sie wurden dem Mandanten «{mname}» hinzugefügt.",
            "mandate_access",
            targetMandateId,
        )

        return UserMandateResponse(
            id=membershipId,  # UserMandate ID as primary key
            userId=newUserId,
            mandateId=targetMandateId,
            roleIds=requestedRoleIds,
            enabled=True
        )
    except HTTPException:
        raise
    except SubscriptionCapacityException as cap:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=cap.message,
        )
    except Exception as e:
        logger.error(f"Error adding user to mandate: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add user to mandate: {str(e)}"
        )
@router.delete("/{targetMandateId}/users/{targetUserId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
def remove_user_from_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    targetUserId: str = Path(..., description="ID of the user to remove"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, str]:
    """
    End a user's membership in a mandate.

    Requires Mandate-Admin role.
    The last admin of a mandate cannot be removed (orphan prevention).

    Args:
        targetMandateId: Target mandate ID
        targetUserId: User ID to remove

    Raises:
        HTTPException 400: the user is the last admin of the mandate.
        HTTPException 403: caller lacks the role.
        HTTPException 404: mandate does not exist or user is not a member.
        HTTPException 500: any unexpected error.
    """
    # Permission gate
    callerIsAdmin = _hasMandateAdminRole(context, targetMandateId)
    if not callerIsAdmin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required")
        )
    try:
        db = interfaceDbApp.getRootInterface()

        # The mandate must exist
        mandate = db.getMandate(targetMandateId)
        if not mandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {targetMandateId} not found"
            )

        # The user must currently be a member
        if not db.getUserMandate(targetUserId, targetMandateId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User {targetUserId} is not a member of this mandate"
            )

        # Orphan prevention: never remove the only remaining admin
        wouldOrphanMandate = _isLastMandateAdmin(db, targetMandateId, targetUserId)
        if wouldOrphanMandate:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("Cannot remove the last admin from a mandate. Assign another admin first.")
            )

        # Delete UserMandate (CASCADE will delete UserMandateRole entries)
        db.deleteUserMandate(targetUserId, targetMandateId)

        # Audit trail for the permission change
        audit_logger.logPermissionChange(
            userId=str(context.user.id),
            mandateId=targetMandateId,
            action="user_removed_from_mandate",
            targetUserId=targetUserId,
            details="User removed from mandate",
            resourceType="UserMandate"
        )
        logger.info(f"User {context.user.id} removed user {targetUserId} from mandate {targetMandateId}")

        # Tell the affected user about the lost access
        mname = _mandate_display_name(mandate)
        create_access_change_notification(
            targetUserId,
            "Mandantenzugriff",
            f"Sie wurden aus dem Mandanten «{mname}» entfernt.",
            "mandate_access",
            targetMandateId,
        )

        response = {"message": "User removed from mandate", "userId": targetUserId}
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error removing user from mandate: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to remove user from mandate: {str(e)}"
        )
@router.put("/{targetMandateId}/users/{targetUserId}/roles", response_model=UserMandateResponse)
@limiter.limit("30/minute")
def update_user_roles_in_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    targetUserId: str = Path(..., description="ID of the user"),
    roleIds: List[str] = Body(..., description="New role IDs to assign"),
    context: RequestContext = Depends(getRequestContext)
) -> UserMandateResponse:
    """
    Replace the complete role set of a user within a mandate.

    All existing role assignments of the membership are deleted and the new
    ones are added one by one. Requires Mandate-Admin role.

    Args:
        targetMandateId: Target mandate ID
        targetUserId: User ID to update
        roleIds: New set of role IDs

    Raises:
        HTTPException 400: a role is not valid for this mandate, or the
            change would strip the admin role from the last admin.
        HTTPException 403: caller lacks the role.
        HTTPException 404: the user is not a member of the mandate.
        HTTPException 500: any unexpected error.
    """
    # Permission gate
    callerIsAdmin = _hasMandateAdminRole(context, targetMandateId)
    if not callerIsAdmin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required")
        )
    try:
        db = interfaceDbApp.getRootInterface()

        # The user must currently be a member
        membership = db.getUserMandate(targetUserId, targetMandateId)
        if not membership:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User {targetUserId} is not a member of this mandate"
            )
        membershipId = str(membership.id)

        # Every new role must be valid for this mandate
        for roleId in roleIds:
            try:
                db.validateRoleForMandate(roleId, targetMandateId)
            except ValueError as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e)
                )

        # Orphan prevention: an admin may only lose the admin role when
        # another admin remains.
        previousRoleIds = db.getRoleIdsForUserMandate(membershipId)
        wasAdmin = _hasAdminRoleInList(db, previousRoleIds, targetMandateId)
        staysAdmin = _hasAdminRoleInList(db, roleIds, targetMandateId)
        if wasAdmin and not staysAdmin and _isLastMandateAdmin(db, targetMandateId, targetUserId):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("Cannot remove admin role from the last admin. Assign another admin first.")
            )

        # Replace the assignments: drop everything, then add the new set
        db.deleteUserMandateRoles(membershipId)
        for roleId in roleIds:
            db.addRoleToUserMandate(membershipId, roleId)

        # Audit trail for the role change
        audit_logger.logPermissionChange(
            userId=str(context.user.id),
            mandateId=targetMandateId,
            action="role_assigned",
            targetUserId=targetUserId,
            details=f"New roles: {roleIds}",
            resourceType="UserMandateRole",
            resourceId=membershipId
        )
        logger.info(
            f"User {context.user.id} updated roles for user {targetUserId} "
            f"in mandate {targetMandateId} to {roleIds}"
        )

        # Tell the affected user about the change
        mname = _mandate_display_name(db.getMandate(targetMandateId))
        create_access_change_notification(
            targetUserId,
            "Mandantenrollen geändert",
            f"Ihre Rollen im Mandanten «{mname}» wurden angepasst.",
            "mandate_access",
            targetMandateId,
        )

        return UserMandateResponse(
            id=membershipId,  # UserMandate ID as primary key
            userId=targetUserId,
            mandateId=targetMandateId,
            roleIds=roleIds,
            enabled=membership.enabled
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating user roles in mandate: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update user roles: {str(e)}"
        )
# =============================================================================
# Helper Functions
# =============================================================================
def _mandate_display_name(mandate: Any) -> str:
"""Human-readable mandate label for notifications."""
if mandate is None:
return ""
if isinstance(mandate, dict):
if mandate.get("label"):
return str(mandate["label"])
name = mandate.get("name")
if isinstance(name, dict):
return str(name.get("de") or name.get("en") or (next(iter(name.values()), "") if name else ""))
return str(name or mandate.get("id", ""))
label = getattr(mandate, "label", None)
if label:
return str(label)
name = getattr(mandate, "name", None)
if isinstance(name, dict):
return str(name.get("de") or name.get("en") or (next(iter(name.values()), "") if name else ""))
if name is not None:
return str(name)
return str(getattr(mandate, "id", ""))
def _getAdminMandateIds(context: RequestContext) -> List[str]:
    """
    Collect the IDs of all mandates in which the current user is an admin.

    A membership counts when it is enabled and carries at least one role with
    label "admin" that is not bound to a feature instance. Errors are logged
    and whatever was collected so far (possibly nothing) is returned.
    """
    adminIds = []
    try:
        db = interfaceDbApp.getRootInterface()

        def _isMandateLevelAdmin(roleId) -> bool:
            # Unknown roles and feature-instance roles never grant admin.
            role = db.getRole(roleId)
            return bool(role and role.roleLabel == "admin" and not role.featureInstanceId)

        for membership in db.getUserMandates(str(context.user.id)):
            if not getattr(membership, 'enabled', True):
                continue
            membershipId = getattr(membership, 'id', None)
            mandateId = getattr(membership, 'mandateId', None)
            if not (membershipId and mandateId):
                continue
            assignedRoleIds = db.getRoleIdsForUserMandate(str(membershipId))
            # any() stops at the first admin role, like the original break.
            if any(_isMandateLevelAdmin(roleId) for roleId in assignedRoleIds):
                adminIds.append(str(mandateId))
    except Exception as e:
        logger.error(f"Error getting admin mandate IDs: {e}")
    return adminIds
def _hasMandateAdminRole(context: RequestContext, mandateId: str) -> bool:
    """
    Check if the user has mandate admin role for the specified mandate.
    Works with or without X-Mandate-Id header (admin pages don't send it).

    Args:
        context: Request context of the caller (SysAdmin flag, mandate ID,
            role IDs, user).
        mandateId: Mandate to check.

    Returns:
        True for a SysAdmin or a mandate-level admin of ``mandateId``,
        otherwise False.
    """
    # SysAdmin passes for every mandate.
    if context.hasSysAdminRole:
        return True
    # If mandate context matches, check roles from context directly
    if context.mandateId and str(context.mandateId) == str(mandateId):
        if context.roleIds:
            try:
                rootInterface = interfaceDbApp.getRootInterface()
                for roleId in context.roleIds:
                    role = rootInterface.getRole(roleId)
                    # Only a mandate-level "admin" role counts; roles bound to
                    # a feature instance are ignored.
                    if role and role.roleLabel == "admin" and not role.featureInstanceId:
                        return True
            except Exception as e:
                # A failed lookup is logged and treated as "not admin".
                logger.error(f"Error checking mandate admin role: {e}")
        # NOTE(review): the source's indentation was lost; this return is
        # placed at the level of the matching-context branch because the
        # fallback below would otherwise be unreachable - confirm against the
        # original file.
        return False
    # No mandate context (admin pages) — check via user's mandate memberships
    adminMandateIds = _getAdminMandateIds(context)
    return str(mandateId) in adminMandateIds
def _isLastMandateAdmin(interface, mandateId: str, excludeUserId: str) -> bool:
"""
Check if excluding this user would leave the mandate without any admins.
"""
try:
# Get all UserMandates for this mandate (Pydantic models)
allMandates = interface.getUserMandatesByMandate(mandateId)
userMandates = [um for um in allMandates if um.enabled]
adminCount = 0
for um in userMandates:
if str(um.userId) == str(excludeUserId):
continue
# Check if this user has admin role
roleIds = interface.getRoleIdsForUserMandate(str(um.id))
if _hasAdminRoleInList(interface, roleIds, mandateId):
adminCount += 1
return adminCount == 0
except Exception as e:
logger.error(f"Error checking last admin: {e}")
return True # Fail-safe: assume they're the last admin (prevents deletion)
def _hasAdminRoleInList(interface, roleIds: List[str], mandateId: str) -> bool:
"""
Check if any of the role IDs is an admin role for the mandate.
"""
for roleId in roleIds:
role = interface.getRole(roleId)
if role:
# Admin role at mandate level (global or mandate-specific, not feature-instance)
if role.roleLabel == "admin" and not role.featureInstanceId:
if not role.mandateId or str(role.mandateId) == str(mandateId):
return True
return False