# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Invitation routes for the backend API.
Implements token-based user invitations for self-service onboarding.

Multi-Tenant Design:
- Invitations are mandate-scoped (Mandate Admin creates them)
- Tokens are secure, time-limited, and optionally use-limited
- Users accept invitations to join mandates/features with predefined roles
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Request, Query
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import status
|
|
import logging
|
|
from pydantic import BaseModel, Field
|
|
|
|
from modules.auth import limiter, getRequestContext, RequestContext, getCurrentUser
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelInvitation import Invitation
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router that groups every invitation endpoint under /api/invitations.
router = APIRouter(
    prefix="/api/invitations",
    tags=["Invitations"],
    responses={
        404: {"description": "Not found"},
    },
)
|
|
|
|
|
|
# =============================================================================
|
|
# Request/Response Models
|
|
# =============================================================================
|
|
|
|
class InvitationCreate(BaseModel):
    """Request payload accepted by the invitation creation endpoint."""

    # --- Who is invited and how they are reached ---
    targetUsername: str = Field(
        default=...,
        description="Username of the user to invite (must match on acceptance)",
    )
    email: Optional[str] = Field(
        default=None,
        description="Email address to send invitation link (optional)",
    )

    # --- What the invitation grants ---
    roleIds: List[str] = Field(
        default=...,
        description="Role IDs to assign to the invited user",
    )
    featureInstanceId: Optional[str] = Field(
        default=None,
        description="Optional feature instance access",
    )

    # --- Link construction ---
    frontendUrl: str = Field(
        default=...,
        description="Frontend URL for building the invite link (provided by frontend)",
    )

    # --- Lifetime and usage limits ---
    # Defaults to 72 hours; bounded to 1..720 hours (720 h = 30 days).
    expiresInHours: int = Field(
        default=72,
        ge=1,
        le=720,
        description="Hours until invitation expires",
    )
    # Defaults to a single use; bounded to 1..100 uses.
    maxUses: int = Field(
        default=1,
        ge=1,
        le=100,
        description="Maximum number of times this invitation can be used",
    )
|
|
|
|
|
|
class InvitationResponse(BaseModel):
    """Response payload describing a single invitation record."""

    # Identity of the invitation and the secret token embedded in the link.
    id: str
    token: str

    # Scope: mandate, optional feature instance and the roles to be granted.
    mandateId: str
    featureInstanceId: Optional[str]
    roleIds: List[str]

    # Addressee of the invitation.
    targetUsername: str
    email: Optional[str]

    # Audit fields; timestamps are numeric values as stored on the record.
    createdBy: str
    createdAt: float
    expiresAt: float
    usedBy: Optional[str]
    usedAt: Optional[float]
    revokedAt: Optional[float]

    # Usage accounting.
    maxUses: int
    currentUses: int

    # Full URL for the invitation (frontend base URL + token).
    inviteUrl: str
    # Whether the invitation e-mail was sent.
    emailSent: bool = False
|
|
|
|
|
|
class InvitationValidation(BaseModel):
    """Response payload returned when an invitation token is validated."""

    # Outcome of the check; `reason` carries the explanation when invalid.
    valid: bool
    reason: Optional[str]

    # Mandate the invitation belongs to, plus its display name if resolved.
    mandateId: Optional[str]
    mandateName: Optional[str] = None

    # Optional feature instance the invitation grants access to.
    featureInstanceId: Optional[str]

    # Roles granted on acceptance and their labels for display.
    roleIds: List[str]
    roleLabels: List[str] = []

    # Username the invitation is bound to, if any.
    targetUsername: Optional[str] = None
|
|
|
|
|
|
# =============================================================================
|
|
# Invitation CRUD Endpoints
|
|
# =============================================================================
|
|
|
|
def _getMandateDisplayName(rootInterface, mandateId: str) -> str:
    """Return the mandate's label (or name), falling back to "PowerOn" if the mandate is not found."""
    mandate = rootInterface.getMandate(mandateId)
    return (mandate.label or mandate.name) if mandate else "PowerOn"


def _buildInvitationEmailHtml(
    targetUsername: str,
    mandateName: str,
    inviteUrl: str,
    expiresInHours: int,
) -> str:
    """
    Render the HTML body of the invitation e-mail.

    All caller-supplied values are HTML-escaped (including quotes) before
    being interpolated, so a crafted username, mandate name or frontend URL
    cannot inject markup or break out of the href attribute.
    """
    from html import escape

    safeUsername = escape(str(targetUsername), quote=True)
    safeMandateName = escape(str(mandateName), quote=True)
    safeInviteUrl = escape(str(inviteUrl), quote=True)

    return f"""
    <html>
    <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
        <h2>Sie wurden eingeladen!</h2>
        <p>Hallo <strong>{safeUsername}</strong>,</p>
        <p>Sie wurden eingeladen, dem Mandanten <strong>{safeMandateName}</strong> beizutreten.</p>
        <p>Klicken Sie auf den folgenden Link, um die Einladung anzunehmen:</p>
        <p style="margin: 20px 0;">
            <a href="{safeInviteUrl}" style="background-color: #007bff; color: white; padding: 12px 24px; text-decoration: none; border-radius: 4px;">
                Einladung annehmen
            </a>
        </p>
        <p style="color: #666; font-size: 0.9em;">
            Oder kopieren Sie diesen Link in Ihren Browser:<br>
            <code>{safeInviteUrl}</code>
        </p>
        <p style="color: #666; font-size: 0.9em;">
            Diese Einladung ist {expiresInHours} Stunden gültig.
        </p>
        <hr style="border: none; border-top: 1px solid #eee; margin: 20px 0;">
        <p style="color: #999; font-size: 0.8em;">
            Diese E-Mail wurde automatisch von PowerOn gesendet.
        </p>
    </body>
    </html>
    """


@router.post("/", response_model=InvitationResponse)
@limiter.limit("30/minute")
def create_invitation(
    request: Request,
    data: InvitationCreate,
    context: RequestContext = Depends(getRequestContext)
) -> InvitationResponse:
    """
    Create a new invitation for the current mandate.

    Requires Mandate-Admin role. Creates an invitation record whose token can
    be shared with a user to join the mandate with predefined roles. If an
    e-mail address is given, an invitation e-mail is sent; if the target
    username already exists, an in-app notification is created. Both are
    best-effort: their failure is logged and does not fail the request.

    Args:
        request: Incoming request (declared for the rate limiter decorator)
        data: Invitation creation data
        context: Request context providing the mandate, user and roles

    Returns:
        The created invitation including the full invite URL and whether
        the e-mail was sent.

    Raises:
        HTTPException 400: Mandate context missing, or a role / feature
            instance belongs to a different mandate
        HTTPException 403: Caller lacks the Mandate-Admin role
        HTTPException 404: A role or the feature instance does not exist
        HTTPException 500: Any other failure while creating the invitation
    """
    if not context.mandateId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-Mandate-Id header is required"
        )

    # Check mandate admin permission
    if not _hasMandateAdminRole(context):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Mandate-Admin role required to create invitations"
        )

    try:
        rootInterface = getRootInterface()
        mandateId = str(context.mandateId)

        # Note: targetUsername does NOT need to exist yet!
        # The invitation can be for a user who will register later.

        # Validate role IDs exist and belong to this mandate or are global
        for roleId in data.roleIds:
            role = rootInterface.getRole(roleId)
            if not role:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Role '{roleId}' not found"
                )
            # Role must be global (no mandateId) or belong to this mandate
            if role.mandateId and str(role.mandateId) != mandateId:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Role '{roleId}' belongs to a different mandate"
                )

        # Validate feature instance if provided
        if data.featureInstanceId:
            instance = rootInterface.getFeatureInstance(data.featureInstanceId)
            if not instance:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Feature instance '{data.featureInstanceId}' not found"
                )
            if str(instance.mandateId) != mandateId:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="Feature instance belongs to a different mandate"
                )

        # Calculate expiration time (hours -> seconds)
        currentTime = getUtcTimestamp()
        expiresAt = currentTime + (data.expiresInHours * 3600)

        # Create invitation
        invitation = Invitation(
            mandateId=mandateId,
            featureInstanceId=data.featureInstanceId,
            roleIds=data.roleIds,
            targetUsername=data.targetUsername,
            email=data.email,
            createdBy=str(context.user.id),
            expiresAt=expiresAt,
            maxUses=data.maxUses
        )

        createdRecord = rootInterface.db.recordCreate(Invitation, invitation.model_dump())
        if not createdRecord:
            raise ValueError("Failed to create invitation record")

        # Build invite URL using frontend URL provided by the caller
        baseUrl = data.frontendUrl.rstrip("/")
        inviteUrl = f"{baseUrl}/invite/{invitation.token}"

        # Resolved at most once and shared by the e-mail and notification steps.
        mandateName: Optional[str] = None

        # Send email if email address is provided (best-effort)
        emailSent = False
        if data.email:
            try:
                from modules.connectors.connectorMessagingEmail import ConnectorMessagingEmail

                mandateName = _getMandateDisplayName(rootInterface, mandateId)

                emailConnector = ConnectorMessagingEmail()
                emailSubject = f"Einladung zu {mandateName}"
                emailBody = _buildInvitationEmailHtml(
                    targetUsername=data.targetUsername,
                    mandateName=mandateName,
                    inviteUrl=inviteUrl,
                    expiresInHours=data.expiresInHours,
                )

                emailConnector.send(
                    recipient=data.email,
                    subject=emailSubject,
                    message=emailBody
                )
                emailSent = True
                logger.info(f"Invitation email sent to {data.email} for user {data.targetUsername}")
            except Exception as emailError:
                # Don't fail the invitation creation if email fails
                logger.warning(f"Failed to send invitation email to {data.email}: {emailError}")

        # Update the invitation record with emailSent status
        if emailSent:
            rootInterface.db.recordModify(
                Invitation,
                createdRecord.get("id"),
                {"emailSent": True}
            )
            createdRecord["emailSent"] = True

        # If the target user already exists, create an in-app notification (best-effort)
        try:
            existingUser = rootInterface.getUserByUsername(data.targetUsername)
            if existingUser:
                from modules.routes.routeNotifications import createInvitationNotification

                # Reuse the name resolved for the e-mail when available
                if mandateName is None:
                    mandateName = _getMandateDisplayName(rootInterface, mandateId)
                inviterName = context.user.fullName or context.user.username

                createInvitationNotification(
                    userId=str(existingUser.id),
                    invitationId=str(createdRecord.get("id")),
                    mandateName=mandateName,
                    inviterName=inviterName
                )
                logger.info(f"Created notification for existing user {data.targetUsername}")
        except Exception as notifError:
            # Don't fail the invitation if notification fails
            logger.warning(f"Failed to create notification for user {data.targetUsername}: {notifError}")

        logger.info(
            f"User {context.user.id} created invitation for user {data.targetUsername} "
            f"to mandate {context.mandateId}, expires in {data.expiresInHours}h"
        )

        return InvitationResponse(
            id=str(createdRecord.get("id")),
            token=str(createdRecord.get("token")),
            mandateId=str(createdRecord.get("mandateId")),
            featureInstanceId=createdRecord.get("featureInstanceId"),
            roleIds=createdRecord.get("roleIds", []),
            targetUsername=createdRecord.get("targetUsername"),
            email=createdRecord.get("email"),
            createdBy=str(createdRecord.get("createdBy")),
            createdAt=createdRecord.get("createdAt"),
            expiresAt=createdRecord.get("expiresAt"),
            usedBy=createdRecord.get("usedBy"),
            usedAt=createdRecord.get("usedAt"),
            revokedAt=createdRecord.get("revokedAt"),
            maxUses=createdRecord.get("maxUses", 1),
            currentUses=createdRecord.get("currentUses", 0),
            inviteUrl=inviteUrl,
            emailSent=emailSent
        )

    except HTTPException:
        # Deliberate HTTP errors pass through unchanged
        raise
    except Exception as e:
        logger.error(f"Error creating invitation: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create invitation: {str(e)}"
        )
|
|
|
|
|
|
@router.get("/", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def list_invitations(
    request: Request,
    frontendUrl: str = Query(..., description="Frontend URL for building invite links (provided by frontend)"),
    includeUsed: bool = Query(False, description="Include already used invitations"),
    includeExpired: bool = Query(False, description="Include expired invitations"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    Return the non-revoked invitations of the current mandate.

    Only callers with the Mandate-Admin role may list invitations. Each entry
    is the invitation's model dump extended with `inviteUrl`, `isExpired`
    and `isUsedUp`.

    Args:
        request: Incoming request (declared for the rate limiter decorator)
        frontendUrl: Base URL used to build each invite link
        includeUsed: Keep invitations whose use count reached maxUses
        includeExpired: Keep invitations whose expiry lies in the past
        context: Request context providing the mandate and roles

    Raises:
        HTTPException 400: Mandate context missing
        HTTPException 403: Caller lacks the Mandate-Admin role
        HTTPException 500: Any other failure while listing
    """
    if not context.mandateId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-Mandate-Id header is required"
        )

    # Only mandate admins may see the mandate's invitations
    if not _hasMandateAdminRole(context):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Mandate-Admin role required to list invitations"
        )

    try:
        dbInterface = getRootInterface()
        mandateInvitations = dbInterface.getInvitationsByMandate(str(context.mandateId))

        now = getUtcTimestamp()
        # The link prefix is the same for every invitation
        urlPrefix = frontendUrl.rstrip("/")
        visible: List[Dict[str, Any]] = []

        for invitation in mandateInvitations:
            # Revoked invitations are never listed
            if invitation.revokedAt:
                continue

            # Usage filter
            used = invitation.currentUses or 0
            limit = invitation.maxUses or 1
            isUsedUp = used >= limit
            if isUsedUp and not includeUsed:
                continue

            # Expiration filter
            deadline = invitation.expiresAt or 0
            isExpired = deadline < now
            if isExpired and not includeExpired:
                continue

            entry = invitation.model_dump()
            entry["inviteUrl"] = f"{urlPrefix}/invite/{invitation.token}"
            entry["isExpired"] = isExpired
            entry["isUsedUp"] = isUsedUp
            visible.append(entry)

        return visible

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing invitations: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list invitations: {str(e)}"
        )
|
|
|
|
|
|
@router.delete("/{invitationId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
def revoke_invitation(
    request: Request,
    invitationId: str,
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, str]:
    """
    Mark an invitation of the current mandate as revoked.

    Only callers with the Mandate-Admin role may revoke. Revocation stores
    the current UTC timestamp in the record's `revokedAt` field.

    Args:
        request: Incoming request (declared for the rate limiter decorator)
        invitationId: Invitation ID
        context: Request context providing the mandate, user and roles

    Returns:
        A confirmation message together with the invitation ID.

    Raises:
        HTTPException 400: Mandate context missing, or already revoked
        HTTPException 403: Caller lacks the Mandate-Admin role, or the
            invitation belongs to another mandate
        HTTPException 404: Invitation does not exist
        HTTPException 500: Any other failure while revoking
    """
    if not context.mandateId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-Mandate-Id header is required"
        )

    # Only mandate admins may revoke
    if not _hasMandateAdminRole(context):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Mandate-Admin role required to revoke invitations"
        )

    try:
        dbInterface = getRootInterface()
        record = dbInterface.getInvitation(invitationId)

        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Invitation '{invitationId}' not found"
            )

        # The invitation must belong to the caller's mandate
        belongsToMandate = str(record.mandateId) == str(context.mandateId)
        if not belongsToMandate:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this invitation"
            )

        # Revoking twice is reported as a client error
        if record.revokedAt:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invitation is already revoked"
            )

        revocation = {"revokedAt": getUtcTimestamp()}
        dbInterface.db.recordModify(Invitation, invitationId, revocation)

        logger.info(f"User {context.user.id} revoked invitation {invitationId}")

        return {"message": "Invitation revoked", "invitationId": invitationId}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error revoking invitation {invitationId}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to revoke invitation: {str(e)}"
        )
|
|
|
|
|
|
# =============================================================================
|
|
# Public Invitation Endpoints (No auth required for validation)
|
|
# =============================================================================
|
|
|
|
@router.get("/validate/{token}", response_model=InvitationValidation)
@limiter.limit("30/minute")
def validate_invitation(
    request: Request,
    token: str
) -> InvitationValidation:
    """
    Check whether an invitation token can still be used (public endpoint).

    The frontend calls this before showing the registration/acceptance
    form. Problems are never raised; they are reported as a response with
    `valid=False` and a `reason`.

    Args:
        request: Incoming request (declared for the rate limiter decorator)
        token: Invitation token

    Returns:
        For a usable token: `valid=True` plus mandate, feature instance,
        roles, role labels and target username. Otherwise `valid=False`
        with the reason and empty scope fields.
    """

    def _rejected(reason: str) -> InvitationValidation:
        # Uniform "invalid" answer that reveals nothing about the invitation
        return InvitationValidation(
            valid=False,
            reason=reason,
            mandateId=None,
            featureInstanceId=None,
            roleIds=[]
        )

    try:
        dbInterface = getRootInterface()
        record = dbInterface.getInvitationByToken(token)

        if not record:
            return _rejected("Invitation not found")

        if record.revokedAt:
            return _rejected("Invitation has been revoked")

        # Expiry check; a missing expiry is treated as 0 and therefore expired
        now = getUtcTimestamp()
        deadline = record.expiresAt or 0
        if deadline < now:
            return _rejected("Invitation has expired")

        # Usage check
        used = record.currentUses or 0
        limit = record.maxUses or 1
        if used >= limit:
            return _rejected("Invitation has reached maximum uses")

        # Collect display information for the frontend
        mandateId = record.mandateId
        invitedUsername = record.targetUsername

        mandate = dbInterface.getMandate(str(mandateId)) if mandateId else None
        mandateName = (mandate.label or mandate.name) if mandate else None

        # Labels of the roles that still exist; unknown role IDs are skipped
        roleIds = record.roleIds or []
        roleLabels = [
            role.roleLabel
            for role in map(dbInterface.getRole, roleIds)
            if role
        ]

        featureInstanceId = None
        if record.featureInstanceId:
            featureInstanceId = str(record.featureInstanceId)

        return InvitationValidation(
            valid=True,
            reason=None,
            mandateId=str(mandateId) if mandateId else None,
            mandateName=mandateName,
            featureInstanceId=featureInstanceId,
            roleIds=roleIds,
            roleLabels=roleLabels,
            targetUsername=invitedUsername
        )

    except Exception as e:
        logger.error(f"Error validating invitation token: {e}")
        return _rejected("Validation error")
|
|
|
|
|
|
@router.post("/accept/{token}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def accept_invitation(
    request: Request,
    token: str,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Accept an invitation (requires authentication).

    The authenticated user joins the invitation's mandate with the
    predefined roles. If the user is already a member, the roles are added
    to the existing membership. If the invitation names a feature instance
    and the user has no access to it yet, feature access is created with
    those invitation roles that belong to that instance. Finally the
    invitation's usage counter is incremented.

    Args:
        request: Incoming request (declared for the rate limiter decorator)
        token: Invitation token
        currentUser: The authenticated user accepting the invitation

    Returns:
        Dict with `message`, `mandateId`, `userMandateId`,
        `featureAccessId` (None unless new feature access was created)
        and `roleIds`.

    Raises:
        HTTPException 400: Invitation revoked, expired or used up
        HTTPException 403: Invitation is bound to a different username
        HTTPException 404: No invitation for this token
        HTTPException 500: Any other failure while accepting
    """
    try:
        rootInterface = getRootInterface()

        # Find invitation by token (Pydantic model)
        invitation = rootInterface.getInvitationByToken(token)

        if not invitation:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Invitation not found"
            )

        # Validate invitation
        if invitation.revokedAt:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invitation has been revoked"
            )

        currentTime = getUtcTimestamp()
        expiresAt = invitation.expiresAt or 0
        if expiresAt < currentTime:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invitation has expired"
            )

        currentUses = invitation.currentUses or 0
        maxUses = invitation.maxUses or 1
        if currentUses >= maxUses:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invitation has reached maximum uses"
            )

        # Validate username matches - the invitation is bound to a specific user
        targetUsername = invitation.targetUsername
        if targetUsername and currentUser.username != targetUsername:
            logger.warning(
                f"User {currentUser.username} tried to accept invitation meant for {targetUsername}"
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Diese Einladung ist für Benutzer '{targetUsername}' bestimmt"
            )

        userId = str(currentUser.id)
        mandateId = str(invitation.mandateId) if invitation.mandateId else None
        roleIds = invitation.roleIds or []
        featureInstanceId = str(invitation.featureInstanceId) if invitation.featureInstanceId else None

        # Check if user is already a member
        existingMembership = rootInterface.getUserMandate(userId, mandateId)

        if existingMembership:
            userMandateId = str(existingMembership.id)

            # Update existing membership with additional roles (best-effort per role)
            for roleId in roleIds:
                try:
                    rootInterface.addRoleToUserMandate(userMandateId, roleId)
                except Exception as roleError:
                    # The role might already be assigned; keep going, but leave a
                    # trace so genuine failures are not silently lost.
                    logger.warning(
                        f"Could not add role {roleId} to membership {userMandateId}: {roleError}"
                    )

            message = "Roles updated for existing membership"
        else:
            # Create new membership
            userMandate = rootInterface.createUserMandate(
                userId=userId,
                mandateId=mandateId,
                roleIds=roleIds
            )
            userMandateId = str(userMandate.id)
            message = "Successfully joined mandate"

        # Grant feature access if specified and not already present
        featureAccessId = None
        if featureInstanceId:
            existingAccess = rootInterface.getFeatureAccess(userId, featureInstanceId)
            if not existingAccess:
                # Only roles that belong to this feature instance are passed on
                instanceRoleIds = [r for r in roleIds if _isInstanceRole(rootInterface, r, featureInstanceId)]
                featureAccess = rootInterface.createFeatureAccess(
                    userId=userId,
                    featureInstanceId=featureInstanceId,
                    roleIds=instanceRoleIds
                )
                featureAccessId = str(featureAccess.id)

        # Update invitation usage
        rootInterface.db.recordModify(
            Invitation,
            invitation.id,
            {
                "currentUses": currentUses + 1,
                "usedBy": userId,
                "usedAt": currentTime
            }
        )

        logger.info(
            f"User {currentUser.id} accepted invitation {invitation.id} "
            f"for mandate {mandateId}"
        )

        return {
            "message": message,
            "mandateId": mandateId,
            "userMandateId": userMandateId,
            "featureAccessId": featureAccessId,
            "roleIds": roleIds
        }

    except HTTPException:
        # Deliberate HTTP errors pass through unchanged
        raise
    except Exception as e:
        logger.error(f"Error accepting invitation: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to accept invitation: {str(e)}"
        )
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
def _hasMandateAdminRole(context: RequestContext) -> bool:
    """
    Tell whether the caller may act as mandate admin in this context.

    System admins always qualify. Otherwise at least one of the context's
    roles must be labelled "admin", carry a mandateId and carry no
    featureInstanceId. Any error during the lookup yields False.
    """
    if context.isSysAdmin:
        return True

    if not context.roleIds:
        return False

    try:
        lookupRole = getRootInterface().getRole
        # Roles are looked up lazily; evaluation stops at the first match.
        return any(
            role.roleLabel == "admin" and role.mandateId and not role.featureInstanceId
            for role in map(lookupRole, context.roleIds)
            if role
        )
    except Exception as e:
        logger.error(f"Error checking mandate admin role: {e}")
        return False  # Fail-safe: no access on error
|
|
|
|
|
|
def _isInstanceRole(interface, roleId: str, featureInstanceId: str) -> bool:
    """
    Tell whether the given role is attached to the given feature instance.

    The role is fetched through `interface.getRole`; its featureInstanceId
    (empty string when unset) is compared as a string with the requested
    one. Unknown roles and any error yield False.
    """
    try:
        role = interface.getRole(roleId)
        if not role:
            return False
        ownerInstance = str(role.featureInstanceId or "")
        return ownerInstance == str(featureInstanceId)
    except Exception:
        return False  # Fail-safe: assume not instance role on error
|