# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Mandate routes for the backend API.
Implements the endpoints for mandate management.

MULTI-TENANT:
- Mandate create/update/delete is SysAdmin-only (mandates are system resources)
- Mandate read access is granted to SysAdmins (all mandates) and to
  Mandate-Admins (only the mandates they administer)
- User management within mandates is Mandate-Admin (add/remove users, change roles)
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import status
|
|
import logging
|
|
import json
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Import auth module
|
|
from modules.auth import limiter, requireSysAdminRole, getRequestContext, RequestContext
|
|
|
|
# Import interfaces
|
|
import modules.interfaces.interfaceDbApp as interfaceDbApp
|
|
from modules.interfaces.interfaceDbBilling import _getRootInterface as _getBillingRootInterface
|
|
from modules.shared.attributeUtils import getModelAttributeDefinitions
|
|
from modules.shared.auditLogger import audit_logger
|
|
|
|
# Import the model classes
|
|
from modules.datamodels.datamodelUam import Mandate, User
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
|
|
from modules.datamodels.datamodelRbac import Role
|
|
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
|
|
from modules.routes.routeNotifications import create_access_change_notification
|
|
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionCapacityException
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
# Message helper bound to this route module's context key ("routeDataMandates").
# The handlers below pass their user-facing error details through it.
# NOTE(review): presumably resolves/registers translatable texts — confirm in i18nRegistry.
routeApiMsg = apiRouteContext("routeDataMandates")
|
|
|
|
|
|
# =============================================================================
|
|
# Request/Response Models for User Management
|
|
# =============================================================================
|
|
|
|
class UserMandateCreate(BaseModel):
    """
    Request model for adding a user to a mandate.

    Used as the request body of ``add_user_to_mandate``. Both fields are required.
    """
    # ID of the user that should become a member of the mandate
    targetUserId: str = Field(..., description="User ID to add to the mandate")
    # IDs of the roles assigned to the new membership; the route validates each
    # one against the target mandate before the membership is created
    roleIds: List[str] = Field(..., description="Role IDs to assign to the user")
|
|
|
|
|
|
class UserMandateResponse(BaseModel):
    """
    Response model for user mandate membership.

    Returned by ``add_user_to_mandate`` and ``update_user_roles_in_mandate``.
    """
    id: str  # UserMandate ID as primary key
    userId: str  # ID of the user the membership belongs to
    mandateId: str  # ID of the mandate the membership belongs to
    roleIds: List[str]  # Role IDs assigned to the membership
    enabled: bool  # Whether the membership is active
|
|
|
|
|
|
class MandateUserInfo(BaseModel):
    """
    User info within a mandate context.

    Mirrors the keys of the per-user dicts built by ``list_mandate_users``.
    NOTE(review): no handler visible in this file references this model directly —
    confirm whether it is still used elsewhere.
    """
    id: str  # UserMandate ID as primary key
    userId: str  # ID of the user
    username: str  # Login name of the user
    email: Optional[str]  # E-mail address, may be None
    fullName: Optional[str]  # Display name, may be None
    roleIds: List[str]  # Role IDs assigned to the membership
    roleLabels: List[str]  # Resolved role labels for display
    enabled: bool  # Whether the membership is active
|
|
|
|
# Module-level logger named after this module
logger = logging.getLogger(__name__)

# Attribute definitions derived from the Mandate model.
# NOTE(review): not referenced by the route handlers visible in this file —
# confirm whether other modules import it.
mandateAttributes = getModelAttributeDefinitions(Mandate)

# Router for the mandate endpoints; every route declared below is served
# under the /api/mandates prefix and grouped under the "Manage Mandates" tag
router = APIRouter(
    prefix="/api/mandates",
    tags=["Manage Mandates"],
    responses={404: {"description": "Not found"}}
)
|
|
|
|
def _collectEnabledMandateDicts(appInterface, mandateIds: List[str]) -> List[Dict[str, Any]]:
    """
    Load the given mandates and return the enabled ones as plain dicts.

    Mandates that cannot be loaded are skipped. The ``enabled`` flag is read
    from the dict key for dict-shaped mandates and from the attribute
    otherwise; a missing flag counts as enabled.
    """
    collected = []
    for mandateId in mandateIds:
        mandate = appInterface.getMandate(mandateId)
        if not mandate:
            continue
        if isinstance(mandate, dict):
            # getattr() on a dict never sees the "enabled" key, so read it explicitly
            if mandate.get("enabled", True):
                collected.append(mandate)
            continue
        if getattr(mandate, "enabled", True):
            collected.append(mandate.model_dump() if hasattr(mandate, 'model_dump') else vars(mandate))
    return collected


@router.get("/", response_model=PaginatedResponse[Mandate])
@limiter.limit("30/minute")
def get_mandates(
    request: Request,
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    mode: Optional[str] = Query(None, description="'filterValues' for distinct column values, 'ids' for all filtered IDs"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext)
) -> PaginatedResponse[Mandate]:
    """
    Get mandates with optional pagination, sorting, and filtering.

    Access:
    - SysAdmin: all mandates
    - MandateAdmin: only enabled mandates where user has admin role
      (returned as one unpaginated list; the pagination parameter is still validated)
    - Other: 403

    Query Parameters:
    - pagination: JSON-encoded PaginationParams object, or None for no pagination
    - mode: 'filterValues' returns distinct values of `column`, 'ids' returns the filtered IDs
    - column: column key, required when mode=filterValues

    Raises:
    - 400 for an invalid pagination parameter or a missing column
    - 403 when the caller administers no mandate
    - 500 for unexpected errors
    """
    try:
        # Check admin access
        isSysAdmin = context.hasSysAdminRole
        adminMandateIds: List[str] = []
        if not isSysAdmin:
            adminMandateIds = _getAdminMandateIds(context)
            if not adminMandateIds:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=routeApiMsg("Admin role required")
                )

        # Parse pagination parameter
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                if paginationDict:
                    paginationDict = normalize_pagination_dict(paginationDict)
                    paginationParams = PaginationParams(**paginationDict)
            except (json.JSONDecodeError, ValueError, TypeError) as e:
                # TypeError covers valid JSON that is not an object (e.g. a list)
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid pagination parameter: {str(e)}"
                )

        from modules.routes.routeHelpers import (
            handleFilterValuesInMemory, handleIdsInMemory,
            handleIdsMode,
            parseCrossFilterPagination,
        )

        appInterface = interfaceDbApp.getRootInterface()

        if mode == "filterValues":
            if not column:
                raise HTTPException(status_code=400, detail="column parameter required for mode=filterValues")
            if not isSysAdmin:
                mandateItems = _collectEnabledMandateDicts(appInterface, adminMandateIds)
                return handleFilterValuesInMemory(mandateItems, column, pagination)

            crossPagination = parseCrossFilterPagination(column, pagination)
            try:
                from fastapi.responses import JSONResponse
                values = appInterface.db.getDistinctColumnValues(Mandate, column, crossPagination)
                return JSONResponse(content=sorted(values, key=lambda v: str(v).lower()))
            except Exception as dbErr:
                # Best-effort fallback: compute the distinct values in memory,
                # but leave a trace so a broken DB path does not go unnoticed
                logger.warning(f"Distinct values for mandate column '{column}' failed, using in-memory fallback: {dbErr}")
                result = appInterface.getAllMandates(pagination=None)
                items = result if isinstance(result, list) else (result.items if hasattr(result, 'items') else result)
                items = [i.model_dump() if hasattr(i, 'model_dump') else i for i in items]
                return handleFilterValuesInMemory(items, column, pagination)

        if mode == "ids":
            if isSysAdmin:
                return handleIdsMode(appInterface.db, Mandate, pagination)
            mandateItems = _collectEnabledMandateDicts(appInterface, adminMandateIds)
            return handleIdsInMemory(mandateItems, pagination)

        if isSysAdmin:
            result = appInterface.getAllMandates(pagination=paginationParams)
        else:
            result = _collectEnabledMandateDicts(appInterface, adminMandateIds)
            # Mandate-Admin listings are returned unpaginated
            paginationParams = None

        if paginationParams and hasattr(result, 'items'):
            return PaginatedResponse(
                items=result.items,
                pagination=PaginationMetadata(
                    currentPage=paginationParams.page,
                    pageSize=paginationParams.pageSize,
                    totalItems=result.totalItems,
                    totalPages=result.totalPages,
                    sort=paginationParams.sort,
                    filters=paginationParams.filters
                )
            )
        else:
            items = result if isinstance(result, list) else (result.items if hasattr(result, 'items') else result)
            return PaginatedResponse(
                items=items,
                pagination=None
            )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting mandates: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get mandates: {str(e)}"
        )
|
|
|
|
|
|
@router.get("/{targetMandateId}", response_model=Mandate)
@limiter.limit("30/minute")
def get_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    context: RequestContext = Depends(getRequestContext)
) -> Mandate:
    """
    Return a single mandate identified by its ID.

    Access:
    - SysAdmin: any mandate
    - MandateAdmin: only mandates the caller administers
    - Other: 403

    Raises 404 when the mandate does not exist and 500 on unexpected errors.
    """
    try:
        # Non-SysAdmins must administer the requested mandate
        callerIsSysAdmin = context.hasSysAdminRole
        if not callerIsSysAdmin and targetMandateId not in _getAdminMandateIds(context):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=routeApiMsg("Admin role required for this mandate")
            )

        found = interfaceDbApp.getRootInterface().getMandate(targetMandateId)
        if found:
            return found

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Mandate with ID {targetMandateId} not found"
        )
    except HTTPException:
        # Deliberate HTTP errors pass through untouched
        raise
    except Exception as err:
        logger.error(f"Error getting mandate {targetMandateId}: {err}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get mandate: {err}"
        )
|
|
|
|
@router.post("/", response_model=Mandate)
@limiter.limit("10/minute")
def create_mandate(
    request: Request,
    mandateData: dict = Body(..., description="Mandate data with at least 'name' field"),
    currentUser: User = Depends(requireSysAdminRole)
) -> Mandate:
    """
    Create a new mandate.
    MULTI-TENANT: SysAdmin-only.

    Body fields read by this route:
    - name (required, must not be empty or whitespace-only)
    - label (optional)
    - enabled (optional, defaults to True)
    - planKey (optional, defaults to "TRIAL_14D")

    After the mandate is stored, default billing settings and an initial
    subscription are created on a best-effort basis: failures there are
    logged and do not fail the request.

    Raises 400 when the name is missing and 500 when the mandate cannot be created.
    """
    try:
        logger.debug(f"Creating mandate with data: {mandateData}")

        # Validate required fields
        name = mandateData.get('name')
        if not name or (isinstance(name, str) and name.strip() == ''):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("Mandate name is required")
            )

        # Get optional fields with defaults
        label = mandateData.get('label')
        enabled = mandateData.get('enabled', True)

        appInterface = interfaceDbApp.getRootInterface()

        # Create mandate
        newMandate = appInterface.createMandate(
            name=name,
            label=label,
            enabled=enabled
        )

        if not newMandate:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=routeApiMsg("Failed to create mandate")
            )

        # Best-effort: make sure billing settings exist for the new mandate
        try:
            billingInterface = _getBillingRootInterface()
            billingInterface.getOrCreateSettings(str(newMandate.id))
            logger.debug(f"Ensured billing settings for new mandate {newMandate.id}")
        except Exception as billingErr:
            logger.warning(
                f"Could not create default billing settings for mandate {newMandate.id}: {billingErr}"
            )

        # Best-effort: create the initial subscription for the requested plan
        try:
            from modules.datamodels.datamodelSubscription import (
                MandateSubscription, SubscriptionStatusEnum, BUILTIN_PLANS,
            )
            from modules.interfaces.interfaceDbSubscription import _getRootInterface as _getSubRoot
            from datetime import datetime, timezone, timedelta

            planKey = mandateData.get("planKey", "TRIAL_14D")
            plan = BUILTIN_PLANS.get(planKey)
            if plan:
                now = datetime.now(timezone.utc)
                # Plans with trial days start in TRIALING, all others in ACTIVE
                targetStatus = SubscriptionStatusEnum.TRIALING if plan.trialDays else SubscriptionStatusEnum.ACTIVE
                sub = MandateSubscription(
                    mandateId=str(newMandate.id),
                    planKey=planKey,
                    status=targetStatus,
                    recurring=plan.autoRenew and not plan.trialDays,
                    startedAt=now,
                    currentPeriodStart=now,
                )
                if plan.trialDays:
                    sub.trialEndsAt = now + timedelta(days=plan.trialDays)
                    sub.currentPeriodEnd = now + timedelta(days=plan.trialDays)
                subInterface = _getSubRoot()
                subInterface.createSubscription(sub)
                logger.info(f"Created {targetStatus.value} subscription ({planKey}) for mandate {newMandate.id}")
            else:
                # Without this trace a mistyped planKey leaves the mandate
                # without any subscription and nobody notices
                logger.warning(
                    f"Unknown planKey '{planKey}' for mandate {newMandate.id}; no subscription created"
                )
        except Exception as subErr:
            logger.error(f"Failed to create subscription for mandate {newMandate.id}: {subErr}")

        logger.info(f"Mandate {newMandate.id} created by SysAdmin {currentUser.id}")

        return newMandate
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating mandate: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create mandate: {str(e)}"
        )
|
|
|
|
@router.put("/{mandateId}", response_model=Mandate)
@limiter.limit("10/minute")
def update_mandate(
    request: Request,
    mandateId: str = Path(..., description="ID of the mandate to update"),
    mandateData: dict = Body(..., description="Mandate update data"),
    currentUser: User = Depends(requireSysAdminRole)
) -> Mandate:
    """
    Apply the given update data to an existing mandate.
    MULTI-TENANT: SysAdmin-only.

    The request body is handed to the data layer unchanged.
    Raises 404 when the mandate does not exist and 500 when the update
    yields no result or fails unexpectedly.
    """
    try:
        logger.debug(f"Updating mandate {mandateId} with data: {mandateData}")

        db = interfaceDbApp.getRootInterface()

        # The mandate has to exist before it can be changed
        if not db.getMandate(mandateId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate with ID {mandateId} not found"
            )

        saved = db.updateMandate(mandateId, mandateData)
        if saved:
            logger.info(f"Mandate {mandateId} updated by SysAdmin {currentUser.id}")
            return saved

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=routeApiMsg("Failed to update mandate")
        )
    except HTTPException:
        raise
    except Exception as err:
        logger.error(f"Error updating mandate {mandateId}: {err}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update mandate: {err}"
        )
|
|
|
|
@router.delete("/{mandateId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def delete_mandate(
    request: Request,
    mandateId: str = Path(..., description="ID of the mandate to delete"),
    force: bool = Query(False, description="Hard-delete with full cascade (irreversible)"),
    currentUser: User = Depends(requireSysAdminRole)
) -> Dict[str, Any]:
    """
    Delete a mandate.
    Default: soft-delete (sets enabled=False, 30-day retention).
    With ?force=true: hard-delete with full cascade (irreversible).
    Requires X-Confirm-Name header matching the mandate name for hard-delete.
    A missing or empty header never confirms a hard-delete, even when the
    stored mandate name itself is empty.
    MULTI-TENANT: SysAdmin-only.

    Raises 404 when the mandate does not exist, 400 when the confirmation
    fails or the data layer rejects the deletion with a ValueError, and 500
    on unexpected errors.
    """
    try:
        appInterface = interfaceDbApp.getRootInterface()

        existingMandate = appInterface.getMandate(mandateId)
        if not existingMandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {mandateId} not found"
            )

        if force:
            confirmName = request.headers.get("X-Confirm-Name", "")
            # Mandates may be dict-shaped; getattr() on a dict would always
            # yield "" and let a request without the header through
            if isinstance(existingMandate, dict):
                mandateName = existingMandate.get("name") or ""
            else:
                mandateName = getattr(existingMandate, "name", "") or ""
            # An empty confirmation must never match, otherwise a mandate with
            # an empty name could be hard-deleted without any confirmation
            if not confirmName or confirmName != mandateName:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=routeApiMsg("Hard-delete requires X-Confirm-Name header matching the mandate name")
                )

        try:
            appInterface.deleteMandate(mandateId, force=force)
        except ValueError as e:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=str(e)
            )

        mode = "hard-deleted" if force else "soft-deleted"
        logger.info(f"Mandate {mandateId} {mode} by SysAdmin {currentUser.id}")

        return {"message": f"Mandate {mandateId} {mode} successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting mandate {mandateId}: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete mandate: {str(e)}"
        )
|
|
|
|
|
|
# =============================================================================
|
|
# User Management within Mandates (Mandate-Admin)
|
|
# =============================================================================
|
|
|
|
@router.get("/{targetMandateId}/users")
@limiter.limit("60/minute")
def list_mandate_users(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
    mode: Optional[str] = Query(None, description="'filterValues' for distinct column values, 'ids' for all filtered IDs"),
    column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
    context: RequestContext = Depends(getRequestContext)
):
    """
    List all users in a mandate with pagination, search, and sorting support.

    Requires Mandate-Admin role or SysAdmin.

    Args:
        pagination: Optional pagination parameters (page, pageSize, search, filters, sort)
        mode: 'filterValues' returns distinct values of `column`, 'ids' returns the filtered IDs
        column: Column key, required when mode=filterValues

    Returns:
        Without pagination: the plain list of user entries.
        With pagination: a dict with "items" (the requested page) and
        "pagination" (currentPage, pageSize, totalItems, totalPages).

    Raises:
        400 for an invalid pagination parameter (including page or pageSize
        that are not integers >= 1) or a missing column, 403 without admin
        rights, 404 when the mandate does not exist, 500 on unexpected errors.
    """
    # Check permission
    if not _hasMandateAdminRole(context, targetMandateId) and not context.hasSysAdminRole:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required")
        )

    try:
        rootInterface = interfaceDbApp.getRootInterface()

        # Verify mandate exists
        mandate = rootInterface.getMandate(targetMandateId)
        if not mandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {targetMandateId} not found"
            )

        # Parse pagination parameter
        paginationParams = None
        if pagination:
            try:
                paginationDict = json.loads(pagination)
                if paginationDict:
                    # Normalize pagination dict: keep only dict-shaped sort entries
                    if 'sort' in paginationDict and paginationDict['sort']:
                        normalizedSort = [item for item in paginationDict['sort'] if isinstance(item, dict)]
                        paginationDict['sort'] = normalizedSort if normalizedSort else None
                    paginationParams = paginationDict
            except (json.JSONDecodeError, ValueError, TypeError) as e:
                # TypeError covers JSON scalars (e.g. a bare number) that
                # cannot be inspected like an object
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid pagination parameter: {str(e)}"
                )

        # Get all UserMandate entries for this mandate
        userMandates = rootInterface.getUserMandatesByMandate(targetMandateId)

        result = []
        for um in userMandates:
            # Get user info; memberships whose user cannot be loaded are skipped
            user = rootInterface.getUser(str(um.userId))
            if not user:
                continue

            # Get roles for this membership
            roleIds = rootInterface.getRoleIdsForUserMandate(str(um.id))

            # Resolve role labels for display (only mandate-level roles, deduplicated)
            roleLabels = []
            filteredRoleIds = []
            seenLabels = set()
            for roleId in roleIds:
                role = rootInterface.getRole(roleId)
                if not role:
                    # Role not found - fail-safe: skip (no access)
                    logger.warning(f"Role {roleId} not found, skipping")
                    continue
                # Skip feature-instance roles - they don't belong in mandate membership
                if role.featureInstanceId:
                    continue
                filteredRoleIds.append(roleId)
                if role.roleLabel not in seenLabels:
                    roleLabels.append(role.roleLabel)
                    seenLabels.add(role.roleLabel)

            result.append({
                "id": str(um.id),
                "userId": str(user.id),
                "username": user.username,
                "email": user.email,
                "fullName": user.fullName,
                "roleIds": filteredRoleIds,
                "roleLabels": roleLabels,
                "enabled": um.enabled
            })

        from modules.routes.routeHelpers import (
            handleFilterValuesInMemory, handleIdsInMemory,
            _applyFiltersAndSort as _sharedApplyFiltersAndSort,
        )

        if mode == "filterValues":
            if not column:
                raise HTTPException(status_code=400, detail="column parameter required for mode=filterValues")
            return handleFilterValuesInMemory(result, column, pagination)

        if mode == "ids":
            return handleIdsInMemory(result, pagination)

        if paginationParams:
            # page/pageSize come straight from client JSON: reject anything that
            # is not a positive integer instead of dividing by zero or slicing
            # with a negative start index
            rawPage = paginationParams.get('page', 1) if isinstance(paginationParams, dict) else 1
            rawPageSize = paginationParams.get('pageSize', 25) if isinstance(paginationParams, dict) else 25
            try:
                page = int(rawPage)
                pageSize = int(rawPageSize)
            except (TypeError, ValueError):
                raise HTTPException(
                    status_code=400,
                    detail="Invalid pagination parameter: page and pageSize must be integers"
                )
            if page < 1 or pageSize < 1:
                raise HTTPException(
                    status_code=400,
                    detail="Invalid pagination parameter: page and pageSize must be >= 1"
                )

            # Filters and sorting stay best-effort: when the object cannot be
            # built, the unfiltered list is paged, but the reason is logged
            paginationParamsObj = None
            try:
                paginationDict = json.loads(pagination)
                if paginationDict:
                    paginationDict = normalize_pagination_dict(paginationDict)
                    paginationParamsObj = PaginationParams(**paginationDict)
            except Exception as parseErr:
                logger.warning(
                    f"Ignoring filters/sort for users of mandate {targetMandateId}: {parseErr}"
                )

            filtered = _sharedApplyFiltersAndSort(result, paginationParamsObj)
            totalItems = len(filtered)
            totalPages = (totalItems + pageSize - 1) // pageSize if totalItems > 0 else 0
            startIdx = (page - 1) * pageSize
            endIdx = startIdx + pageSize

            return {
                "items": filtered[startIdx:endIdx],
                "pagination": {
                    "currentPage": page,
                    "pageSize": pageSize,
                    "totalItems": totalItems,
                    "totalPages": totalPages
                }
            }

        return result

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing users for mandate {targetMandateId}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list users: {str(e)}"
        )
|
|
|
|
|
|
@router.post("/{targetMandateId}/users", response_model=UserMandateResponse)
@limiter.limit("30/minute")
def add_user_to_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    data: UserMandateCreate = Body(...),
    context: RequestContext = Depends(getRequestContext)
) -> UserMandateResponse:
    """
    Create a mandate membership for a user with the requested roles.

    Requires Mandate-Admin role (SysAdmin passes automatically).

    Args:
        targetMandateId: Target mandate ID
        data: User ID and role IDs to assign

    Raises:
        403 without the Mandate-Admin role or when the subscription capacity
        is exceeded, 404 when the mandate or user does not exist, 409 when
        the user is already a member, 400 for a role that is not valid for
        the mandate, 500 on unexpected errors.
    """
    # Permission gate: only mandate admins may add members
    if not _hasMandateAdminRole(context, targetMandateId):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required to add users")
        )

    try:
        db = interfaceDbApp.getRootInterface()

        # The mandate must exist
        targetMandate = db.getMandate(targetMandateId)
        if not targetMandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {targetMandateId} not found"
            )

        # The user must exist
        newMember = db.getUser(data.targetUserId)
        if not newMember:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User {data.targetUserId} not found"
            )

        # A second membership for the same user is a conflict
        if db.getUserMandate(data.targetUserId, targetMandateId):
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"User {data.targetUserId} is already a member of this mandate"
            )

        # Every requested role has to be valid for this mandate;
        # the first invalid one aborts the request
        try:
            for requestedRoleId in data.roleIds:
                db.validateRoleForMandate(requestedRoleId, targetMandateId)
        except ValueError as invalidRole:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=str(invalidRole)
            )

        # Store the membership
        createdMembership = db.createUserMandate(
            userId=data.targetUserId,
            mandateId=targetMandateId,
            roleIds=data.roleIds
        )

        # Record the permission change in the audit log
        audit_logger.logPermissionChange(
            userId=str(context.user.id),
            mandateId=targetMandateId,
            action="user_added_to_mandate",
            targetUserId=data.targetUserId,
            details=f"Roles assigned: {data.roleIds}",
            resourceType="UserMandate",
            resourceId=str(createdMembership.id)
        )

        logger.info(
            f"User {context.user.id} added user {data.targetUserId} to mandate {targetMandateId} "
            f"with roles {data.roleIds}"
        )

        # Tell the added user about the new access
        displayName = _mandate_display_name(targetMandate, getattr(newMember, "language", None))
        create_access_change_notification(
            data.targetUserId,
            "Mandantenzugriff",
            f"Sie wurden dem Mandanten «{displayName}» hinzugefügt.",
            "mandate_access",
            targetMandateId,
        )

        return UserMandateResponse(
            id=str(createdMembership.id),
            userId=data.targetUserId,
            mandateId=targetMandateId,
            roleIds=data.roleIds,
            enabled=True
        )

    except HTTPException:
        raise
    except SubscriptionCapacityException as capacityErr:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=capacityErr.message,
        )
    except Exception as err:
        logger.error(f"Error adding user to mandate: {err}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to add user to mandate: {err}"
        )
|
|
|
|
|
|
@router.delete("/{targetMandateId}/users/{targetUserId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
def remove_user_from_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    targetUserId: str = Path(..., description="ID of the user to remove"),
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, str]:
    """
    Delete a user's membership in a mandate.

    Requires Mandate-Admin role.
    The last admin of a mandate cannot be removed (orphan prevention).

    Args:
        targetMandateId: Target mandate ID
        targetUserId: User ID to remove

    Raises:
        403 without the Mandate-Admin role, 404 when the mandate or the
        membership does not exist, 400 when the user is the last admin,
        500 on unexpected errors.
    """
    # Permission gate
    if not _hasMandateAdminRole(context, targetMandateId):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required")
        )

    try:
        db = interfaceDbApp.getRootInterface()

        # The mandate must exist
        targetMandate = db.getMandate(targetMandateId)
        if not targetMandate:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Mandate {targetMandateId} not found"
            )

        # The user must currently be a member
        if not db.getUserMandate(targetUserId, targetMandateId):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User {targetUserId} is not a member of this mandate"
            )

        # Orphan prevention: a mandate must keep at least one admin
        if _isLastMandateAdmin(db, targetMandateId, targetUserId):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("Cannot remove the last admin from a mandate. Assign another admin first.")
            )

        # Remove the membership (CASCADE will delete UserMandateRole entries)
        db.deleteUserMandate(targetUserId, targetMandateId)

        # Record the permission change in the audit log
        audit_logger.logPermissionChange(
            userId=str(context.user.id),
            mandateId=targetMandateId,
            action="user_removed_from_mandate",
            targetUserId=targetUserId,
            details="User removed from mandate",
            resourceType="UserMandate"
        )

        logger.info(f"User {context.user.id} removed user {targetUserId} from mandate {targetMandateId}")

        # Notify the removed user; use their language when the user can be
        # loaded, otherwise the acting user's language
        formerMember = db.getUser(targetUserId)
        if formerMember:
            notifyLang = getattr(formerMember, "language", None)
        else:
            notifyLang = getattr(context.user, "language", None)
        displayName = _mandate_display_name(targetMandate, notifyLang)
        create_access_change_notification(
            targetUserId,
            "Mandantenzugriff",
            f"Sie wurden aus dem Mandanten «{displayName}» entfernt.",
            "mandate_access",
            targetMandateId,
        )

        return {"message": "User removed from mandate", "userId": targetUserId}

    except HTTPException:
        raise
    except Exception as err:
        logger.error(f"Error removing user from mandate: {err}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to remove user from mandate: {err}"
        )
|
|
|
|
|
|
@router.put("/{targetMandateId}/users/{targetUserId}/roles", response_model=UserMandateResponse)
@limiter.limit("30/minute")
def update_user_roles_in_mandate(
    request: Request,
    targetMandateId: str = Path(..., description="ID of the mandate"),
    targetUserId: str = Path(..., description="ID of the user"),
    roleIds: List[str] = Body(..., description="New role IDs to assign"),
    context: RequestContext = Depends(getRequestContext)
) -> UserMandateResponse:
    """
    Replace the roles a user holds within a mandate.

    All existing role assignments of the membership are removed and the
    given set is assigned instead.
    Requires Mandate-Admin role.

    Args:
        targetMandateId: Target mandate ID
        targetUserId: User ID to update
        roleIds: New set of role IDs

    Raises:
        403 without the Mandate-Admin role, 404 when the user is not a member,
        400 for a role that is not valid for the mandate or when the change
        would strip the last admin of the admin role, 500 on unexpected errors.
    """
    # Permission gate
    if not _hasMandateAdminRole(context, targetMandateId):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=routeApiMsg("Mandate-Admin role required")
        )

    try:
        db = interfaceDbApp.getRootInterface()

        # The user must be a member of the mandate
        membership = db.getUserMandate(targetUserId, targetMandateId)
        if not membership:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"User {targetUserId} is not a member of this mandate"
            )
        membershipId = str(membership.id)

        # Every new role has to be valid for this mandate;
        # the first invalid one aborts the request
        try:
            for newRoleId in roleIds:
                db.validateRoleForMandate(newRoleId, targetMandateId)
        except ValueError as invalidRole:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=str(invalidRole)
            )

        # Orphan prevention: the last admin must not lose the admin role
        previousRoleIds = db.getRoleIdsForUserMandate(membershipId)
        wasAdmin = _hasAdminRoleInList(db, previousRoleIds, targetMandateId)
        staysAdmin = _hasAdminRoleInList(db, roleIds, targetMandateId)
        if wasAdmin and not staysAdmin and _isLastMandateAdmin(db, targetMandateId, targetUserId):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=routeApiMsg("Cannot remove admin role from the last admin. Assign another admin first.")
            )

        # Swap the assignments: drop the old ones, then add the new set
        db.deleteUserMandateRoles(membershipId)
        for newRoleId in roleIds:
            db.addRoleToUserMandate(membershipId, newRoleId)

        # Record the role change in the audit log
        audit_logger.logPermissionChange(
            userId=str(context.user.id),
            mandateId=targetMandateId,
            action="role_assigned",
            targetUserId=targetUserId,
            details=f"New roles: {roleIds}",
            resourceType="UserMandateRole",
            resourceId=membershipId
        )

        logger.info(
            f"User {context.user.id} updated roles for user {targetUserId} "
            f"in mandate {targetMandateId} to {roleIds}"
        )

        # Notify the affected user; use their language when the user can be
        # loaded, otherwise the acting user's language
        mandateRecord = db.getMandate(targetMandateId)
        affectedUser = db.getUser(targetUserId)
        if affectedUser:
            notifyLang = getattr(affectedUser, "language", None)
        else:
            notifyLang = getattr(context.user, "language", None)
        displayName = _mandate_display_name(mandateRecord, notifyLang)
        create_access_change_notification(
            targetUserId,
            "Mandantenrollen geändert",
            f"Ihre Rollen im Mandanten «{displayName}» wurden angepasst.",
            "mandate_access",
            targetMandateId,
        )

        return UserMandateResponse(
            id=membershipId,
            userId=targetUserId,
            mandateId=targetMandateId,
            roleIds=roleIds,
            enabled=membership.enabled
        )

    except HTTPException:
        raise
    except Exception as err:
        logger.error(f"Error updating user roles in mandate: {err}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update user roles: {err}"
        )
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
def _mandate_display_name(mandate: Any, requestLang: Optional[str] = None) -> str:
    """
    Human-readable mandate label for notifications.

    Works for dict-shaped mandates and for objects with attributes alike.
    Resolution order:
    1. a non-empty ``label``
    2. ``name``; when it is a dict keyed by language, the entry for
       ``requestLang``, then the "xx" entry, then the first value
    3. ``id`` when the name is missing or empty

    Args:
        mandate: Mandate as dict or object; None yields an empty string.
        requestLang: Preferred language key for multilingual names.

    Returns:
        The resolved display name as a string.
    """
    if mandate is None:
        return ""

    # One accessor for both shapes keeps the dict and object paths identical
    if isinstance(mandate, dict):
        readField = mandate.get
    else:
        def readField(key, default=None):
            return getattr(mandate, key, default)

    label = readField("label")
    if label:
        return str(label)

    name = readField("name")
    if isinstance(name, dict):
        picked = (requestLang and name.get(requestLang)) or name.get("xx") or next(iter(name.values()), "")
        return str(picked) if picked is not None else ""

    # Fall back to the id whenever there is no usable name, so notifications
    # never show an empty mandate name
    return str(name or readField("id", ""))
|
|
|
|
|
|
def _getAdminMandateIds(context: RequestContext) -> List[str]:
    """
    Collect the IDs of all mandates in which the requesting user is admin.

    Only enabled memberships are considered. A membership counts when one of
    its roles has the label "admin" and is not bound to a feature instance.
    Errors are logged and end the scan; the IDs collected up to that point
    are returned (an empty list when nothing was found).
    """
    found: List[str] = []
    try:
        db = interfaceDbApp.getRootInterface()

        def _isMandateAdminRole(roleId) -> bool:
            # Mandate-level admin: label "admin" and no feature instance
            role = db.getRole(roleId)
            return bool(role and role.roleLabel == "admin" and not role.featureInstanceId)

        for membership in db.getUserMandates(str(context.user.id)):
            if not getattr(membership, 'enabled', True):
                continue
            membershipId = getattr(membership, 'id', None)
            memberMandateId = getattr(membership, 'mandateId', None)
            if not (membershipId and memberMandateId):
                continue
            # any() stops at the first admin role, one hit per mandate is enough
            if any(_isMandateAdminRole(rid) for rid in db.getRoleIdsForUserMandate(str(membershipId))):
                found.append(str(memberMandateId))
    except Exception as err:
        logger.error(f"Error getting admin mandate IDs: {err}")
    return found
|
|
|
|
|
|
def _hasMandateAdminRole(context: RequestContext, mandateId: str) -> bool:
    """
    Tell whether the requesting user may administer the given mandate.

    SysAdmins always pass. When the request context is bound to this very
    mandate, only the roles carried in the context are checked. Otherwise
    (e.g. admin pages that send no X-Mandate-Id header) the user's mandate
    memberships are consulted.
    """
    if context.hasSysAdminRole:
        return True

    contextIsForMandate = bool(context.mandateId) and str(context.mandateId) == str(mandateId)
    if not contextIsForMandate:
        # No matching mandate context — check via user's mandate memberships
        return str(mandateId) in _getAdminMandateIds(context)

    # Mandate context matches: decide from the context roles alone
    if context.roleIds:
        try:
            db = interfaceDbApp.getRootInterface()
            for contextRoleId in context.roleIds:
                candidate = db.getRole(contextRoleId)
                if candidate and candidate.roleLabel == "admin" and not candidate.featureInstanceId:
                    return True
        except Exception as err:
            logger.error(f"Error checking mandate admin role: {err}")
    return False
|
|
|
|
|
|
def _isLastMandateAdmin(interface, mandateId: str, excludeUserId: str) -> bool:
    """
    Tell whether the mandate would be left without admins if the given user
    were taken out.

    Counts the enabled memberships of other users that hold an admin role.
    Any error is logged and answered with True, so callers refuse the
    removal rather than risk an orphaned mandate.
    """
    try:
        activeMemberships = [
            membership
            for membership in interface.getUserMandatesByMandate(mandateId)
            if membership.enabled
        ]
        # Every other member is inspected; no early exit
        remainingAdmins = sum(
            1
            for membership in activeMemberships
            if str(membership.userId) != str(excludeUserId)
            and _hasAdminRoleInList(
                interface,
                interface.getRoleIdsForUserMandate(str(membership.id)),
                mandateId,
            )
        )
        return remainingAdmins == 0
    except Exception as err:
        logger.error(f"Error checking last admin: {err}")
        return True  # Fail-safe: assume they're the last admin (prevents deletion)
|
|
|
|
|
|
def _hasAdminRoleInList(interface, roleIds: List[str], mandateId: str) -> bool:
    """
    Tell whether at least one of the role IDs grants admin rights on the mandate.

    A role qualifies when it can be loaded, carries the label "admin", is not
    bound to a feature instance, and is either global (no mandate set) or
    belongs to the given mandate.
    """
    def _grantsMandateAdmin(candidate) -> bool:
        if not candidate:
            return False
        if candidate.roleLabel != "admin" or candidate.featureInstanceId:
            return False
        # Global admin role, or admin role of exactly this mandate
        return not candidate.mandateId or str(candidate.mandateId) == str(mandateId)

    # Roles are looked up one by one; the scan stops at the first match
    return any(_grantsMandateAdmin(interface.getRole(rid)) for rid in roleIds)
|