# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Invitation routes for the backend API. Implements token-based user invitations for self-service onboarding. Multi-Tenant Design: - Invitations are mandate-scoped (Mandate Admin creates them) - Tokens are secure, time-limited, and optionally use-limited - Users accept invitations to join mandates/features with predefined roles """ from fastapi import APIRouter, HTTPException, Depends, Request, Query from typing import List, Dict, Any, Optional from fastapi import status import logging from pydantic import BaseModel, Field from modules.auth import limiter, getRequestContext, RequestContext, getCurrentUser from modules.datamodels.datamodelUam import User from modules.datamodels.datamodelInvitation import Invitation from modules.interfaces.interfaceDbApp import getRootInterface from modules.shared.timeUtils import getUtcTimestamp logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/invitations", tags=["Invitations"], responses={404: {"description": "Not found"}} ) # ============================================================================= # Request/Response Models # ============================================================================= class InvitationCreate(BaseModel): """Request model for creating an invitation""" email: Optional[str] = Field(None, description="Target email address (optional)") roleIds: List[str] = Field(..., description="Role IDs to assign to the invited user") featureInstanceId: Optional[str] = Field(None, description="Optional feature instance access") expiresInHours: int = Field( 72, ge=1, le=720, # Max 30 days description="Hours until invitation expires" ) maxUses: int = Field( 1, ge=1, le=100, description="Maximum number of times this invitation can be used" ) class InvitationResponse(BaseModel): """Response model for invitation""" id: str token: str mandateId: str featureInstanceId: Optional[str] roleIds: List[str] email: Optional[str] createdBy: str createdAt: float expiresAt: float usedBy: Optional[str] usedAt: Optional[float] revokedAt: Optional[float] maxUses: int currentUses: int inviteUrl: str # Full URL for the invitation class InvitationValidation(BaseModel): """Response model for invitation validation""" valid: bool reason: Optional[str] mandateId: Optional[str] featureInstanceId: Optional[str] roleIds: List[str] # ============================================================================= # Invitation CRUD Endpoints # ============================================================================= @router.post("/", response_model=InvitationResponse) @limiter.limit("30/minute") async def create_invitation( request: Request, data: InvitationCreate, context: RequestContext = Depends(getRequestContext) ) -> InvitationResponse: """ Create a new invitation for the current mandate. Requires Mandate-Admin role. Creates a secure token that can be shared with users to join the mandate with predefined roles. Args: data: Invitation creation data """ if not context.mandateId: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="X-Mandate-Id header is required" ) # Check mandate admin permission if not _hasMandateAdminRole(context): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Mandate-Admin role required to create invitations" ) try: rootInterface = getRootInterface() # Validate role IDs exist and belong to this mandate or are global for roleId in data.roleIds: from modules.datamodels.datamodelRbac import Role roleRecords = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId}) if not roleRecords: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Role '{roleId}' not found" ) role = roleRecords[0] # Role must be global or belong to this mandate roleMandateId = role.get("mandateId") if roleMandateId and str(roleMandateId) != str(context.mandateId): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Role '{roleId}' belongs to a different mandate" ) # Validate feature instance if provided if data.featureInstanceId: from modules.datamodels.datamodelFeatures import FeatureInstance instanceRecords = rootInterface.db.getRecordset( FeatureInstance, recordFilter={"id": data.featureInstanceId} ) if not instanceRecords: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Feature instance '{data.featureInstanceId}' not found" ) instance = instanceRecords[0] if str(instance.get("mandateId")) != str(context.mandateId): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Feature instance belongs to a different mandate" ) # Calculate expiration time currentTime = getUtcTimestamp() expiresAt = currentTime + (data.expiresInHours * 3600) # Create invitation invitation = Invitation( mandateId=str(context.mandateId), featureInstanceId=data.featureInstanceId, roleIds=data.roleIds, email=data.email, createdBy=str(context.user.id), expiresAt=expiresAt, maxUses=data.maxUses ) createdRecord = rootInterface.db.recordCreate(Invitation, invitation.model_dump()) if not createdRecord: raise ValueError("Failed to create invitation record") # Build invite URL from modules.shared.configuration import APP_CONFIG frontendUrl = APP_CONFIG.get("APP_FRONTEND_URL", "http://localhost:8080") inviteUrl = f"{frontendUrl}/invite/{invitation.token}" logger.info( f"User {context.user.id} created invitation for mandate {context.mandateId}, " f"expires in {data.expiresInHours}h" ) return InvitationResponse( id=str(createdRecord.get("id")), token=str(createdRecord.get("token")), mandateId=str(createdRecord.get("mandateId")), featureInstanceId=createdRecord.get("featureInstanceId"), roleIds=createdRecord.get("roleIds", []), email=createdRecord.get("email"), createdBy=str(createdRecord.get("createdBy")), createdAt=createdRecord.get("createdAt"), expiresAt=createdRecord.get("expiresAt"), usedBy=createdRecord.get("usedBy"), usedAt=createdRecord.get("usedAt"), revokedAt=createdRecord.get("revokedAt"), maxUses=createdRecord.get("maxUses", 1), currentUses=createdRecord.get("currentUses", 0), inviteUrl=inviteUrl ) except HTTPException: raise except Exception as e: logger.error(f"Error creating invitation: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create invitation: {str(e)}" ) @router.get("/", response_model=List[Dict[str, Any]]) @limiter.limit("60/minute") async def list_invitations( request: Request, includeUsed: bool = Query(False, description="Include already used invitations"), includeExpired: bool = Query(False, description="Include expired invitations"), context: RequestContext = Depends(getRequestContext) ) -> List[Dict[str, Any]]: """ List invitations for the current mandate. Requires Mandate-Admin role. Returns all invitations created for this mandate. Args: includeUsed: Include invitations that have reached maxUses includeExpired: Include expired invitations """ if not context.mandateId: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="X-Mandate-Id header is required" ) # Check mandate admin permission if not _hasMandateAdminRole(context): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Mandate-Admin role required to list invitations" ) try: rootInterface = getRootInterface() # Get all invitations for this mandate allInvitations = rootInterface.db.getRecordset( Invitation, recordFilter={"mandateId": str(context.mandateId)} ) currentTime = getUtcTimestamp() result = [] for inv in allInvitations: # Skip revoked invitations if inv.get("revokedAt"): continue # Filter by usage if not includeUsed and inv.get("currentUses", 0) >= inv.get("maxUses", 1): continue # Filter by expiration if not includeExpired and inv.get("expiresAt", 0) < currentTime: continue # Build invite URL from modules.shared.configuration import APP_CONFIG frontendUrl = APP_CONFIG.get("APP_FRONTEND_URL", "http://localhost:8080") inviteUrl = f"{frontendUrl}/invite/{inv.get('token')}" result.append({ **{k: v for k, v in inv.items() if not k.startswith("_")}, "inviteUrl": inviteUrl, "isExpired": inv.get("expiresAt", 0) < currentTime, "isUsedUp": inv.get("currentUses", 0) >= inv.get("maxUses", 1) }) return result except HTTPException: raise except Exception as e: logger.error(f"Error listing invitations: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to list invitations: {str(e)}" ) @router.delete("/{invitationId}", response_model=Dict[str, str]) @limiter.limit("30/minute") async def revoke_invitation( request: Request, invitationId: str, context: RequestContext = Depends(getRequestContext) ) -> Dict[str, str]: """ Revoke an invitation. Requires Mandate-Admin role. Revoked invitations cannot be used. Args: invitationId: Invitation ID """ if not context.mandateId: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="X-Mandate-Id header is required" ) # Check mandate admin permission if not _hasMandateAdminRole(context): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Mandate-Admin role required to revoke invitations" ) try: rootInterface = getRootInterface() # Get invitation invitationRecords = rootInterface.db.getRecordset( Invitation, recordFilter={"id": invitationId} ) if not invitationRecords: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Invitation '{invitationId}' not found" ) invitation = invitationRecords[0] # Verify mandate access if str(invitation.get("mandateId")) != str(context.mandateId): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied to this invitation" ) # Already revoked? if invitation.get("revokedAt"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invitation is already revoked" ) # Revoke invitation rootInterface.db.recordModify( Invitation, invitationId, {"revokedAt": getUtcTimestamp()} ) logger.info(f"User {context.user.id} revoked invitation {invitationId}") return {"message": "Invitation revoked", "invitationId": invitationId} except HTTPException: raise except Exception as e: logger.error(f"Error revoking invitation {invitationId}: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to revoke invitation: {str(e)}" ) # ============================================================================= # Public Invitation Endpoints (No auth required for validation) # ============================================================================= @router.get("/validate/{token}", response_model=InvitationValidation) @limiter.limit("30/minute") async def validate_invitation( request: Request, token: str ) -> InvitationValidation: """ Validate an invitation token (public endpoint). Used by the frontend to check if an invitation is valid before showing the registration/acceptance form. Args: token: Invitation token """ try: rootInterface = getRootInterface() # Find invitation by token invitationRecords = rootInterface.db.getRecordset( Invitation, recordFilter={"token": token} ) if not invitationRecords: return InvitationValidation( valid=False, reason="Invitation not found", mandateId=None, featureInstanceId=None, roleIds=[] ) invitation = invitationRecords[0] # Check if revoked if invitation.get("revokedAt"): return InvitationValidation( valid=False, reason="Invitation has been revoked", mandateId=None, featureInstanceId=None, roleIds=[] ) # Check if expired currentTime = getUtcTimestamp() if invitation.get("expiresAt", 0) < currentTime: return InvitationValidation( valid=False, reason="Invitation has expired", mandateId=None, featureInstanceId=None, roleIds=[] ) # Check if used up if invitation.get("currentUses", 0) >= invitation.get("maxUses", 1): return InvitationValidation( valid=False, reason="Invitation has reached maximum uses", mandateId=None, featureInstanceId=None, roleIds=[] ) return InvitationValidation( valid=True, reason=None, mandateId=invitation.get("mandateId"), featureInstanceId=invitation.get("featureInstanceId"), roleIds=invitation.get("roleIds", []) ) except Exception as e: logger.error(f"Error validating invitation token: {e}") return InvitationValidation( valid=False, reason="Validation error", mandateId=None, featureInstanceId=None, roleIds=[] ) @router.post("/accept/{token}", response_model=Dict[str, Any]) @limiter.limit("10/minute") async def accept_invitation( request: Request, token: str, currentUser: User = Depends(getCurrentUser) ) -> Dict[str, Any]: """ Accept an invitation (requires authentication). The authenticated user joins the mandate with the predefined roles. If the user is already a member, their roles are updated. Args: token: Invitation token """ try: rootInterface = getRootInterface() # Find invitation by token invitationRecords = rootInterface.db.getRecordset( Invitation, recordFilter={"token": token} ) if not invitationRecords: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Invitation not found" ) invitation = invitationRecords[0] # Validate invitation if invitation.get("revokedAt"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invitation has been revoked" ) currentTime = getUtcTimestamp() if invitation.get("expiresAt", 0) < currentTime: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invitation has expired" ) if invitation.get("currentUses", 0) >= invitation.get("maxUses", 1): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invitation has reached maximum uses" ) mandateId = invitation.get("mandateId") roleIds = invitation.get("roleIds", []) featureInstanceId = invitation.get("featureInstanceId") # Check if user is already a member existingMembership = rootInterface.getUserMandate(str(currentUser.id), mandateId) if existingMembership: # Update existing membership with additional roles for roleId in roleIds: try: rootInterface.addRoleToUserMandate(str(existingMembership.id), roleId) except Exception: pass # Role might already be assigned userMandateId = str(existingMembership.id) message = "Roles updated for existing membership" else: # Create new membership userMandate = rootInterface.createUserMandate( userId=str(currentUser.id), mandateId=mandateId, roleIds=roleIds ) userMandateId = str(userMandate.id) message = "Successfully joined mandate" # Grant feature access if specified featureAccessId = None if featureInstanceId: existingAccess = rootInterface.getFeatureAccess(str(currentUser.id), featureInstanceId) if not existingAccess: # Create feature access with instance-level roles if any instanceRoleIds = [r for r in roleIds if _isInstanceRole(rootInterface, r, featureInstanceId)] featureAccess = rootInterface.createFeatureAccess( userId=str(currentUser.id), featureInstanceId=featureInstanceId, roleIds=instanceRoleIds ) featureAccessId = str(featureAccess.id) # Update invitation usage rootInterface.db.recordModify( Invitation, invitation.get("id"), { "currentUses": invitation.get("currentUses", 0) + 1, "usedBy": str(currentUser.id), "usedAt": currentTime } ) logger.info( f"User {currentUser.id} accepted invitation {invitation.get('id')} " f"for mandate {mandateId}" ) return { "message": message, "mandateId": mandateId, "userMandateId": userMandateId, "featureAccessId": featureAccessId, "roleIds": roleIds } except HTTPException: raise except Exception as e: logger.error(f"Error accepting invitation: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to accept invitation: {str(e)}" ) # ============================================================================= # Helper Functions # ============================================================================= def _hasMandateAdminRole(context: RequestContext) -> bool: """ Check if the user has mandate admin role in the current context. """ if context.isSysAdmin: return True if not context.roleIds: return False try: rootInterface = getRootInterface() from modules.datamodels.datamodelRbac import Role for roleId in context.roleIds: roleRecords = rootInterface.db.getRecordset(Role, recordFilter={"id": roleId}) if roleRecords: role = roleRecords[0] roleLabel = role.get("roleLabel", "") # Admin role at mandate level if roleLabel == "admin" and role.get("mandateId") and not role.get("featureInstanceId"): return True return False except Exception as e: logger.error(f"Error checking mandate admin role: {e}") return False def _isInstanceRole(interface, roleId: str, featureInstanceId: str) -> bool: """ Check if a role belongs to a specific feature instance. """ try: from modules.datamodels.datamodelRbac import Role roleRecords = interface.db.getRecordset(Role, recordFilter={"id": roleId}) if roleRecords: role = roleRecords[0] return str(role.get("featureInstanceId", "")) == str(featureInstanceId) return False except Exception: return False