927 lines
37 KiB
Python
927 lines
37 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Invitation routes for the backend API.
|
|
Implements token-based user invitations for self-service onboarding.
|
|
|
|
Multi-Tenant Design:
|
|
- Invitations are mandate-scoped (Mandate Admin creates them)
|
|
- Tokens are secure, time-limited, and optionally use-limited
|
|
- Users accept invitations to join mandates/features with predefined roles
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Request, Query
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import status
|
|
import logging
|
|
import json
|
|
import math
|
|
from pydantic import BaseModel, Field, model_validator
|
|
|
|
from modules.auth import limiter, getRequestContext, RequestContext, getCurrentUser
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelPagination import PaginationParams, PaginationMetadata, normalize_pagination_dict
|
|
from modules.routes.routeHelpers import _applyFiltersAndSort, handleFilterValuesInMemory, handleIdsInMemory
|
|
from modules.datamodels.datamodelInvitation import Invitation
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
from modules.shared.i18nRegistry import apiRouteContext
|
|
routeApiMsg = apiRouteContext("routeInvitations")
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(
|
|
prefix="/api/invitations",
|
|
tags=["Invitations"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Request/Response Models
|
|
# =============================================================================
|
|
|
|
class InvitationCreate(BaseModel):
|
|
"""Request model for creating an invitation.
|
|
Supports two modes:
|
|
- Mandate-level: featureInstanceId omitted, roleIds are mandate-level roles (user, viewer, admin)
|
|
- Feature-instance-level: featureInstanceId required, roleIds are instance-level roles
|
|
|
|
At least one of email or targetUsername must be provided.
|
|
"""
|
|
targetUsername: Optional[str] = Field(None, description="Username of the user to invite (must match on acceptance)")
|
|
email: Optional[str] = Field(None, description="Email address to send invitation link (optional if targetUsername is set)")
|
|
featureInstanceId: Optional[str] = Field(None, description="Feature instance to grant access to (optional for mandate-level invitations)")
|
|
roleIds: List[str] = Field(..., description="Role IDs: mandate-level (user, viewer, admin) or instance-level")
|
|
frontendUrl: str = Field(..., description="Frontend URL for building the invite link (provided by frontend)")
|
|
expiresInHours: int = Field(
|
|
72,
|
|
ge=1,
|
|
le=720, # Max 30 days
|
|
description="Hours until invitation expires"
|
|
)
|
|
maxUses: int = Field(
|
|
1,
|
|
ge=1,
|
|
le=100,
|
|
description="Maximum number of times this invitation can be used"
|
|
)
|
|
|
|
@model_validator(mode="after")
|
|
def require_email_or_username(self):
|
|
email_val = (self.email or "").strip()
|
|
username_val = (self.targetUsername or "").strip()
|
|
if not email_val and not username_val:
|
|
raise ValueError("At least one of email or targetUsername must be provided")
|
|
return self
|
|
|
|
|
|
class InvitationResponse(BaseModel):
|
|
"""Response model for invitation"""
|
|
id: str
|
|
token: str
|
|
mandateId: str
|
|
featureInstanceId: Optional[str]
|
|
roleIds: List[str]
|
|
targetUsername: Optional[str]
|
|
email: Optional[str]
|
|
createdBy: str
|
|
createdAt: float
|
|
expiresAt: float
|
|
usedBy: Optional[str]
|
|
usedAt: Optional[float]
|
|
revokedAt: Optional[float]
|
|
maxUses: int
|
|
currentUses: int
|
|
inviteUrl: str # Full URL for the invitation
|
|
emailSent: bool = False # Whether invitation email was sent
|
|
|
|
|
|
class InvitationValidation(BaseModel):
|
|
"""Response model for invitation validation"""
|
|
valid: bool
|
|
reason: Optional[str]
|
|
mandateId: Optional[str]
|
|
mandateName: Optional[str] = None
|
|
featureInstanceId: Optional[str]
|
|
featureInstanceName: Optional[str] = None
|
|
roleIds: List[str]
|
|
roleLabels: List[str] = []
|
|
targetUsername: Optional[str] = None
|
|
email: Optional[str] = None
|
|
|
|
|
|
# =============================================================================
|
|
# Invitation CRUD Endpoints
|
|
# =============================================================================
|
|
|
|
@router.post("/", response_model=InvitationResponse)
|
|
@limiter.limit("30/minute")
|
|
def create_invitation(
|
|
request: Request,
|
|
data: InvitationCreate,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> InvitationResponse:
|
|
"""
|
|
Create a new invitation for a feature instance.
|
|
|
|
Requires SysAdmin or Mandate-Admin role. Creates a secure token that can be shared
|
|
with users to join a feature instance with predefined roles.
|
|
The mandateId is derived from the feature instance automatically.
|
|
|
|
Args:
|
|
data: Invitation creation data (featureInstanceId + roleIds required)
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
|
|
# Determine mandateId and validate
|
|
if data.featureInstanceId:
|
|
# Feature-instance-level invitation
|
|
instance = rootInterface.getFeatureInstance(data.featureInstanceId)
|
|
if not instance:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Feature instance '{data.featureInstanceId}' not found"
|
|
)
|
|
mandateId = str(instance.mandateId)
|
|
# Validate roles belong to this feature instance
|
|
for roleId in data.roleIds:
|
|
role = rootInterface.getRole(roleId)
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role '{roleId}' not found"
|
|
)
|
|
if str(role.featureInstanceId or "") != data.featureInstanceId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Role '{roleId}' does not belong to feature instance '{data.featureInstanceId}'"
|
|
)
|
|
else:
|
|
# Mandate-level invitation (user, viewer, admin roles)
|
|
if not context.mandateId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("X-Mandate-Id header is required for mandate-level invitations")
|
|
)
|
|
mandateId = str(context.mandateId)
|
|
# Validate roles are mandate-level (no featureInstanceId)
|
|
for roleId in data.roleIds:
|
|
role = rootInterface.getRole(roleId)
|
|
if not role:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Role '{roleId}' not found"
|
|
)
|
|
if role.featureInstanceId is not None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Role '{roleId}' is an instance-level role; use mandate-level roles (user, viewer, admin) for mandate invitations"
|
|
)
|
|
if str(role.mandateId or "") != mandateId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Role '{roleId}' does not belong to mandate"
|
|
)
|
|
|
|
# Check admin permission
|
|
if not context.hasSysAdminRole:
|
|
if str(context.mandateId) != mandateId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=routeApiMsg("Access denied to this mandate")
|
|
)
|
|
if not _hasMandateAdminRole(context):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=routeApiMsg("Mandate-Admin role required to create invitations")
|
|
)
|
|
|
|
# Calculate expiration time
|
|
currentTime = getUtcTimestamp()
|
|
expiresAt = currentTime + (data.expiresInHours * 3600)
|
|
|
|
# Create invitation (targetUsername optional when email provided)
|
|
target_username_val = (data.targetUsername or "").strip() or None
|
|
email_val = (data.email or "").strip() or None
|
|
|
|
# Silent duplicate check: only for existing users (identified by username)
|
|
# Username is unique; email can exist multiple times, so we do NOT look up by email
|
|
target_user_id = None
|
|
if target_username_val:
|
|
u = rootInterface.getUserByUsername(target_username_val)
|
|
if u:
|
|
target_user_id = str(u.id)
|
|
|
|
if target_user_id:
|
|
if data.featureInstanceId:
|
|
existing_access = rootInterface.getFeatureAccess(target_user_id, data.featureInstanceId)
|
|
if existing_access:
|
|
logger.info(f"Invitation skipped: user {target_user_id} already has access to feature instance {data.featureInstanceId}")
|
|
return InvitationResponse(
|
|
id="duplicate-skipped",
|
|
token="",
|
|
mandateId=mandateId,
|
|
featureInstanceId=data.featureInstanceId,
|
|
roleIds=data.roleIds,
|
|
targetUsername=target_username_val,
|
|
email=email_val,
|
|
createdBy=str(context.user.id),
|
|
createdAt=currentTime,
|
|
expiresAt=expiresAt,
|
|
usedBy=None,
|
|
usedAt=None,
|
|
revokedAt=None,
|
|
maxUses=data.maxUses,
|
|
currentUses=0,
|
|
inviteUrl="",
|
|
emailSent=False
|
|
)
|
|
else:
|
|
existing_membership = rootInterface.getUserMandate(target_user_id, mandateId)
|
|
if existing_membership:
|
|
logger.info(f"Invitation skipped: user {target_user_id} already member of mandate {mandateId}")
|
|
return InvitationResponse(
|
|
id="duplicate-skipped",
|
|
token="",
|
|
mandateId=mandateId,
|
|
featureInstanceId=None,
|
|
roleIds=data.roleIds,
|
|
targetUsername=target_username_val,
|
|
email=email_val,
|
|
createdBy=str(context.user.id),
|
|
createdAt=currentTime,
|
|
expiresAt=expiresAt,
|
|
usedBy=None,
|
|
usedAt=None,
|
|
revokedAt=None,
|
|
maxUses=data.maxUses,
|
|
currentUses=0,
|
|
inviteUrl="",
|
|
emailSent=False
|
|
)
|
|
|
|
invitation = Invitation(
|
|
mandateId=mandateId,
|
|
featureInstanceId=data.featureInstanceId or None,
|
|
roleIds=data.roleIds,
|
|
targetUsername=target_username_val,
|
|
email=email_val,
|
|
createdBy=str(context.user.id),
|
|
expiresAt=expiresAt,
|
|
maxUses=data.maxUses
|
|
)
|
|
|
|
createdRecord = rootInterface.db.recordCreate(Invitation, invitation.model_dump())
|
|
if not createdRecord:
|
|
raise ValueError("Failed to create invitation record")
|
|
|
|
# Build invite URL using frontend URL provided by the caller
|
|
baseUrl = data.frontendUrl.rstrip("/")
|
|
inviteUrl = f"{baseUrl}/invite/{invitation.token}"
|
|
|
|
# Send email if email address is provided
|
|
emailSent = False
|
|
if email_val:
|
|
try:
|
|
from modules.connectors.connectorMessagingEmail import ConnectorMessagingEmail
|
|
# Get mandate and optionally instance for the email
|
|
mandate = rootInterface.getMandate(str(mandateId))
|
|
mandateName = (mandate.label or mandate.name) if mandate else "PowerOn"
|
|
# Only use username for existing users; new users set it when they register
|
|
display_name = target_username_val if target_username_val else "Neuer/e Nutzer/in"
|
|
instance = rootInterface.getFeatureInstance(data.featureInstanceId) if data.featureInstanceId else None
|
|
instance_label = (instance.label or instance.featureCode) if instance else None
|
|
|
|
emailConnector = ConnectorMessagingEmail()
|
|
if instance_label:
|
|
emailSubject = f"Einladung zur Feature-Instanz {instance_label}"
|
|
invite_desc = f"der Feature-Instanz «{instance_label}» (Mandant: {mandateName}) beizutreten"
|
|
else:
|
|
emailSubject = f"Einladung zu {mandateName}"
|
|
invite_desc = f"dem Mandanten «{mandateName}» beizutreten"
|
|
|
|
from modules.routes.routeSecurityLocal import _buildAuthEmailHtml
|
|
emailBody = _buildAuthEmailHtml(
|
|
greeting=f"Hallo {display_name}",
|
|
bodyLines=[
|
|
f"Sie wurden eingeladen, {invite_desc}.",
|
|
"",
|
|
"Klicken Sie auf die Schaltfläche, um die Einladung anzunehmen:",
|
|
],
|
|
buttonText="Einladung annehmen",
|
|
buttonUrl=inviteUrl,
|
|
footerText=f"Diese Einladung ist {data.expiresInHours} Stunden gültig.",
|
|
)
|
|
|
|
emailConnector.send(
|
|
recipient=email_val,
|
|
subject=emailSubject,
|
|
message=emailBody
|
|
)
|
|
emailSent = True
|
|
logger.info(f"Invitation email sent to {email_val} for user {target_username_val or 'email-only'}")
|
|
except Exception as emailError:
|
|
logger.warning(f"Failed to send invitation email to {email_val}: {emailError}")
|
|
# Don't fail the invitation creation if email fails
|
|
|
|
# Update the invitation record with emailSent status
|
|
if emailSent:
|
|
rootInterface.db.recordModify(
|
|
Invitation,
|
|
createdRecord.get("id"),
|
|
{"emailSent": True}
|
|
)
|
|
createdRecord["emailSent"] = True
|
|
|
|
# If the target user already exists (identified by username), create an in-app notification
|
|
# Only look up by username - email is not used for "existing user" since new users are invited by email
|
|
try:
|
|
existingUser = None
|
|
if target_username_val:
|
|
existingUser = rootInterface.getUserByUsername(target_username_val)
|
|
if existingUser:
|
|
from modules.routes.routeNotifications import createInvitationNotification
|
|
|
|
mandate = rootInterface.getMandate(str(mandateId))
|
|
mandateName = (mandate.label or mandate.name) if mandate else "PowerOn"
|
|
inviterName = context.user.fullName or context.user.username
|
|
instance = rootInterface.getFeatureInstance(data.featureInstanceId) if data.featureInstanceId else None
|
|
instance_label = (instance.label or instance.featureCode) if instance else None
|
|
|
|
createInvitationNotification(
|
|
userId=str(existingUser.id),
|
|
invitationId=str(createdRecord.get("id")),
|
|
mandateName=mandateName,
|
|
inviterName=inviterName,
|
|
featureInstanceName=instance_label
|
|
)
|
|
logger.info(f"Created notification for existing user {target_username_val or email_val}")
|
|
except Exception as notifError:
|
|
logger.warning(f"Failed to create notification for user {target_username_val or email_val}: {notifError}")
|
|
# Don't fail the invitation if notification fails
|
|
|
|
target_desc = f"feature instance {data.featureInstanceId}" if data.featureInstanceId else f"mandate {mandateId}"
|
|
logger.info(
|
|
f"User {context.user.id} created invitation for user {target_username_val or email_val} "
|
|
f"to {target_desc}, expires in {data.expiresInHours}h"
|
|
)
|
|
|
|
# Invitation extends PowerOnModel: recordCreate/_saveRecord set sysCreatedAt and sysCreatedBy automatically.
|
|
# API response uses createdAt/createdBy; map from the system fields (no separate createdAt column on model).
|
|
return InvitationResponse(
|
|
id=str(createdRecord.get("id")),
|
|
token=str(createdRecord.get("token")),
|
|
mandateId=str(createdRecord.get("mandateId")),
|
|
featureInstanceId=createdRecord.get("featureInstanceId"),
|
|
roleIds=createdRecord.get("roleIds", []),
|
|
targetUsername=createdRecord.get("targetUsername"),
|
|
email=createdRecord.get("email"),
|
|
createdBy=str(createdRecord["sysCreatedBy"]),
|
|
createdAt=float(createdRecord["sysCreatedAt"]),
|
|
expiresAt=createdRecord.get("expiresAt"),
|
|
usedBy=createdRecord.get("usedBy"),
|
|
usedAt=createdRecord.get("usedAt"),
|
|
revokedAt=createdRecord.get("revokedAt"),
|
|
maxUses=createdRecord.get("maxUses", 1),
|
|
currentUses=createdRecord.get("currentUses", 0),
|
|
inviteUrl=inviteUrl,
|
|
emailSent=emailSent
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error creating invitation: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to create invitation: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/")
|
|
@limiter.limit("60/minute")
|
|
def list_invitations(
|
|
request: Request,
|
|
frontendUrl: str = Query(..., description="Frontend URL for building invite links (provided by frontend)"),
|
|
includeUsed: bool = Query(False, description="Include already used invitations"),
|
|
includeExpired: bool = Query(False, description="Include expired invitations"),
|
|
pagination: Optional[str] = Query(None, description="JSON-encoded PaginationParams object"),
|
|
mode: Optional[str] = Query(None, description="'filterValues' for distinct column values, 'ids' for all filtered IDs"),
|
|
column: Optional[str] = Query(None, description="Column key (required when mode=filterValues)"),
|
|
context: RequestContext = Depends(getRequestContext)
|
|
):
|
|
"""
|
|
List invitations for the current mandate.
|
|
|
|
Requires Mandate-Admin role. Returns all invitations created for this mandate.
|
|
|
|
NOTE: Cannot use db.getRecordsetPaginated() because:
|
|
- Computed status fields (isExpired, isUsedUp) are derived in-memory
|
|
- Filtering by revoked/used/expired requires post-fetch logic
|
|
- Invitation volume per mandate is typically low (< 100)
|
|
When this endpoint needs FormGeneratorTable pagination, add PaginatedResponse
|
|
support with in-memory slicing (similar to routeDataConnections).
|
|
|
|
Args:
|
|
includeUsed: Include invitations that have reached maxUses
|
|
includeExpired: Include expired invitations
|
|
"""
|
|
if not context.mandateId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("X-Mandate-Id header is required")
|
|
)
|
|
|
|
# Check mandate admin permission
|
|
if not _hasMandateAdminRole(context):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=routeApiMsg("Mandate-Admin role required to list invitations")
|
|
)
|
|
|
|
def _buildInvitationItems():
|
|
rootInterface = getRootInterface()
|
|
allInvitations = rootInterface.getInvitationsByMandate(str(context.mandateId))
|
|
currentTime = getUtcTimestamp()
|
|
items = []
|
|
for inv in allInvitations:
|
|
if inv.revokedAt:
|
|
continue
|
|
currentUses = inv.currentUses or 0
|
|
maxUses = inv.maxUses or 1
|
|
if not includeUsed and currentUses >= maxUses:
|
|
continue
|
|
expiresAt = inv.expiresAt or 0
|
|
if not includeExpired and expiresAt < currentTime:
|
|
continue
|
|
baseUrl = frontendUrl.rstrip("/") if frontendUrl else ""
|
|
inviteUrl = f"{baseUrl}/invite/{inv.token}" if baseUrl else ""
|
|
items.append({
|
|
**inv.model_dump(),
|
|
"inviteUrl": inviteUrl,
|
|
"isExpired": expiresAt < currentTime,
|
|
"isUsedUp": currentUses >= maxUses
|
|
})
|
|
return items
|
|
|
|
if mode == "filterValues":
|
|
if not column:
|
|
raise HTTPException(status_code=400, detail="column parameter required for mode=filterValues")
|
|
try:
|
|
return handleFilterValuesInMemory(_buildInvitationItems(), column, pagination)
|
|
except Exception as e:
|
|
logger.error(f"Error getting filter values for invitations: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
if mode == "ids":
|
|
try:
|
|
return handleIdsInMemory(_buildInvitationItems(), pagination)
|
|
except Exception as e:
|
|
logger.error(f"Error getting IDs for invitations: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
try:
|
|
result = _buildInvitationItems()
|
|
|
|
paginationParams = None
|
|
if pagination:
|
|
try:
|
|
paginationDict = json.loads(pagination)
|
|
if paginationDict:
|
|
paginationDict = normalize_pagination_dict(paginationDict)
|
|
paginationParams = PaginationParams(**paginationDict)
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
raise HTTPException(status_code=400, detail=f"Invalid pagination parameter: {str(e)}")
|
|
|
|
if paginationParams:
|
|
filtered = _applyFiltersAndSort(result, paginationParams)
|
|
totalItems = len(filtered)
|
|
totalPages = math.ceil(totalItems / paginationParams.pageSize) if totalItems > 0 else 0
|
|
startIdx = (paginationParams.page - 1) * paginationParams.pageSize
|
|
endIdx = startIdx + paginationParams.pageSize
|
|
return {
|
|
"items": filtered[startIdx:endIdx],
|
|
"pagination": PaginationMetadata(
|
|
currentPage=paginationParams.page, pageSize=paginationParams.pageSize,
|
|
totalItems=totalItems, totalPages=totalPages,
|
|
sort=paginationParams.sort, filters=paginationParams.filters,
|
|
).model_dump(),
|
|
}
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error listing invitations: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to list invitations: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.delete("/{invitationId}", response_model=Dict[str, str])
|
|
@limiter.limit("30/minute")
|
|
def revoke_invitation(
|
|
request: Request,
|
|
invitationId: str,
|
|
context: RequestContext = Depends(getRequestContext)
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Revoke an invitation.
|
|
|
|
Requires Mandate-Admin role. Revoked invitations cannot be used.
|
|
|
|
Args:
|
|
invitationId: Invitation ID
|
|
"""
|
|
if not context.mandateId:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("X-Mandate-Id header is required")
|
|
)
|
|
|
|
# Check mandate admin permission
|
|
if not _hasMandateAdminRole(context):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=routeApiMsg("Mandate-Admin role required to revoke invitations")
|
|
)
|
|
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
|
|
# Get invitation (Pydantic model)
|
|
invitation = rootInterface.getInvitation(invitationId)
|
|
|
|
if not invitation:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Invitation '{invitationId}' not found"
|
|
)
|
|
|
|
# Verify mandate access
|
|
if str(invitation.mandateId) != str(context.mandateId):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=routeApiMsg("Access denied to this invitation")
|
|
)
|
|
|
|
# Already revoked?
|
|
if invitation.revokedAt:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("Invitation is already revoked")
|
|
)
|
|
|
|
# Revoke invitation
|
|
rootInterface.db.recordModify(
|
|
Invitation,
|
|
invitationId,
|
|
{"revokedAt": getUtcTimestamp()}
|
|
)
|
|
|
|
logger.info(f"User {context.user.id} revoked invitation {invitationId}")
|
|
|
|
return {"message": "Invitation revoked", "invitationId": invitationId}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error revoking invitation {invitationId}: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to revoke invitation: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Public Invitation Endpoints (No auth required for validation)
|
|
# =============================================================================
|
|
|
|
@router.get("/validate/{token}", response_model=InvitationValidation)
|
|
@limiter.limit("30/minute")
|
|
def validate_invitation(
|
|
request: Request,
|
|
token: str
|
|
) -> InvitationValidation:
|
|
"""
|
|
Validate an invitation token (public endpoint).
|
|
|
|
Used by the frontend to check if an invitation is valid before
|
|
showing the registration/acceptance form.
|
|
|
|
Args:
|
|
token: Invitation token
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
|
|
# Find invitation by token (Pydantic model)
|
|
invitation = rootInterface.getInvitationByToken(token)
|
|
|
|
if not invitation:
|
|
return InvitationValidation(
|
|
valid=False,
|
|
reason="Invitation not found",
|
|
mandateId=None,
|
|
featureInstanceId=None,
|
|
roleIds=[]
|
|
)
|
|
|
|
# Check if revoked
|
|
if invitation.revokedAt:
|
|
return InvitationValidation(
|
|
valid=False,
|
|
reason="Invitation has been revoked",
|
|
mandateId=None,
|
|
featureInstanceId=None,
|
|
roleIds=[]
|
|
)
|
|
|
|
# Check if expired
|
|
currentTime = getUtcTimestamp()
|
|
expiresAt = invitation.expiresAt or 0
|
|
if expiresAt < currentTime:
|
|
return InvitationValidation(
|
|
valid=False,
|
|
reason="Invitation has expired",
|
|
mandateId=None,
|
|
featureInstanceId=None,
|
|
roleIds=[]
|
|
)
|
|
|
|
# Check if used up
|
|
currentUses = invitation.currentUses or 0
|
|
maxUses = invitation.maxUses or 1
|
|
if currentUses >= maxUses:
|
|
return InvitationValidation(
|
|
valid=False,
|
|
reason="Invitation has reached maximum uses",
|
|
mandateId=None,
|
|
featureInstanceId=None,
|
|
roleIds=[]
|
|
)
|
|
|
|
# Get additional info for display
|
|
mandateId = invitation.mandateId
|
|
mandateName = None
|
|
roleLabels = []
|
|
targetUsername = invitation.targetUsername
|
|
|
|
mandate = rootInterface.getMandate(str(mandateId)) if mandateId else None
|
|
if mandate and not getattr(mandate, "enabled", True):
|
|
return InvitationValidation(
|
|
valid=False,
|
|
reason="Mandate is disabled",
|
|
mandateId=None,
|
|
featureInstanceId=None,
|
|
roleIds=[]
|
|
)
|
|
if mandate:
|
|
mandateName = mandate.label or mandate.name
|
|
|
|
# Get role names
|
|
roleIds = invitation.roleIds or []
|
|
for roleId in roleIds:
|
|
role = rootInterface.getRole(roleId)
|
|
if role:
|
|
roleLabels.append(role.roleLabel)
|
|
|
|
# Get feature instance label for display
|
|
featureInstanceName = None
|
|
if invitation.featureInstanceId:
|
|
instance = rootInterface.getFeatureInstance(str(invitation.featureInstanceId))
|
|
if instance:
|
|
featureInstanceName = instance.label or instance.featureCode
|
|
|
|
return InvitationValidation(
|
|
valid=True,
|
|
reason=None,
|
|
mandateId=str(mandateId) if mandateId else None,
|
|
mandateName=mandateName,
|
|
featureInstanceId=str(invitation.featureInstanceId) if invitation.featureInstanceId else None,
|
|
featureInstanceName=featureInstanceName,
|
|
roleIds=roleIds,
|
|
roleLabels=roleLabels,
|
|
targetUsername=targetUsername,
|
|
email=invitation.email
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error validating invitation token: {e}")
|
|
return InvitationValidation(
|
|
valid=False,
|
|
reason="Validation error",
|
|
mandateId=None,
|
|
featureInstanceId=None,
|
|
roleIds=[]
|
|
)
|
|
|
|
|
|
@router.post("/accept/{token}", response_model=Dict[str, Any])
|
|
@limiter.limit("10/minute")
|
|
def accept_invitation(
|
|
request: Request,
|
|
token: str,
|
|
currentUser: User = Depends(getCurrentUser)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Accept an invitation (requires authentication).
|
|
|
|
The authenticated user joins the mandate with the predefined roles.
|
|
If the user is already a member, their roles are updated.
|
|
|
|
Args:
|
|
token: Invitation token
|
|
"""
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
|
|
# Find invitation by token (Pydantic model)
|
|
invitation = rootInterface.getInvitationByToken(token)
|
|
|
|
if not invitation:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=routeApiMsg("Invitation not found")
|
|
)
|
|
|
|
# Validate invitation
|
|
if invitation.revokedAt:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("Invitation has been revoked")
|
|
)
|
|
|
|
currentTime = getUtcTimestamp()
|
|
expiresAt = invitation.expiresAt or 0
|
|
if expiresAt < currentTime:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("Invitation has expired")
|
|
)
|
|
|
|
currentUses = invitation.currentUses or 0
|
|
maxUses = invitation.maxUses or 1
|
|
if currentUses >= maxUses:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("Invitation has reached maximum uses")
|
|
)
|
|
|
|
# Validate user matches - invitation is bound by username or email
|
|
targetUsername = invitation.targetUsername
|
|
invitationEmail = (invitation.email or "").strip().lower() if invitation.email else None
|
|
currentUserEmail = (currentUser.email or "").strip().lower() if getattr(currentUser, "email", None) else None
|
|
|
|
if targetUsername:
|
|
if currentUser.username != targetUsername:
|
|
logger.warning(
|
|
f"User {currentUser.username} tried to accept invitation meant for {targetUsername}"
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Diese Einladung ist für Benutzer '{targetUsername}' bestimmt"
|
|
)
|
|
elif invitationEmail:
|
|
if not currentUserEmail or currentUserEmail != invitationEmail:
|
|
logger.warning(
|
|
f"User {currentUser.username} (email: {currentUserEmail}) tried to accept invitation meant for {invitationEmail}"
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Diese Einladung ist für die E-Mail-Adresse '{invitationEmail}' bestimmt"
|
|
)
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=routeApiMsg("Invitation has no target user or email")
|
|
)
|
|
|
|
mandateId = str(invitation.mandateId) if invitation.mandateId else None
|
|
roleIds = invitation.roleIds or []
|
|
featureInstanceId = str(invitation.featureInstanceId) if invitation.featureInstanceId else None
|
|
|
|
# Grant feature access (creates FeatureAccess + auto-assigns mandate 'user' role via Regel 4)
|
|
featureAccessId = None
|
|
if featureInstanceId:
|
|
existingAccess = rootInterface.getFeatureAccess(str(currentUser.id), featureInstanceId)
|
|
if existingAccess:
|
|
# Update existing access with additional roles
|
|
featureAccessId = str(existingAccess.id)
|
|
for roleId in roleIds:
|
|
try:
|
|
rootInterface.addRoleToFeatureAccess(str(existingAccess.id), roleId)
|
|
except Exception:
|
|
pass # Role might already be assigned
|
|
message = "Roles updated for existing feature access"
|
|
else:
|
|
# Create feature access with instance-level roles
|
|
# This auto-creates UserMandate with 'user' role (Regel 4)
|
|
featureAccess = rootInterface.createFeatureAccess(
|
|
userId=str(currentUser.id),
|
|
featureInstanceId=featureInstanceId,
|
|
roleIds=roleIds
|
|
)
|
|
featureAccessId = str(featureAccess.id)
|
|
message = "Successfully joined feature instance"
|
|
else:
|
|
# Legacy: mandate-only invitation (no feature instance)
|
|
existingMembership = rootInterface.getUserMandate(str(currentUser.id), mandateId)
|
|
if existingMembership:
|
|
for roleId in roleIds:
|
|
try:
|
|
rootInterface.addRoleToUserMandate(str(existingMembership.id), roleId)
|
|
except Exception:
|
|
pass
|
|
message = "Roles updated for existing membership"
|
|
else:
|
|
rootInterface.createUserMandate(
|
|
userId=str(currentUser.id),
|
|
mandateId=mandateId,
|
|
roleIds=roleIds
|
|
)
|
|
message = "Successfully joined mandate"
|
|
|
|
# Get userMandateId for response
|
|
userMandate = rootInterface.getUserMandate(str(currentUser.id), mandateId)
|
|
userMandateId = str(userMandate.id) if userMandate else None
|
|
|
|
# Update invitation usage
|
|
rootInterface.db.recordModify(
|
|
Invitation,
|
|
invitation.id,
|
|
{
|
|
"currentUses": (invitation.currentUses or 0) + 1,
|
|
"usedBy": str(currentUser.id),
|
|
"usedAt": currentTime
|
|
}
|
|
)
|
|
|
|
logger.info(
|
|
f"User {currentUser.id} accepted invitation {invitation.id} "
|
|
f"for mandate {mandateId}"
|
|
)
|
|
|
|
return {
|
|
"message": message,
|
|
"mandateId": mandateId,
|
|
"userMandateId": userMandateId,
|
|
"featureAccessId": featureAccessId,
|
|
"roleIds": roleIds
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error accepting invitation: {e}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to accept invitation: {str(e)}"
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
def _hasMandateAdminRole(context: RequestContext) -> bool:
|
|
"""
|
|
Check if the user has mandate admin role in the current context.
|
|
"""
|
|
if context.hasSysAdminRole:
|
|
return True
|
|
|
|
if not context.roleIds:
|
|
return False
|
|
|
|
try:
|
|
rootInterface = getRootInterface()
|
|
|
|
for roleId in context.roleIds:
|
|
role = rootInterface.getRole(roleId)
|
|
if role:
|
|
# Admin role at mandate level (not feature-instance level)
|
|
if role.roleLabel == "admin" and not role.featureInstanceId:
|
|
return True
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking mandate admin role: {e}")
|
|
return False # Fail-safe: no access on error
|
|
|
|
|
|
def _isInstanceRole(interface, roleId: str, featureInstanceId: str) -> bool:
|
|
"""
|
|
Check if a role belongs to a specific feature instance.
|
|
"""
|
|
try:
|
|
role = interface.getRole(roleId)
|
|
if role:
|
|
return str(role.featureInstanceId or "") == str(featureInstanceId)
|
|
return False
|
|
except Exception:
|
|
return False # Fail-safe: assume not instance role on error
|