feat(billing): Nutzerhinweise bei leerem Budget + Mandats-Mail (402/SSE) Gateway - InsufficientBalanceException: billingModel, userAction (TOP_UP_SELF / CONTACT_MANDATE_ADMIN), DE/EN-Texte, toClientDict(), fromBalanceCheck() - HTTP 402 + JSON detail für globale API-Fehlerbehandlung - AI/Chatbot: vor Raise ggf. E-Mail an BillingSettings.notifyEmails (PREPAY_MANDATE, Throttle 1h/Mandat) via billingExhaustedNotify - Agent-Loop & Workspace-Route: SSE-ERROR mit strukturiertem Billing-Payload - datamodelBilling: notifyEmails-Doku für Pool-Alerts frontend_nyla - useWorkspace: SSE onError für INSUFFICIENT_BALANCE mit messageDe/En und Hinweis auf Billing-Pfad bei TOP_UP_SELF
969 lines
36 KiB
Python
969 lines
36 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Mandate routes for the backend API.
|
|
Implements the endpoints for mandate management.
|
|
|
|
MULTI-TENANT:
|
|
- Mandate CRUD is SysAdmin-only (mandates are system resources)
|
|
- User management within mandates is Mandate-Admin (add/remove users)
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Body, Path, Request, Response, Query
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import status
|
|
import logging
|
|
import json
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Import auth module
|
|
from modules.auth import limiter, requireSysAdminRole, getRequestContext, RequestContext
|
|
|
|
# Import interfaces
|
|
import modules.interfaces.interfaceDbApp as interfaceDbApp
|
|
from modules.interfaces.interfaceDbBilling import _getRootInterface as _getBillingRootInterface
|
|
from modules.shared.attributeUtils import getModelAttributeDefinitions
|
|
from modules.shared.auditLogger import audit_logger
|
|
|
|
# Import the model classes
|
|
from modules.datamodels.datamodelUam import Mandate, User
|
|
from modules.datamodels.datamodelMembership import UserMandate, UserMandateRole
|
|
from modules.datamodels.datamodelRbac import Role
|
|
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata, normalize_pagination_dict
|
|
from modules.routes.routeNotifications import create_access_change_notification
|
|
|
|
|
|
# =============================================================================
|
|
# Request/Response Models for User Management
|
|
# =============================================================================
|
|
|
|
class UserMandateCreate(BaseModel):
|
|
"""Request model for adding a user to a mandate"""
|
|
targetUserId: str = Field(..., description="User ID to add to the mandate")
|
|
roleIds: List[str] = Field(..., description="Role IDs to assign to the user")
|
|
|
|
|
|
class UserMandateResponse(BaseModel):
|
|
"""Response model for user mandate membership"""
|
|
id: str # UserMandate ID as primary key
|
|
userId: str
|
|
mandateId: str
|
|
roleIds: List[str]
|
|
enabled: bool
|
|
|
|
|
|
class MandateUserInfo(BaseModel):
|
|
"""User info within a mandate context"""
|
|
id: str # UserMandate ID as primary key
|
|
userId: str
|
|
username: str
|
|
email: Optional[str]
|
|
fullName: Optional[str]
|
|
roleIds: List[str]
|
|
roleLabels: List[str] # Resolved role labels for display
|
|
enabled: bool
|
|
|
|
# Configure logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Model attributes for Mandate
|
|
mandateAttributes = getModelAttributeDefinitions(Mandate)
|
|
|
|
# Create a router for the mandate endpoints
|
|
router = APIRouter(
|
|
prefix="/api/mandates",
|
|
tags=["Manage Mandates"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
@router.get("/", response_model=PaginatedResponse[Mandate])
|
|
@limiter.limit("30/minute")
|
|
def get_mandates(
|
|
request: Request,
|
|
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> PaginatedResponse[Mandate]:
|
|
"""
|
|
Get mandates with optional pagination, sorting, and filtering.
|
|
|
|
Access:
|
|
- SysAdmin: all mandates
|
|
- MandateAdmin: only mandates where user has admin role
|
|
- Other: 403
|
|
|
|
Query Parameters:
|
|
- pagination: JSON-encoded PaginationParams object, or None for no pagination
|
|
"""
|
|
try:
|
|
# Check admin access
|
|
isSysAdmin = context.hasSysAdminRole
|
|
if not isSysAdmin:
|
|
adminMandateIds = _getAdminMandateIds(context)
|
|
if not adminMandateIds:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin role required"
|
|
)
|
|
|
|
# Parse pagination parameter
|
|
paginationParams = None
|
|
if pagination:
|
|
try:
|
|
paginationDict = json.loads(pagination)
|
|
if paginationDict:
|
|
paginationDict = normalize_pagination_dict(paginationDict)
|
|
paginationParams = PaginationParams(**paginationDict)
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid pagination parameter: {str(e)}"
|
|
)
|
|
|
|
appInterface = interfaceDbApp.getRootInterface()
|
|
|
|
if isSysAdmin:
|
|
# SysAdmin: all mandates
|
|
result = appInterface.getAllMandates(pagination=paginationParams)
|
|
else:
|
|
# MandateAdmin: only their mandates
|
|
allMandates = []
|
|
for mandateId in adminMandateIds:
|
|
mandate = appInterface.getMandate(mandateId)
|
|
if mandate:
|
|
mandateDict = mandate if isinstance(mandate, dict) else mandate.model_dump() if hasattr(mandate, 'model_dump') else vars(mandate)
|
|
allMandates.append(mandateDict)
|
|
result = allMandates
|
|
paginationParams = None # Client-side pagination for filtered results
|
|
|
|
# If pagination was requested, result is PaginatedResult
|
|
# If no pagination, result is List[Mandate]
|
|
if paginationParams and hasattr(result, 'items'):
|
|
return PaginatedResponse(
|
|
items=result.items,
|
|
pagination=PaginationMetadata(
|
|
currentPage=paginationParams.page,
|
|
pageSize=paginationParams.pageSize,
|
|
totalItems=result.totalItems,
|
|
totalPages=result.totalPages,
|
|
sort=paginationParams.sort,
|
|
filters=paginationParams.filters
|
|
)
|
|
)
|
|
else:
|
|
items = result if isinstance(result, list) else (result.items if hasattr(result, 'items') else result)
|
|
return PaginatedResponse(
|
|
items=items,
|
|
pagination=None
|
|
)
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting mandates: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to get mandates: {str(e)}"
|
|
)
|
|
|
|
@router.get("/{targetMandateId}", response_model=Mandate)
|
|
@limiter.limit("30/minute")
|
|
def get_mandate(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Mandate:
|
|
"""
|
|
Get a specific mandate by ID.
|
|
|
|
Access:
|
|
- SysAdmin: any mandate
|
|
- MandateAdmin: only mandates where user has admin role
|
|
- Other: 403
|
|
"""
|
|
try:
|
|
mandateId = targetMandateId
|
|
# Check access
|
|
if not context.hasSysAdminRole:
|
|
adminMandateIds = _getAdminMandateIds(context)
|
|
if mandateId not in adminMandateIds:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin role required for this mandate"
|
|
)
|
|
|
|
appInterface = interfaceDbApp.getRootInterface()
|
|
mandate = appInterface.getMandate(mandateId)
|
|
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate with ID {mandateId} not found"
|
|
)
|
|
|
|
return mandate
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error getting mandate {mandateId}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to get mandate: {str(e)}"
|
|
)
|
|
|
|
@router.post("/", response_model=Mandate)
|
|
@limiter.limit("10/minute")
|
|
def create_mandate(
|
|
request: Request,
|
|
mandateData: dict = Body(..., description="Mandate data with at least 'name' field"),
|
|
currentUser: User = Depends(requireSysAdminRole)
|
|
) -> Mandate:
|
|
"""
|
|
Create a new mandate.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
"""
|
|
try:
|
|
logger.debug(f"Creating mandate with data: {mandateData}")
|
|
|
|
# Validate required fields
|
|
name = mandateData.get('name')
|
|
if not name or (isinstance(name, str) and name.strip() == ''):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Mandate name is required"
|
|
)
|
|
|
|
# Get optional fields with defaults
|
|
label = mandateData.get('label')
|
|
enabled = mandateData.get('enabled', True)
|
|
|
|
appInterface = interfaceDbApp.getRootInterface()
|
|
|
|
# Create mandate
|
|
newMandate = appInterface.createMandate(
|
|
name=name,
|
|
label=label,
|
|
enabled=enabled
|
|
)
|
|
|
|
if not newMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to create mandate"
|
|
)
|
|
|
|
try:
|
|
billingInterface = _getBillingRootInterface()
|
|
billingInterface.getOrCreateSettings(str(newMandate.id))
|
|
logger.debug(f"Ensured billing settings for new mandate {newMandate.id}")
|
|
except Exception as billingErr:
|
|
logger.warning(
|
|
f"Could not create default billing settings for mandate {newMandate.id}: {billingErr}"
|
|
)
|
|
|
|
logger.info(f"Mandate {newMandate.id} created by SysAdmin {currentUser.id}")
|
|
|
|
return newMandate
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating mandate: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to create mandate: {str(e)}"
|
|
)
|
|
|
|
@router.put("/{mandateId}", response_model=Mandate)
|
|
@limiter.limit("10/minute")
|
|
def update_mandate(
|
|
request: Request,
|
|
mandateId: str = Path(..., description="ID of the mandate to update"),
|
|
mandateData: dict = Body(..., description="Mandate update data"),
|
|
currentUser: User = Depends(requireSysAdminRole)
|
|
) -> Mandate:
|
|
"""
|
|
Update an existing mandate.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
"""
|
|
try:
|
|
logger.debug(f"Updating mandate {mandateId} with data: {mandateData}")
|
|
|
|
appInterface = interfaceDbApp.getRootInterface()
|
|
|
|
# Check if mandate exists
|
|
existingMandate = appInterface.getMandate(mandateId)
|
|
if not existingMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate with ID {mandateId} not found"
|
|
)
|
|
|
|
# Update mandate - mandateData is already a dict
|
|
updatedMandate = appInterface.updateMandate(mandateId, mandateData)
|
|
|
|
if not updatedMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail="Failed to update mandate"
|
|
)
|
|
|
|
logger.info(f"Mandate {mandateId} updated by SysAdmin {currentUser.id}")
|
|
|
|
return updatedMandate
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating mandate {mandateId}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to update mandate: {str(e)}"
|
|
)
|
|
|
|
@router.delete("/{mandateId}", response_model=Dict[str, Any])
|
|
@limiter.limit("10/minute")
|
|
def delete_mandate(
|
|
request: Request,
|
|
mandateId: str = Path(..., description="ID of the mandate to delete"),
|
|
currentUser: User = Depends(requireSysAdminRole)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Delete a mandate.
|
|
MULTI-TENANT: SysAdmin-only.
|
|
"""
|
|
try:
|
|
appInterface = interfaceDbApp.getRootInterface()
|
|
|
|
# Check if mandate exists
|
|
existingMandate = appInterface.getMandate(mandateId)
|
|
if not existingMandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {mandateId} not found"
|
|
)
|
|
|
|
# MULTI-TENANT: Delete all UserMandate entries for this mandate first
|
|
userMandates = appInterface.getUserMandatesByMandate(mandateId)
|
|
for um in userMandates:
|
|
appInterface.deleteUserMandate(str(um.userId), mandateId)
|
|
logger.info(f"Deleted {len(userMandates)} UserMandate entries for mandate {mandateId}")
|
|
|
|
# Delete mandate
|
|
try:
|
|
appInterface.deleteMandate(mandateId)
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=str(e)
|
|
)
|
|
|
|
logger.info(f"Mandate {mandateId} deleted by SysAdmin {currentUser.id}")
|
|
|
|
return {"message": f"Mandate {mandateId} deleted successfully"}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error deleting mandate {mandateId}: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to delete mandate: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# User Management within Mandates (Mandate-Admin)
|
|
# =============================================================================
|
|
|
|
@router.get("/{targetMandateId}/users")
|
|
@limiter.limit("60/minute")
|
|
def list_mandate_users(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
):
|
|
"""
|
|
List all users in a mandate with pagination, search, and sorting support.
|
|
|
|
Requires Mandate-Admin role or SysAdmin.
|
|
|
|
Args:
|
|
pagination: Optional pagination parameters (page, pageSize, search, filters, sort)
|
|
"""
|
|
# Check permission
|
|
if not _hasMandateAdminRole(context, targetMandateId) and not context.hasSysAdminRole:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbApp.getRootInterface()
|
|
|
|
# Verify mandate exists
|
|
mandate = rootInterface.getMandate(targetMandateId)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {targetMandateId} not found"
|
|
)
|
|
|
|
# Parse pagination parameter
|
|
paginationParams = None
|
|
if pagination:
|
|
try:
|
|
paginationDict = json.loads(pagination)
|
|
if paginationDict:
|
|
# Normalize pagination dict
|
|
if 'sort' in paginationDict and paginationDict['sort']:
|
|
normalizedSort = []
|
|
for item in paginationDict['sort']:
|
|
if isinstance(item, dict):
|
|
normalizedSort.append(item)
|
|
paginationDict['sort'] = normalizedSort if normalizedSort else None
|
|
paginationParams = paginationDict
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Invalid pagination parameter: {str(e)}"
|
|
)
|
|
|
|
# Get all UserMandate entries for this mandate
|
|
userMandates = rootInterface.getUserMandatesByMandate(targetMandateId)
|
|
|
|
result = []
|
|
for um in userMandates:
|
|
# Get user info
|
|
user = rootInterface.getUser(str(um.userId))
|
|
if not user:
|
|
continue
|
|
|
|
# Get roles for this membership
|
|
roleIds = rootInterface.getRoleIdsForUserMandate(str(um.id))
|
|
|
|
# Resolve role labels for display (only mandate-level roles, deduplicated)
|
|
roleLabels = []
|
|
filteredRoleIds = []
|
|
seenLabels = set()
|
|
for roleId in roleIds:
|
|
role = rootInterface.getRole(roleId)
|
|
if role:
|
|
# Skip feature-instance roles - they don't belong in mandate membership
|
|
if role.featureInstanceId:
|
|
continue
|
|
filteredRoleIds.append(roleId)
|
|
if role.roleLabel not in seenLabels:
|
|
roleLabels.append(role.roleLabel)
|
|
seenLabels.add(role.roleLabel)
|
|
else:
|
|
# Role not found - fail-safe: skip (no access)
|
|
logger.warning(f"Role {roleId} not found, skipping")
|
|
continue
|
|
|
|
result.append({
|
|
"id": str(um.id), # UserMandate ID as primary key
|
|
"userId": str(user.id),
|
|
"username": user.username,
|
|
"email": user.email,
|
|
"fullName": user.fullName,
|
|
"roleIds": filteredRoleIds,
|
|
"roleLabels": roleLabels,
|
|
"enabled": um.enabled
|
|
})
|
|
|
|
# Apply search, filtering, and sorting if pagination requested
|
|
if paginationParams:
|
|
# Apply search (if search term provided)
|
|
searchTerm = paginationParams.get('search', '').lower() if paginationParams.get('search') else ''
|
|
if searchTerm:
|
|
searchedResult = []
|
|
for item in result:
|
|
username = (item.get("username") or "").lower()
|
|
email = (item.get("email") or "").lower()
|
|
fullName = (item.get("fullName") or "").lower()
|
|
roleLabelsStr = " ".join(item.get("roleLabels") or []).lower()
|
|
|
|
if searchTerm in username or searchTerm in email or searchTerm in fullName or searchTerm in roleLabelsStr:
|
|
searchedResult.append(item)
|
|
result = searchedResult
|
|
|
|
# Apply filters (if filters provided)
|
|
filters = paginationParams.get('filters')
|
|
if filters:
|
|
for fieldName, filterValue in filters.items():
|
|
if filterValue is not None and filterValue != '':
|
|
filterValueLower = str(filterValue).lower()
|
|
result = [
|
|
item for item in result
|
|
if str(item.get(fieldName, '')).lower() == filterValueLower
|
|
]
|
|
|
|
# Apply sorting
|
|
sortFields = paginationParams.get('sort')
|
|
if sortFields:
|
|
for sortItem in reversed(sortFields):
|
|
field = sortItem.get('field')
|
|
direction = sortItem.get('direction', 'asc')
|
|
if field:
|
|
result = sorted(
|
|
result,
|
|
key=lambda x: str(x.get(field, '') or '').lower(),
|
|
reverse=(direction == 'desc')
|
|
)
|
|
|
|
# Apply pagination
|
|
page = paginationParams.get('page', 1)
|
|
pageSize = paginationParams.get('pageSize', 25)
|
|
totalItems = len(result)
|
|
totalPages = (totalItems + pageSize - 1) // pageSize if totalItems > 0 else 0
|
|
startIdx = (page - 1) * pageSize
|
|
endIdx = startIdx + pageSize
|
|
paginatedResult = result[startIdx:endIdx]
|
|
|
|
return {
|
|
"items": paginatedResult,
|
|
"pagination": {
|
|
"currentPage": page,
|
|
"pageSize": pageSize,
|
|
"totalItems": totalItems,
|
|
"totalPages": totalPages
|
|
}
|
|
}
|
|
|
|
# No pagination - return all users as list
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error listing users for mandate {targetMandateId}: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to list users: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.post("/{targetMandateId}/users", response_model=UserMandateResponse)
|
|
@limiter.limit("30/minute")
|
|
def add_user_to_mandate(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
data: UserMandateCreate = Body(...),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> UserMandateResponse:
|
|
"""
|
|
Add a user to a mandate with specified roles.
|
|
|
|
Requires Mandate-Admin role (SysAdmin passes automatically).
|
|
|
|
Args:
|
|
targetMandateId: Target mandate ID
|
|
data: User ID and role IDs to assign
|
|
"""
|
|
# Check Mandate-Admin permission
|
|
if not _hasMandateAdminRole(context, targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required to add users"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbApp.getRootInterface()
|
|
|
|
# 3. Verify mandate exists
|
|
mandate = rootInterface.getMandate(targetMandateId)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {targetMandateId} not found"
|
|
)
|
|
|
|
# 4. Verify target user exists
|
|
targetUser = rootInterface.getUser(data.targetUserId)
|
|
if not targetUser:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"User {data.targetUserId} not found"
|
|
)
|
|
|
|
# 5. Check if user is already a member
|
|
existingMembership = rootInterface.getUserMandate(data.targetUserId, targetMandateId)
|
|
if existingMembership:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_409_CONFLICT,
|
|
detail=f"User {data.targetUserId} is already a member of this mandate"
|
|
)
|
|
|
|
# 6. Validate roles (must exist and belong to this mandate or be global)
|
|
for roleId in data.roleIds:
|
|
try:
|
|
rootInterface.validateRoleForMandate(roleId, targetMandateId)
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=str(e)
|
|
)
|
|
|
|
# 7. Create UserMandate
|
|
userMandate = rootInterface.createUserMandate(
|
|
userId=data.targetUserId,
|
|
mandateId=targetMandateId,
|
|
roleIds=data.roleIds
|
|
)
|
|
|
|
# 8. Audit - Log permission change with IP address
|
|
audit_logger.logPermissionChange(
|
|
userId=str(context.user.id),
|
|
mandateId=targetMandateId,
|
|
action="user_added_to_mandate",
|
|
targetUserId=data.targetUserId,
|
|
details=f"Roles assigned: {data.roleIds}",
|
|
resourceType="UserMandate",
|
|
resourceId=str(userMandate.id)
|
|
)
|
|
|
|
logger.info(
|
|
f"User {context.user.id} added user {data.targetUserId} to mandate {targetMandateId} "
|
|
f"with roles {data.roleIds}"
|
|
)
|
|
|
|
mname = _mandate_display_name(mandate)
|
|
create_access_change_notification(
|
|
data.targetUserId,
|
|
"Mandantenzugriff",
|
|
f"Sie wurden dem Mandanten «{mname}» hinzugefügt.",
|
|
"mandate_access",
|
|
targetMandateId,
|
|
)
|
|
|
|
return UserMandateResponse(
|
|
id=str(userMandate.id), # UserMandate ID as primary key
|
|
userId=data.targetUserId,
|
|
mandateId=targetMandateId,
|
|
roleIds=data.roleIds,
|
|
enabled=True
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error adding user to mandate: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to add user to mandate: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.delete("/{targetMandateId}/users/{targetUserId}", response_model=Dict[str, str])
|
|
@limiter.limit("30/minute")
|
|
def remove_user_from_mandate(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
targetUserId: str = Path(..., description="ID of the user to remove"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Remove a user from a mandate.
|
|
|
|
Requires Mandate-Admin role.
|
|
Cannot remove the last admin from a mandate (orphan prevention).
|
|
|
|
Args:
|
|
targetMandateId: Target mandate ID
|
|
targetUserId: User ID to remove
|
|
"""
|
|
# Check Mandate-Admin permission
|
|
if not _hasMandateAdminRole(context, targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbApp.getRootInterface()
|
|
|
|
# Verify mandate exists
|
|
mandate = rootInterface.getMandate(targetMandateId)
|
|
if not mandate:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Mandate {targetMandateId} not found"
|
|
)
|
|
|
|
# Get user's membership
|
|
membership = rootInterface.getUserMandate(targetUserId, targetMandateId)
|
|
if not membership:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"User {targetUserId} is not a member of this mandate"
|
|
)
|
|
|
|
# Check if this is the last admin (orphan prevention)
|
|
if _isLastMandateAdmin(rootInterface, targetMandateId, targetUserId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot remove the last admin from a mandate. Assign another admin first."
|
|
)
|
|
|
|
# Delete UserMandate (CASCADE will delete UserMandateRole entries)
|
|
rootInterface.deleteUserMandate(targetUserId, targetMandateId)
|
|
|
|
# Audit - Log permission change
|
|
audit_logger.logPermissionChange(
|
|
userId=str(context.user.id),
|
|
mandateId=targetMandateId,
|
|
action="user_removed_from_mandate",
|
|
targetUserId=targetUserId,
|
|
details="User removed from mandate",
|
|
resourceType="UserMandate"
|
|
)
|
|
|
|
logger.info(f"User {context.user.id} removed user {targetUserId} from mandate {targetMandateId}")
|
|
|
|
mname = _mandate_display_name(mandate)
|
|
create_access_change_notification(
|
|
targetUserId,
|
|
"Mandantenzugriff",
|
|
f"Sie wurden aus dem Mandanten «{mname}» entfernt.",
|
|
"mandate_access",
|
|
targetMandateId,
|
|
)
|
|
|
|
return {"message": "User removed from mandate", "userId": targetUserId}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error removing user from mandate: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to remove user from mandate: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.put("/{targetMandateId}/users/{targetUserId}/roles", response_model=UserMandateResponse)
|
|
@limiter.limit("30/minute")
|
|
def update_user_roles_in_mandate(
|
|
request: Request,
|
|
targetMandateId: str = Path(..., description="ID of the mandate"),
|
|
targetUserId: str = Path(..., description="ID of the user"),
|
|
roleIds: List[str] = Body(..., description="New role IDs to assign"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> UserMandateResponse:
|
|
"""
|
|
Update a user's roles within a mandate.
|
|
|
|
Replaces all existing roles with the new set.
|
|
Requires Mandate-Admin role.
|
|
|
|
Args:
|
|
targetMandateId: Target mandate ID
|
|
targetUserId: User ID to update
|
|
roleIds: New set of role IDs
|
|
"""
|
|
# Check Mandate-Admin permission
|
|
if not _hasMandateAdminRole(context, targetMandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Mandate-Admin role required"
|
|
)
|
|
|
|
try:
|
|
rootInterface = interfaceDbApp.getRootInterface()
|
|
|
|
# Get user's membership
|
|
membership = rootInterface.getUserMandate(targetUserId, targetMandateId)
|
|
if not membership:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"User {targetUserId} is not a member of this mandate"
|
|
)
|
|
|
|
# Validate new roles
|
|
for roleId in roleIds:
|
|
try:
|
|
rootInterface.validateRoleForMandate(roleId, targetMandateId)
|
|
except ValueError as e:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=str(e)
|
|
)
|
|
|
|
# Check if removing admin role would leave mandate without admins
|
|
currentRoleIds = rootInterface.getRoleIdsForUserMandate(str(membership.id))
|
|
isCurrentlyAdmin = _hasAdminRoleInList(rootInterface, currentRoleIds, targetMandateId)
|
|
willBeAdmin = _hasAdminRoleInList(rootInterface, roleIds, targetMandateId)
|
|
|
|
if isCurrentlyAdmin and not willBeAdmin:
|
|
if _isLastMandateAdmin(rootInterface, targetMandateId, targetUserId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot remove admin role from the last admin. Assign another admin first."
|
|
)
|
|
|
|
# Remove existing role assignments
|
|
rootInterface.deleteUserMandateRoles(str(membership.id))
|
|
|
|
# Add new role assignments
|
|
for roleId in roleIds:
|
|
rootInterface.addRoleToUserMandate(str(membership.id), roleId)
|
|
|
|
# Audit - Log role assignment change
|
|
audit_logger.logPermissionChange(
|
|
userId=str(context.user.id),
|
|
mandateId=targetMandateId,
|
|
action="role_assigned",
|
|
targetUserId=targetUserId,
|
|
details=f"New roles: {roleIds}",
|
|
resourceType="UserMandateRole",
|
|
resourceId=str(membership.id)
|
|
)
|
|
|
|
logger.info(
|
|
f"User {context.user.id} updated roles for user {targetUserId} "
|
|
f"in mandate {targetMandateId} to {roleIds}"
|
|
)
|
|
|
|
mandate_meta = rootInterface.getMandate(targetMandateId)
|
|
mname = _mandate_display_name(mandate_meta)
|
|
create_access_change_notification(
|
|
targetUserId,
|
|
"Mandantenrollen geändert",
|
|
f"Ihre Rollen im Mandanten «{mname}» wurden angepasst.",
|
|
"mandate_access",
|
|
targetMandateId,
|
|
)
|
|
|
|
return UserMandateResponse(
|
|
id=str(membership.id), # UserMandate ID as primary key
|
|
userId=targetUserId,
|
|
mandateId=targetMandateId,
|
|
roleIds=roleIds,
|
|
enabled=membership.enabled
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating user roles in mandate: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to update user roles: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
def _mandate_display_name(mandate: Any) -> str:
|
|
"""Human-readable mandate label for notifications."""
|
|
if mandate is None:
|
|
return ""
|
|
if isinstance(mandate, dict):
|
|
if mandate.get("label"):
|
|
return str(mandate["label"])
|
|
name = mandate.get("name")
|
|
if isinstance(name, dict):
|
|
return str(name.get("de") or name.get("en") or (next(iter(name.values()), "") if name else ""))
|
|
return str(name or mandate.get("id", ""))
|
|
label = getattr(mandate, "label", None)
|
|
if label:
|
|
return str(label)
|
|
name = getattr(mandate, "name", None)
|
|
if isinstance(name, dict):
|
|
return str(name.get("de") or name.get("en") or (next(iter(name.values()), "") if name else ""))
|
|
if name is not None:
|
|
return str(name)
|
|
return str(getattr(mandate, "id", ""))
|
|
|
|
|
|
def _getAdminMandateIds(context: RequestContext) -> List[str]:
|
|
"""
|
|
Get list of mandate IDs where the user has the admin role.
|
|
Returns empty list if user has no admin roles.
|
|
"""
|
|
mandateIds = []
|
|
try:
|
|
rootInterface = interfaceDbApp.getRootInterface()
|
|
userId = str(context.user.id)
|
|
userMandates = rootInterface.getUserMandates(userId)
|
|
for um in userMandates:
|
|
if not getattr(um, 'enabled', True):
|
|
continue
|
|
umId = getattr(um, 'id', None)
|
|
mandateId = getattr(um, 'mandateId', None)
|
|
if not umId or not mandateId:
|
|
continue
|
|
roleIds = rootInterface.getRoleIdsForUserMandate(str(umId))
|
|
for roleId in roleIds:
|
|
role = rootInterface.getRole(roleId)
|
|
if role and role.roleLabel == "admin" and not role.featureInstanceId:
|
|
mandateIds.append(str(mandateId))
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error getting admin mandate IDs: {e}")
|
|
return mandateIds
|
|
|
|
|
|
def _hasMandateAdminRole(context: RequestContext, mandateId: str) -> bool:
|
|
"""
|
|
Check if the user has mandate admin role for the specified mandate.
|
|
Works with or without X-Mandate-Id header (admin pages don't send it).
|
|
"""
|
|
if context.hasSysAdminRole:
|
|
return True
|
|
|
|
# If mandate context matches, check roles from context directly
|
|
if context.mandateId and str(context.mandateId) == str(mandateId):
|
|
if context.roleIds:
|
|
try:
|
|
rootInterface = interfaceDbApp.getRootInterface()
|
|
for roleId in context.roleIds:
|
|
role = rootInterface.getRole(roleId)
|
|
if role and role.roleLabel == "admin" and not role.featureInstanceId:
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error checking mandate admin role: {e}")
|
|
return False
|
|
|
|
# No mandate context (admin pages) — check via user's mandate memberships
|
|
adminMandateIds = _getAdminMandateIds(context)
|
|
return str(mandateId) in adminMandateIds
|
|
|
|
|
|
def _isLastMandateAdmin(interface, mandateId: str, excludeUserId: str) -> bool:
|
|
"""
|
|
Check if excluding this user would leave the mandate without any admins.
|
|
"""
|
|
try:
|
|
# Get all UserMandates for this mandate (Pydantic models)
|
|
allMandates = interface.getUserMandatesByMandate(mandateId)
|
|
userMandates = [um for um in allMandates if um.enabled]
|
|
|
|
adminCount = 0
|
|
for um in userMandates:
|
|
if str(um.userId) == str(excludeUserId):
|
|
continue
|
|
|
|
# Check if this user has admin role
|
|
roleIds = interface.getRoleIdsForUserMandate(str(um.id))
|
|
if _hasAdminRoleInList(interface, roleIds, mandateId):
|
|
adminCount += 1
|
|
|
|
return adminCount == 0
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking last admin: {e}")
|
|
return True # Fail-safe: assume they're the last admin (prevents deletion)
|
|
|
|
|
|
def _hasAdminRoleInList(interface, roleIds: List[str], mandateId: str) -> bool:
|
|
"""
|
|
Check if any of the role IDs is an admin role for the mandate.
|
|
"""
|
|
for roleId in roleIds:
|
|
role = interface.getRole(roleId)
|
|
if role:
|
|
# Admin role at mandate level (global or mandate-specific, not feature-instance)
|
|
if role.roleLabel == "admin" and not role.featureInstanceId:
|
|
if not role.mandateId or str(role.mandateId) == str(mandateId):
|
|
return True
|
|
return False
|