gateway/modules/routes/routeInvitations.py
2026-03-30 23:03:36 +02:00

928 lines
37 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Invitation routes for the backend API.
Implements token-based user invitations for self-service onboarding.
Multi-Tenant Design:
- Invitations are mandate-scoped (Mandate Admin creates them)
- Tokens are secure, time-limited, and optionally use-limited
- Users accept invitations to join mandates/features with predefined roles
"""
from fastapi import APIRouter, HTTPException, Depends, Request, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
from pydantic import BaseModel, Field, model_validator
from modules.auth import limiter, getRequestContext, RequestContext, getCurrentUser
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelInvitation import Invitation
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.shared.timeUtils import getUtcTimestamp
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/invitations",
tags=["Invitations"],
responses={404: {"description": "Not found"}}
)
# =============================================================================
# Request/Response Models
# =============================================================================
class InvitationCreate(BaseModel):
"""Request model for creating an invitation.
Supports two modes:
- Mandate-level: featureInstanceId omitted, roleIds are mandate-level roles (user, viewer, admin)
- Feature-instance-level: featureInstanceId required, roleIds are instance-level roles
Email is required for new users; targetUsername is optional.
At least one of email or targetUsername must be provided.
"""
targetUsername: Optional[str] = Field(None, description="Username of the user to invite (must match on acceptance)")
email: Optional[str] = Field(None, description="Email address to send invitation link (required for new users)")
featureInstanceId: Optional[str] = Field(None, description="Feature instance to grant access to (optional for mandate-level invitations)")
roleIds: List[str] = Field(..., description="Role IDs: mandate-level (user, viewer, admin) or instance-level")
frontendUrl: str = Field(..., description="Frontend URL for building the invite link (provided by frontend)")
expiresInHours: int = Field(
72,
ge=1,
le=720, # Max 30 days
description="Hours until invitation expires"
)
maxUses: int = Field(
1,
ge=1,
le=100,
description="Maximum number of times this invitation can be used"
)
@model_validator(mode="after")
def require_email_or_username(self):
email_val = (self.email or "").strip()
username_val = (self.targetUsername or "").strip()
if not email_val and not username_val:
raise ValueError("At least one of email or targetUsername must be provided")
return self
class InvitationResponse(BaseModel):
"""Response model for invitation"""
id: str
token: str
mandateId: str
featureInstanceId: Optional[str]
roleIds: List[str]
targetUsername: Optional[str]
email: Optional[str]
createdBy: str
createdAt: float
expiresAt: float
usedBy: Optional[str]
usedAt: Optional[float]
revokedAt: Optional[float]
maxUses: int
currentUses: int
inviteUrl: str # Full URL for the invitation
emailSent: bool = False # Whether invitation email was sent
class InvitationValidation(BaseModel):
"""Response model for invitation validation"""
valid: bool
reason: Optional[str]
mandateId: Optional[str]
mandateName: Optional[str] = None
featureInstanceId: Optional[str]
featureInstanceName: Optional[str] = None
roleIds: List[str]
roleLabels: List[str] = []
targetUsername: Optional[str] = None
email: Optional[str] = None
# =============================================================================
# Invitation CRUD Endpoints
# =============================================================================
@router.post("/", response_model=InvitationResponse)
@limiter.limit("30/minute")
def create_invitation(
request: Request,
data: InvitationCreate,
context: RequestContext = Depends(getRequestContext)
) -> InvitationResponse:
"""
Create a new invitation for a feature instance.
Requires SysAdmin or Mandate-Admin role. Creates a secure token that can be shared
with users to join a feature instance with predefined roles.
The mandateId is derived from the feature instance automatically.
Args:
data: Invitation creation data (featureInstanceId + roleIds required)
"""
try:
rootInterface = getRootInterface()
# Determine mandateId and validate
if data.featureInstanceId:
# Feature-instance-level invitation
instance = rootInterface.getFeatureInstance(data.featureInstanceId)
if not instance:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Feature instance '{data.featureInstanceId}' not found"
)
mandateId = str(instance.mandateId)
# Validate roles belong to this feature instance
for roleId in data.roleIds:
role = rootInterface.getRole(roleId)
if not role:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Role '{roleId}' not found"
)
if str(role.featureInstanceId or "") != data.featureInstanceId:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Role '{roleId}' does not belong to feature instance '{data.featureInstanceId}'"
)
else:
# Mandate-level invitation (user, viewer, admin roles)
if not context.mandateId:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="X-Mandate-Id header is required for mandate-level invitations"
)
mandateId = str(context.mandateId)
# Validate roles are mandate-level (no featureInstanceId)
for roleId in data.roleIds:
role = rootInterface.getRole(roleId)
if not role:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Role '{roleId}' not found"
)
if role.featureInstanceId is not None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Role '{roleId}' is an instance-level role; use mandate-level roles (user, viewer, admin) for mandate invitations"
)
if str(role.mandateId or "") != mandateId:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Role '{roleId}' does not belong to mandate"
)
# Check admin permission
if not context.hasSysAdminRole:
if str(context.mandateId) != mandateId:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied to this mandate"
)
if not _hasMandateAdminRole(context):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Mandate-Admin role required to create invitations"
)
# Calculate expiration time
currentTime = getUtcTimestamp()
expiresAt = currentTime + (data.expiresInHours * 3600)
# Create invitation (targetUsername optional when email provided)
target_username_val = (data.targetUsername or "").strip() or None
email_val = (data.email or "").strip() or None
# Silent duplicate check: only for existing users (identified by username)
# Username is unique; email can exist multiple times, so we do NOT look up by email
target_user_id = None
if target_username_val:
u = rootInterface.getUserByUsername(target_username_val)
if u:
target_user_id = str(u.id)
if target_user_id:
if data.featureInstanceId:
existing_access = rootInterface.getFeatureAccess(target_user_id, data.featureInstanceId)
if existing_access:
logger.info(f"Invitation skipped: user {target_user_id} already has access to feature instance {data.featureInstanceId}")
return InvitationResponse(
id="duplicate-skipped",
token="",
mandateId=mandateId,
featureInstanceId=data.featureInstanceId,
roleIds=data.roleIds,
targetUsername=target_username_val,
email=email_val,
createdBy=str(context.user.id),
createdAt=currentTime,
expiresAt=expiresAt,
usedBy=None,
usedAt=None,
revokedAt=None,
maxUses=data.maxUses,
currentUses=0,
inviteUrl="",
emailSent=False
)
else:
existing_membership = rootInterface.getUserMandate(target_user_id, mandateId)
if existing_membership:
logger.info(f"Invitation skipped: user {target_user_id} already member of mandate {mandateId}")
return InvitationResponse(
id="duplicate-skipped",
token="",
mandateId=mandateId,
featureInstanceId=None,
roleIds=data.roleIds,
targetUsername=target_username_val,
email=email_val,
createdBy=str(context.user.id),
createdAt=currentTime,
expiresAt=expiresAt,
usedBy=None,
usedAt=None,
revokedAt=None,
maxUses=data.maxUses,
currentUses=0,
inviteUrl="",
emailSent=False
)
invitation = Invitation(
mandateId=mandateId,
featureInstanceId=data.featureInstanceId or None,
roleIds=data.roleIds,
targetUsername=target_username_val,
email=email_val,
createdBy=str(context.user.id),
expiresAt=expiresAt,
maxUses=data.maxUses
)
createdRecord = rootInterface.db.recordCreate(Invitation, invitation.model_dump())
if not createdRecord:
raise ValueError("Failed to create invitation record")
# Build invite URL using frontend URL provided by the caller
baseUrl = data.frontendUrl.rstrip("/")
inviteUrl = f"{baseUrl}/invite/{invitation.token}"
# Send email if email address is provided
emailSent = False
if email_val:
try:
from modules.connectors.connectorMessagingEmail import ConnectorMessagingEmail
# Get mandate and optionally instance for the email
mandate = rootInterface.getMandate(str(mandateId))
mandateName = (mandate.label or mandate.name) if mandate else "PowerOn"
# Only use username for existing users; new users set it when they register
display_name = target_username_val if target_username_val else "Neuer/e Nutzer/in"
instance = rootInterface.getFeatureInstance(data.featureInstanceId) if data.featureInstanceId else None
instance_label = (instance.label or instance.featureCode) if instance else None
emailConnector = ConnectorMessagingEmail()
if instance_label:
emailSubject = f"Einladung zur Feature-Instanz {instance_label}"
invite_desc = f"der Feature-Instanz «{instance_label}» (Mandant: {mandateName}) beizutreten"
else:
emailSubject = f"Einladung zu {mandateName}"
invite_desc = f"dem Mandanten «{mandateName}» beizutreten"
from modules.routes.routeSecurityLocal import _buildAuthEmailHtml
emailBody = _buildAuthEmailHtml(
greeting=f"Hallo {display_name}",
bodyLines=[
f"Sie wurden eingeladen, {invite_desc}.",
"",
"Klicken Sie auf die Schaltfläche, um die Einladung anzunehmen:",
],
buttonText="Einladung annehmen",
buttonUrl=inviteUrl,
footerText=f"Diese Einladung ist {data.expiresInHours} Stunden gültig.",
)
emailConnector.send(
recipient=email_val,
subject=emailSubject,
message=emailBody
)
emailSent = True
logger.info(f"Invitation email sent to {email_val} for user {target_username_val or 'email-only'}")
except Exception as emailError:
logger.warning(f"Failed to send invitation email to {email_val}: {emailError}")
# Don't fail the invitation creation if email fails
# Update the invitation record with emailSent status
if emailSent:
rootInterface.db.recordModify(
Invitation,
createdRecord.get("id"),
{"emailSent": True}
)
createdRecord["emailSent"] = True
# If the target user already exists (identified by username), create an in-app notification
# Only look up by username - email is not used for "existing user" since new users are invited by email
try:
existingUser = None
if target_username_val:
existingUser = rootInterface.getUserByUsername(target_username_val)
if existingUser:
from modules.routes.routeNotifications import createInvitationNotification
mandate = rootInterface.getMandate(str(mandateId))
mandateName = (mandate.label or mandate.name) if mandate else "PowerOn"
inviterName = context.user.fullName or context.user.username
instance = rootInterface.getFeatureInstance(data.featureInstanceId) if data.featureInstanceId else None
instance_label = (instance.label or instance.featureCode) if instance else None
createInvitationNotification(
userId=str(existingUser.id),
invitationId=str(createdRecord.get("id")),
mandateName=mandateName,
inviterName=inviterName,
featureInstanceName=instance_label
)
logger.info(f"Created notification for existing user {target_username_val or email_val}")
except Exception as notifError:
logger.warning(f"Failed to create notification for user {target_username_val or email_val}: {notifError}")
# Don't fail the invitation if notification fails
target_desc = f"feature instance {data.featureInstanceId}" if data.featureInstanceId else f"mandate {mandateId}"
logger.info(
f"User {context.user.id} created invitation for user {target_username_val or email_val} "
f"to {target_desc}, expires in {data.expiresInHours}h"
)
# Invitation extends PowerOnModel: recordCreate/_saveRecord set sysCreatedAt and sysCreatedBy automatically.
# API response uses createdAt/createdBy; map from the system fields (no separate createdAt column on model).
return InvitationResponse(
id=str(createdRecord.get("id")),
token=str(createdRecord.get("token")),
mandateId=str(createdRecord.get("mandateId")),
featureInstanceId=createdRecord.get("featureInstanceId"),
roleIds=createdRecord.get("roleIds", []),
targetUsername=createdRecord.get("targetUsername"),
email=createdRecord.get("email"),
createdBy=str(createdRecord["sysCreatedBy"]),
createdAt=float(createdRecord["sysCreatedAt"]),
expiresAt=createdRecord.get("expiresAt"),
usedBy=createdRecord.get("usedBy"),
usedAt=createdRecord.get("usedAt"),
revokedAt=createdRecord.get("revokedAt"),
maxUses=createdRecord.get("maxUses", 1),
currentUses=createdRecord.get("currentUses", 0),
inviteUrl=inviteUrl,
emailSent=emailSent
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating invitation: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create invitation: {str(e)}"
)
@router.get("/", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def list_invitations(
request: Request,
frontendUrl: str = Query(..., description="Frontend URL for building invite links (provided by frontend)"),
includeUsed: bool = Query(False, description="Include already used invitations"),
includeExpired: bool = Query(False, description="Include expired invitations"),
context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
"""
List invitations for the current mandate.
Requires Mandate-Admin role. Returns all invitations created for this mandate.
NOTE: Cannot use db.getRecordsetPaginated() because:
- Computed status fields (isExpired, isUsedUp) are derived in-memory
- Filtering by revoked/used/expired requires post-fetch logic
- Invitation volume per mandate is typically low (< 100)
When this endpoint needs FormGeneratorTable pagination, add PaginatedResponse
support with in-memory slicing (similar to routeDataConnections).
Args:
includeUsed: Include invitations that have reached maxUses
includeExpired: Include expired invitations
"""
if not context.mandateId:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="X-Mandate-Id header is required"
)
# Check mandate admin permission
if not _hasMandateAdminRole(context):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Mandate-Admin role required to list invitations"
)
try:
rootInterface = getRootInterface()
# Get all invitations for this mandate (Pydantic models)
allInvitations = rootInterface.getInvitationsByMandate(str(context.mandateId))
currentTime = getUtcTimestamp()
result = []
for inv in allInvitations:
# Skip revoked invitations
if inv.revokedAt:
continue
# Filter by usage
currentUses = inv.currentUses or 0
maxUses = inv.maxUses or 1
if not includeUsed and currentUses >= maxUses:
continue
# Filter by expiration
expiresAt = inv.expiresAt or 0
if not includeExpired and expiresAt < currentTime:
continue
# Build invite URL using frontend URL provided by the caller
baseUrl = frontendUrl.rstrip("/")
inviteUrl = f"{baseUrl}/invite/{inv.token}"
result.append({
**inv.model_dump(),
"inviteUrl": inviteUrl,
"isExpired": expiresAt < currentTime,
"isUsedUp": currentUses >= maxUses
})
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Error listing invitations: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to list invitations: {str(e)}"
)
@router.get("/filter-values")
@limiter.limit("60/minute")
def get_invitation_filter_values(
request: Request,
column: str = Query(..., description="Column key"),
pagination: Optional[str] = Query(None, description="JSON-encoded current filters"),
frontendUrl: str = Query("", description="Frontend URL for building invite links"),
includeUsed: bool = Query(False, description="Include already used invitations"),
includeExpired: bool = Query(False, description="Include expired invitations"),
context: RequestContext = Depends(getRequestContext)
) -> list:
"""Return distinct filter values for a column in invitations."""
if not context.mandateId:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="X-Mandate-Id header is required")
if not _hasMandateAdminRole(context):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Mandate-Admin role required")
try:
from modules.routes.routeDataUsers import _handleFilterValuesRequest
rootInterface = getRootInterface()
allInvitations = rootInterface.getInvitationsByMandate(str(context.mandateId))
currentTime = getUtcTimestamp()
result = []
for inv in allInvitations:
if inv.revokedAt:
continue
currentUses = inv.currentUses or 0
maxUses = inv.maxUses or 1
if not includeUsed and currentUses >= maxUses:
continue
expiresAt = inv.expiresAt or 0
if not includeExpired and expiresAt < currentTime:
continue
baseUrl = frontendUrl.rstrip("/") if frontendUrl else ""
inviteUrl = f"{baseUrl}/invite/{inv.token}" if baseUrl else ""
result.append({
**inv.model_dump(),
"inviteUrl": inviteUrl,
"isExpired": expiresAt < currentTime,
"isUsedUp": currentUses >= maxUses
})
return _handleFilterValuesRequest(result, column, pagination)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting filter values for invitations: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.delete("/{invitationId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
def revoke_invitation(
request: Request,
invitationId: str,
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, str]:
"""
Revoke an invitation.
Requires Mandate-Admin role. Revoked invitations cannot be used.
Args:
invitationId: Invitation ID
"""
if not context.mandateId:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="X-Mandate-Id header is required"
)
# Check mandate admin permission
if not _hasMandateAdminRole(context):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Mandate-Admin role required to revoke invitations"
)
try:
rootInterface = getRootInterface()
# Get invitation (Pydantic model)
invitation = rootInterface.getInvitation(invitationId)
if not invitation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Invitation '{invitationId}' not found"
)
# Verify mandate access
if str(invitation.mandateId) != str(context.mandateId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied to this invitation"
)
# Already revoked?
if invitation.revokedAt:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invitation is already revoked"
)
# Revoke invitation
rootInterface.db.recordModify(
Invitation,
invitationId,
{"revokedAt": getUtcTimestamp()}
)
logger.info(f"User {context.user.id} revoked invitation {invitationId}")
return {"message": "Invitation revoked", "invitationId": invitationId}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error revoking invitation {invitationId}: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to revoke invitation: {str(e)}"
)
# =============================================================================
# Public Invitation Endpoints (No auth required for validation)
# =============================================================================
@router.get("/validate/{token}", response_model=InvitationValidation)
@limiter.limit("30/minute")
def validate_invitation(
request: Request,
token: str
) -> InvitationValidation:
"""
Validate an invitation token (public endpoint).
Used by the frontend to check if an invitation is valid before
showing the registration/acceptance form.
Args:
token: Invitation token
"""
try:
rootInterface = getRootInterface()
# Find invitation by token (Pydantic model)
invitation = rootInterface.getInvitationByToken(token)
if not invitation:
return InvitationValidation(
valid=False,
reason="Invitation not found",
mandateId=None,
featureInstanceId=None,
roleIds=[]
)
# Check if revoked
if invitation.revokedAt:
return InvitationValidation(
valid=False,
reason="Invitation has been revoked",
mandateId=None,
featureInstanceId=None,
roleIds=[]
)
# Check if expired
currentTime = getUtcTimestamp()
expiresAt = invitation.expiresAt or 0
if expiresAt < currentTime:
return InvitationValidation(
valid=False,
reason="Invitation has expired",
mandateId=None,
featureInstanceId=None,
roleIds=[]
)
# Check if used up
currentUses = invitation.currentUses or 0
maxUses = invitation.maxUses or 1
if currentUses >= maxUses:
return InvitationValidation(
valid=False,
reason="Invitation has reached maximum uses",
mandateId=None,
featureInstanceId=None,
roleIds=[]
)
# Get additional info for display
mandateId = invitation.mandateId
mandateName = None
roleLabels = []
targetUsername = invitation.targetUsername
# Get mandate name
mandate = rootInterface.getMandate(str(mandateId)) if mandateId else None
if mandate:
mandateName = mandate.label or mandate.name
# Get role names
roleIds = invitation.roleIds or []
for roleId in roleIds:
role = rootInterface.getRole(roleId)
if role:
roleLabels.append(role.roleLabel)
# Get feature instance label for display
featureInstanceName = None
if invitation.featureInstanceId:
instance = rootInterface.getFeatureInstance(str(invitation.featureInstanceId))
if instance:
featureInstanceName = instance.label or instance.featureCode
return InvitationValidation(
valid=True,
reason=None,
mandateId=str(mandateId) if mandateId else None,
mandateName=mandateName,
featureInstanceId=str(invitation.featureInstanceId) if invitation.featureInstanceId else None,
featureInstanceName=featureInstanceName,
roleIds=roleIds,
roleLabels=roleLabels,
targetUsername=targetUsername,
email=invitation.email
)
except Exception as e:
logger.error(f"Error validating invitation token: {e}")
return InvitationValidation(
valid=False,
reason="Validation error",
mandateId=None,
featureInstanceId=None,
roleIds=[]
)
@router.post("/accept/{token}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def accept_invitation(
request: Request,
token: str,
currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
"""
Accept an invitation (requires authentication).
The authenticated user joins the mandate with the predefined roles.
If the user is already a member, their roles are updated.
Args:
token: Invitation token
"""
try:
rootInterface = getRootInterface()
# Find invitation by token (Pydantic model)
invitation = rootInterface.getInvitationByToken(token)
if not invitation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Invitation not found"
)
# Validate invitation
if invitation.revokedAt:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invitation has been revoked"
)
currentTime = getUtcTimestamp()
expiresAt = invitation.expiresAt or 0
if expiresAt < currentTime:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invitation has expired"
)
currentUses = invitation.currentUses or 0
maxUses = invitation.maxUses or 1
if currentUses >= maxUses:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invitation has reached maximum uses"
)
# Validate user matches - invitation is bound by username or email
targetUsername = invitation.targetUsername
invitationEmail = (invitation.email or "").strip().lower() if invitation.email else None
currentUserEmail = (currentUser.email or "").strip().lower() if getattr(currentUser, "email", None) else None
if targetUsername:
if currentUser.username != targetUsername:
logger.warning(
f"User {currentUser.username} tried to accept invitation meant for {targetUsername}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Diese Einladung ist für Benutzer '{targetUsername}' bestimmt"
)
elif invitationEmail:
if not currentUserEmail or currentUserEmail != invitationEmail:
logger.warning(
f"User {currentUser.username} (email: {currentUserEmail}) tried to accept invitation meant for {invitationEmail}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Diese Einladung ist für die E-Mail-Adresse '{invitationEmail}' bestimmt"
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invitation has no target user or email"
)
mandateId = str(invitation.mandateId) if invitation.mandateId else None
roleIds = invitation.roleIds or []
featureInstanceId = str(invitation.featureInstanceId) if invitation.featureInstanceId else None
# Grant feature access (creates FeatureAccess + auto-assigns mandate 'user' role via Regel 4)
featureAccessId = None
if featureInstanceId:
existingAccess = rootInterface.getFeatureAccess(str(currentUser.id), featureInstanceId)
if existingAccess:
# Update existing access with additional roles
featureAccessId = str(existingAccess.id)
for roleId in roleIds:
try:
rootInterface.addRoleToFeatureAccess(str(existingAccess.id), roleId)
except Exception:
pass # Role might already be assigned
message = "Roles updated for existing feature access"
else:
# Create feature access with instance-level roles
# This auto-creates UserMandate with 'user' role (Regel 4)
featureAccess = rootInterface.createFeatureAccess(
userId=str(currentUser.id),
featureInstanceId=featureInstanceId,
roleIds=roleIds
)
featureAccessId = str(featureAccess.id)
message = "Successfully joined feature instance"
else:
# Legacy: mandate-only invitation (no feature instance)
existingMembership = rootInterface.getUserMandate(str(currentUser.id), mandateId)
if existingMembership:
for roleId in roleIds:
try:
rootInterface.addRoleToUserMandate(str(existingMembership.id), roleId)
except Exception:
pass
message = "Roles updated for existing membership"
else:
rootInterface.createUserMandate(
userId=str(currentUser.id),
mandateId=mandateId,
roleIds=roleIds
)
message = "Successfully joined mandate"
# Get userMandateId for response
userMandate = rootInterface.getUserMandate(str(currentUser.id), mandateId)
userMandateId = str(userMandate.id) if userMandate else None
# Update invitation usage
rootInterface.db.recordModify(
Invitation,
invitation.id,
{
"currentUses": (invitation.currentUses or 0) + 1,
"usedBy": str(currentUser.id),
"usedAt": currentTime
}
)
logger.info(
f"User {currentUser.id} accepted invitation {invitation.id} "
f"for mandate {mandateId}"
)
return {
"message": message,
"mandateId": mandateId,
"userMandateId": userMandateId,
"featureAccessId": featureAccessId,
"roleIds": roleIds
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error accepting invitation: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to accept invitation: {str(e)}"
)
# =============================================================================
# Helper Functions
# =============================================================================
def _hasMandateAdminRole(context: RequestContext) -> bool:
"""
Check if the user has mandate admin role in the current context.
"""
if context.hasSysAdminRole:
return True
if not context.roleIds:
return False
try:
rootInterface = getRootInterface()
for roleId in context.roleIds:
role = rootInterface.getRole(roleId)
if role:
# Admin role at mandate level (not feature-instance level)
if role.roleLabel == "admin" and not role.featureInstanceId:
return True
return False
except Exception as e:
logger.error(f"Error checking mandate admin role: {e}")
return False # Fail-safe: no access on error
def _isInstanceRole(interface, roleId: str, featureInstanceId: str) -> bool:
"""
Check if a role belongs to a specific feature instance.
"""
try:
role = interface.getRole(roleId)
if role:
return str(role.featureInstanceId or "") == str(featureInstanceId)
return False
except Exception:
return False # Fail-safe: assume not instance role on error