# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Invitation routes for the backend API.
Implements token-based user invitations for self-service onboarding.

Multi-Tenant Design:
- Invitations are mandate-scoped (Mandate Admin creates them)
- Tokens are secure, time-limited, and optionally use-limited
- Users accept invitations to join mandates/features with predefined roles
"""

from fastapi import APIRouter, HTTPException, Depends, Request, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
from pydantic import BaseModel, Field

from modules.auth import limiter, getRequestContext, RequestContext, getCurrentUser
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelInvitation import Invitation
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/invitations",
    tags=["Invitations"],
    responses={404: {"description": "Not found"}}
)


# =============================================================================
# Request/Response Models
# =============================================================================

class InvitationCreate(BaseModel):
    """Request model for creating an invitation.

    Invitations are feature-instance-level: the user selects a feature instance
    and instance-level roles. The mandateId is derived from the feature instance
    automatically.
    """
    targetUsername: str = Field(..., description="Username of the user to invite (must match on acceptance)")
    email: Optional[str] = Field(None, description="Email address to send invitation link (optional)")
    featureInstanceId: str = Field(..., description="Feature instance to grant access to")
    roleIds: List[str] = Field(..., description="Instance-level role IDs to assign to the invited user")
    frontendUrl: str = Field(..., description="Frontend URL for building the invite link (provided by frontend)")
    expiresInHours: int = Field(
        72,
        ge=1,
        le=720,  # Max 30 days
        description="Hours until invitation expires"
    )
    maxUses: int = Field(
        1,
        ge=1,
        le=100,
        description="Maximum number of times this invitation can be used"
    )


class InvitationResponse(BaseModel):
    """Response model for invitation"""
    id: str
    token: str
    mandateId: str
    featureInstanceId: Optional[str]
    roleIds: List[str]
    targetUsername: str
    email: Optional[str]
    createdBy: str
    createdAt: float
    expiresAt: float
    usedBy: Optional[str]
    usedAt: Optional[float]
    revokedAt: Optional[float]
    maxUses: int
    currentUses: int
    inviteUrl: str  # Full URL for the invitation
    emailSent: bool = False  # Whether invitation email was sent


class InvitationValidation(BaseModel):
    """Response model for invitation validation"""
    valid: bool
    reason: Optional[str]
    mandateId: Optional[str]
    mandateName: Optional[str] = None
    featureInstanceId: Optional[str]
    roleIds: List[str]
    roleLabels: List[str] = []
    targetUsername: Optional[str] = None
    email: Optional[str] = None


# =============================================================================
# Invitation CRUD Endpoints
# =============================================================================

@router.post("/", response_model=InvitationResponse)
@limiter.limit("30/minute")
def create_invitation(
    request: Request,
    data: InvitationCreate,
    context: RequestContext = Depends(getRequestContext)
) -> InvitationResponse:
    """
    Create a new invitation for a feature instance.

    Requires SysAdmin or Mandate-Admin role. Creates a secure token that can be
    shared with users to join a feature instance with predefined roles.
    The mandateId is derived from the feature instance automatically.

    Sending the invitation e-mail and creating the in-app notification are
    best-effort: a failure in either is logged and does not fail the request.

    Args:
        request: Incoming request (required by the rate limiter)
        data: Invitation creation data (featureInstanceId + roleIds required)
        context: Request context of the calling user

    Returns:
        The created invitation including the full invite URL.

    Raises:
        HTTPException: 404 if the feature instance or a role does not exist,
            403 if the caller lacks permission, 400 if a role does not belong
            to the feature instance, 500 on any unexpected error.
    """
    try:
        rootInterface = getRootInterface()

        # Validate feature instance exists and get mandateId from it
        instance = rootInterface.getFeatureInstance(data.featureInstanceId)
        if not instance:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature instance '{data.featureInstanceId}' not found"
            )
        mandateId = str(instance.mandateId)

        # Check admin permission: SysAdmin can invite for any mandate,
        # MandateAdmin can invite for their own mandate
        if not context.hasSysAdminRole:
            if str(context.mandateId) != mandateId:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Feature instance belongs to a different mandate"
                )
            if not _hasMandateAdminRole(context):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Mandate-Admin role required to create invitations"
                )

        # Note: targetUsername does NOT need to exist yet!
        # The invitation can be for a user who will register later.

        # Validate role IDs exist and belong to this feature instance
        for roleId in data.roleIds:
            role = rootInterface.getRole(roleId)
            if not role:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Role '{roleId}' not found"
                )
            # Role must belong to this feature instance
            if str(role.featureInstanceId or "") != data.featureInstanceId:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Role '{roleId}' does not belong to feature instance '{data.featureInstanceId}'"
                )

        # Calculate expiration time
        currentTime = getUtcTimestamp()
        expiresAt = currentTime + (data.expiresInHours * 3600)

        # Create invitation (mandateId derived from feature instance)
        invitation = Invitation(
            mandateId=mandateId,
            featureInstanceId=data.featureInstanceId,
            roleIds=data.roleIds,
            targetUsername=data.targetUsername,
            email=data.email,
            createdBy=str(context.user.id),
            expiresAt=expiresAt,
            maxUses=data.maxUses
        )

        createdRecord = rootInterface.db.recordCreate(Invitation, invitation.model_dump())
        if not createdRecord:
            raise ValueError("Failed to create invitation record")

        # Build invite URL using frontend URL provided by the caller.
        # NOTE(review): frontendUrl is caller-supplied and ends up in the
        # e-mailed link; consider validating it against an allow-list.
        baseUrl = data.frontendUrl.rstrip("/")
        inviteUrl = f"{baseUrl}/invite/{invitation.token}"

        # Send email if email address is provided
        emailSent = False
        if data.email:
            try:
                from modules.connectors.connectorMessagingEmail import ConnectorMessagingEmail

                # Use the mandate derived from the feature instance, not the
                # request context: a SysAdmin may invite into a mandate other
                # than the one of their current context.
                mandateName = _getMandateDisplayName(rootInterface, mandateId)

                emailConnector = ConnectorMessagingEmail()
                emailSubject = f"Einladung zu {mandateName}"
                emailBody = f"""
Hallo {data.targetUsername},
Sie wurden eingeladen, dem Mandanten {mandateName} beizutreten.
Klicken Sie auf den folgenden Link, um die Einladung anzunehmen:
Oder kopieren Sie diesen Link in Ihren Browser:
{inviteUrl}
Diese Einladung ist {data.expiresInHours} Stunden gültig.
Diese E-Mail wurde automatisch von PowerOn gesendet.
"""
                emailConnector.send(
                    recipient=data.email,
                    subject=emailSubject,
                    message=emailBody
                )
                emailSent = True
                logger.info(f"Invitation email sent to {data.email} for user {data.targetUsername}")
            except Exception as emailError:
                logger.warning(f"Failed to send invitation email to {data.email}: {emailError}")
                # Don't fail the invitation creation if email fails

        # Update the invitation record with emailSent status
        if emailSent:
            rootInterface.db.recordModify(
                Invitation,
                createdRecord.get("id"),
                {"emailSent": True}
            )
            createdRecord["emailSent"] = True

        # If the target user already exists, create an in-app notification
        try:
            existingUser = rootInterface.getUserByUsername(data.targetUsername)
            if existingUser:
                from modules.routes.routeNotifications import createInvitationNotification

                # Same mandate as the invitation itself (see e-mail above)
                mandateName = _getMandateDisplayName(rootInterface, mandateId)
                inviterName = context.user.fullName or context.user.username

                createInvitationNotification(
                    userId=str(existingUser.id),
                    invitationId=str(createdRecord.get("id")),
                    mandateName=mandateName,
                    inviterName=inviterName
                )
                logger.info(f"Created notification for existing user {data.targetUsername}")
        except Exception as notifError:
            logger.warning(f"Failed to create notification for user {data.targetUsername}: {notifError}")
            # Don't fail the invitation if notification fails

        logger.info(
            f"User {context.user.id} created invitation for user {data.targetUsername} "
            f"to mandate {mandateId}, expires in {data.expiresInHours}h"
        )

        return InvitationResponse(
            id=str(createdRecord.get("id")),
            token=str(createdRecord.get("token")),
            mandateId=str(createdRecord.get("mandateId")),
            featureInstanceId=createdRecord.get("featureInstanceId"),
            roleIds=createdRecord.get("roleIds", []),
            targetUsername=createdRecord.get("targetUsername"),
            email=createdRecord.get("email"),
            createdBy=str(createdRecord.get("createdBy")),
            createdAt=createdRecord.get("createdAt"),
            expiresAt=createdRecord.get("expiresAt"),
            usedBy=createdRecord.get("usedBy"),
            usedAt=createdRecord.get("usedAt"),
            revokedAt=createdRecord.get("revokedAt"),
            maxUses=createdRecord.get("maxUses", 1),
            currentUses=createdRecord.get("currentUses", 0),
            inviteUrl=inviteUrl,
            emailSent=emailSent
        )

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error creating invitation: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create invitation: {str(e)}"
        ) from e


@router.get("/", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def list_invitations(
    request: Request,
    frontendUrl: str = Query(..., description="Frontend URL for building invite links (provided by frontend)"),
    includeUsed: bool = Query(False, description="Include already used invitations"),
    includeExpired: bool = Query(False, description="Include expired invitations"),
    context: RequestContext = Depends(getRequestContext)
) -> List[Dict[str, Any]]:
    """
    List invitations for the current mandate.

    Requires Mandate-Admin role. Returns the invitations created for this
    mandate; revoked invitations are always skipped.

    Args:
        request: Incoming request (required by the rate limiter)
        frontendUrl: Frontend base URL used to build each invite link
        includeUsed: Include invitations that have reached maxUses
        includeExpired: Include expired invitations
        context: Request context of the calling user

    Returns:
        One dict per invitation: the dumped invitation plus ``inviteUrl``,
        ``isExpired`` and ``isUsedUp``.

    Raises:
        HTTPException: 400 if no mandate is set in the context, 403 if the
            caller is not a mandate admin, 500 on any unexpected error.
    """
    if not context.mandateId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-Mandate-Id header is required"
        )

    # Check mandate admin permission
    if not _hasMandateAdminRole(context):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Mandate-Admin role required to list invitations"
        )

    try:
        rootInterface = getRootInterface()

        # Get all invitations for this mandate (Pydantic models)
        allInvitations = rootInterface.getInvitationsByMandate(str(context.mandateId))

        currentTime = getUtcTimestamp()
        # Loop-invariant: the base URL is the same for every invitation
        baseUrl = frontendUrl.rstrip("/")
        result = []

        for inv in allInvitations:
            # Skip revoked invitations
            if inv.revokedAt:
                continue

            # Filter by usage
            currentUses = inv.currentUses or 0
            maxUses = inv.maxUses or 1
            isUsedUp = currentUses >= maxUses
            if not includeUsed and isUsedUp:
                continue

            # Filter by expiration
            expiresAt = inv.expiresAt or 0
            isExpired = expiresAt < currentTime
            if not includeExpired and isExpired:
                continue

            result.append({
                **inv.model_dump(),
                "inviteUrl": f"{baseUrl}/invite/{inv.token}",
                "isExpired": isExpired,
                "isUsedUp": isUsedUp
            })

        return result

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error listing invitations: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list invitations: {str(e)}"
        ) from e


@router.delete("/{invitationId}", response_model=Dict[str, str])
@limiter.limit("30/minute")
def revoke_invitation(
    request: Request,
    invitationId: str,
    context: RequestContext = Depends(getRequestContext)
) -> Dict[str, str]:
    """
    Revoke an invitation.

    Requires Mandate-Admin role. Revocation is recorded by setting
    ``revokedAt`` on the invitation; revoked invitations cannot be used.

    Args:
        request: Incoming request (required by the rate limiter)
        invitationId: Invitation ID
        context: Request context of the calling user

    Returns:
        A confirmation message together with the invitation ID.

    Raises:
        HTTPException: 400 if no mandate is set in the context or the
            invitation is already revoked, 403 if the caller is not a mandate
            admin or the invitation belongs to another mandate, 404 if the
            invitation does not exist, 500 on any unexpected error.
    """
    if not context.mandateId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="X-Mandate-Id header is required"
        )

    # Check mandate admin permission
    if not _hasMandateAdminRole(context):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Mandate-Admin role required to revoke invitations"
        )

    try:
        rootInterface = getRootInterface()

        # Get invitation (Pydantic model)
        invitation = rootInterface.getInvitation(invitationId)
        if not invitation:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Invitation '{invitationId}' not found"
            )

        # Verify mandate access
        if str(invitation.mandateId) != str(context.mandateId):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Access denied to this invitation"
            )

        # Already revoked?
        if invitation.revokedAt:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invitation is already revoked"
            )

        # Revoke invitation
        rootInterface.db.recordModify(
            Invitation,
            invitationId,
            {"revokedAt": getUtcTimestamp()}
        )

        logger.info(f"User {context.user.id} revoked invitation {invitationId}")

        return {"message": "Invitation revoked", "invitationId": invitationId}

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error revoking invitation {invitationId}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to revoke invitation: {str(e)}"
        ) from e


# =============================================================================
# Public Invitation Endpoints (No auth required for validation)
# =============================================================================

@router.get("/validate/{token}", response_model=InvitationValidation)
@limiter.limit("30/minute")
def validate_invitation(
    request: Request,
    token: str
) -> InvitationValidation:
    """
    Validate an invitation token (public endpoint).

    Used by the frontend to check if an invitation is valid before showing
    the registration/acceptance form. This endpoint never raises: any
    unexpected error is logged and reported as an invalid invitation.

    Args:
        request: Incoming request (required by the rate limiter)
        token: Invitation token

    Returns:
        Validation result. For a valid token it carries the mandate, feature
        instance, role and target-user details; otherwise only ``valid=False``
        and a ``reason``.
    """
    try:
        rootInterface = getRootInterface()

        # Find invitation by token (Pydantic model)
        invitation = rootInterface.getInvitationByToken(token)
        if not invitation:
            return _invalidInvitation("Invitation not found")

        # Revoked / expired / used up?
        rejectionReason = _getInvitationRejectionReason(invitation, getUtcTimestamp())
        if rejectionReason:
            return _invalidInvitation(rejectionReason)

        # Get additional info for display
        mandateId = invitation.mandateId

        # Get mandate name
        mandateName = None
        mandate = rootInterface.getMandate(str(mandateId)) if mandateId else None
        if mandate:
            mandateName = mandate.label or mandate.name

        # Get role names (roles that can no longer be resolved are skipped)
        roleIds = invitation.roleIds or []
        roleLabels = []
        for roleId in roleIds:
            role = rootInterface.getRole(roleId)
            if role:
                roleLabels.append(role.roleLabel)

        return InvitationValidation(
            valid=True,
            reason=None,
            mandateId=str(mandateId) if mandateId else None,
            mandateName=mandateName,
            featureInstanceId=str(invitation.featureInstanceId) if invitation.featureInstanceId else None,
            roleIds=roleIds,
            roleLabels=roleLabels,
            targetUsername=invitation.targetUsername,
            email=invitation.email
        )

    except Exception as e:
        logger.error(f"Error validating invitation token: {e}")
        return _invalidInvitation("Validation error")


@router.post("/accept/{token}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def accept_invitation(
    request: Request,
    token: str,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Accept an invitation (requires authentication).

    The authenticated user joins the mandate with the predefined roles.
    If the user is already a member, their roles are updated.

    Args:
        request: Incoming request (required by the rate limiter)
        token: Invitation token
        currentUser: The authenticated user accepting the invitation

    Returns:
        Dict with ``message``, ``mandateId``, ``userMandateId``,
        ``featureAccessId`` and ``roleIds``.

    Raises:
        HTTPException: 404 if the token is unknown, 400 if the invitation is
            revoked, expired or used up, 403 if it is bound to a different
            username, 500 on any unexpected error.
    """
    try:
        rootInterface = getRootInterface()

        # Find invitation by token (Pydantic model)
        invitation = rootInterface.getInvitationByToken(token)
        if not invitation:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Invitation not found"
            )

        # Validate invitation (revoked / expired / used up)
        currentTime = getUtcTimestamp()
        rejectionReason = _getInvitationRejectionReason(invitation, currentTime)
        if rejectionReason:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=rejectionReason
            )

        # Validate username matches - the invitation is bound to a specific user
        targetUsername = invitation.targetUsername
        if targetUsername and currentUser.username != targetUsername:
            logger.warning(
                f"User {currentUser.username} tried to accept invitation meant for {targetUsername}"
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Diese Einladung ist für Benutzer '{targetUsername}' bestimmt"
            )

        mandateId = str(invitation.mandateId) if invitation.mandateId else None
        roleIds = invitation.roleIds or []
        featureInstanceId = str(invitation.featureInstanceId) if invitation.featureInstanceId else None

        # Grant feature access (creates FeatureAccess + auto-assigns mandate
        # 'user' role via rule 4 / "Regel 4")
        featureAccessId = None
        if featureInstanceId:
            existingAccess = rootInterface.getFeatureAccess(str(currentUser.id), featureInstanceId)
            if existingAccess:
                # Update existing access with additional roles
                featureAccessId = str(existingAccess.id)
                for roleId in roleIds:
                    try:
                        rootInterface.addRoleToFeatureAccess(str(existingAccess.id), roleId)
                    except Exception as roleError:
                        # Best-effort: the role might already be assigned.
                        # Log instead of silently dropping the error.
                        logger.debug(
                            f"Could not add role {roleId} to feature access {existingAccess.id}: {roleError}"
                        )
                message = "Roles updated for existing feature access"
            else:
                # Create feature access with instance-level roles
                # This auto-creates UserMandate with 'user' role (rule 4)
                featureAccess = rootInterface.createFeatureAccess(
                    userId=str(currentUser.id),
                    featureInstanceId=featureInstanceId,
                    roleIds=roleIds
                )
                featureAccessId = str(featureAccess.id)
                message = "Successfully joined feature instance"
        else:
            # Legacy: mandate-only invitation (no feature instance)
            existingMembership = rootInterface.getUserMandate(str(currentUser.id), mandateId)
            if existingMembership:
                for roleId in roleIds:
                    try:
                        rootInterface.addRoleToUserMandate(str(existingMembership.id), roleId)
                    except Exception as roleError:
                        # Best-effort: the role might already be assigned
                        logger.debug(
                            f"Could not add role {roleId} to user mandate {existingMembership.id}: {roleError}"
                        )
                message = "Roles updated for existing membership"
            else:
                rootInterface.createUserMandate(
                    userId=str(currentUser.id),
                    mandateId=mandateId,
                    roleIds=roleIds
                )
                message = "Successfully joined mandate"

        # Get userMandateId for response
        userMandate = rootInterface.getUserMandate(str(currentUser.id), mandateId)
        userMandateId = str(userMandate.id) if userMandate else None

        # Update invitation usage.
        # NOTE(review): this is a read-modify-write of currentUses; concurrent
        # accepts could exceed maxUses - confirm whether the DB layer offers
        # an atomic increment.
        rootInterface.db.recordModify(
            Invitation,
            invitation.id,
            {
                "currentUses": (invitation.currentUses or 0) + 1,
                "usedBy": str(currentUser.id),
                "usedAt": currentTime
            }
        )

        logger.info(
            f"User {currentUser.id} accepted invitation {invitation.id} "
            f"for mandate {mandateId}"
        )

        return {
            "message": message,
            "mandateId": mandateId,
            "userMandateId": userMandateId,
            "featureAccessId": featureAccessId,
            "roleIds": roleIds
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error accepting invitation: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to accept invitation: {str(e)}"
        ) from e


# =============================================================================
# Helper Functions
# =============================================================================

def _hasMandateAdminRole(context: RequestContext) -> bool:
    """
    Check if the user has mandate admin role in the current context.

    SysAdmins always qualify. Otherwise one of the context's roles must be
    labelled "admin" and must not be bound to a feature instance.

    Args:
        context: Request context of the calling user

    Returns:
        True if the user may act as mandate admin, False otherwise
        (including when the role lookup fails).
    """
    if context.hasSysAdminRole:
        return True

    if not context.roleIds:
        return False

    try:
        rootInterface = getRootInterface()

        for roleId in context.roleIds:
            role = rootInterface.getRole(roleId)
            # Admin role at mandate level (not feature-instance level)
            if role and role.roleLabel == "admin" and not role.featureInstanceId:
                return True

        return False

    except Exception as e:
        logger.error(f"Error checking mandate admin role: {e}")
        return False  # Fail-safe: no access on error


def _isInstanceRole(interface, roleId: str, featureInstanceId: str) -> bool:
    """
    Check if a role belongs to a specific feature instance.

    Args:
        interface: Interface object providing ``getRole``
        roleId: ID of the role to check
        featureInstanceId: ID of the feature instance the role should belong to

    Returns:
        True if the role exists and its featureInstanceId matches, False
        otherwise (including when the lookup raises).
    """
    try:
        role = interface.getRole(roleId)
        if role:
            return str(role.featureInstanceId or "") == str(featureInstanceId)
        return False
    except Exception:
        return False  # Fail-safe: assume not instance role on error


def _getMandateDisplayName(interface, mandateId: Optional[str]) -> str:
    """
    Resolve the display name of a mandate for e-mails and notifications.

    Args:
        interface: Interface object providing ``getMandate``
        mandateId: ID of the mandate to look up

    Returns:
        The mandate's label, or its name if no label is set, or "PowerOn"
        if the mandate cannot be found.
    """
    mandate = interface.getMandate(str(mandateId))
    return (mandate.label or mandate.name) if mandate else "PowerOn"


def _getInvitationRejectionReason(invitation: Invitation, currentTime: float) -> Optional[str]:
    """
    Determine why an invitation can no longer be used.

    Checks, in this order: revoked, expired, used up. Missing values are
    treated as ``expiresAt=0``, ``currentUses=0`` and ``maxUses=1``.

    Args:
        invitation: The invitation to check
        currentTime: Current timestamp to compare ``expiresAt`` against

    Returns:
        A human-readable reason if the invitation is unusable, otherwise None.
    """
    if invitation.revokedAt:
        return "Invitation has been revoked"

    if (invitation.expiresAt or 0) < currentTime:
        return "Invitation has expired"

    if (invitation.currentUses or 0) >= (invitation.maxUses or 1):
        return "Invitation has reached maximum uses"

    return None


def _invalidInvitation(reason: str) -> InvitationValidation:
    """
    Build the validation response for an unusable invitation.

    Args:
        reason: Human-readable explanation shown to the caller

    Returns:
        An InvitationValidation with ``valid=False`` and no invitation details.
    """
    return InvitationValidation(
        valid=False,
        reason=reason,
        mandateId=None,
        featureInstanceId=None,
        roleIds=[]
    )