fix: Invitation Wizard Anpassungen fertig

This commit is contained in:
Ida Dittrich 2026-02-26 10:44:33 +01:00
parent cd2fffc651
commit 7163397fd3
2 changed files with 218 additions and 71 deletions

View file

@ -14,7 +14,7 @@ from fastapi import APIRouter, HTTPException, Depends, Request, Query
from typing import List, Dict, Any, Optional
from fastapi import status
import logging
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
from modules.auth import limiter, getRequestContext, RequestContext, getCurrentUser
from modules.datamodels.datamodelUam import User
@ -40,9 +40,12 @@ class InvitationCreate(BaseModel):
Supports two modes:
- Mandate-level: featureInstanceId omitted, roleIds are mandate-level roles (user, viewer, admin)
- Feature-instance-level: featureInstanceId required, roleIds are instance-level roles
Email is required for new users; targetUsername is optional.
At least one of email or targetUsername must be provided.
"""
targetUsername: str = Field(..., description="Username of the user to invite (must match on acceptance)")
email: Optional[str] = Field(None, description="Email address to send invitation link (optional)")
targetUsername: Optional[str] = Field(None, description="Username of the user to invite (must match on acceptance)")
email: Optional[str] = Field(None, description="Email address to send invitation link (required for new users)")
featureInstanceId: Optional[str] = Field(None, description="Feature instance to grant access to (optional for mandate-level invitations)")
roleIds: List[str] = Field(..., description="Role IDs: mandate-level (user, viewer, admin) or instance-level")
frontendUrl: str = Field(..., description="Frontend URL for building the invite link (provided by frontend)")
@ -59,6 +62,14 @@ class InvitationCreate(BaseModel):
description="Maximum number of times this invitation can be used"
)
@model_validator(mode="after")
def require_email_or_username(self):
email_val = (self.email or "").strip()
username_val = (self.targetUsername or "").strip()
if not email_val and not username_val:
raise ValueError("At least one of email or targetUsername must be provided")
return self
class InvitationResponse(BaseModel):
"""Response model for invitation"""
@ -67,7 +78,7 @@ class InvitationResponse(BaseModel):
mandateId: str
featureInstanceId: Optional[str]
roleIds: List[str]
targetUsername: str
targetUsername: Optional[str]
email: Optional[str]
createdBy: str
createdAt: float
@ -88,6 +99,7 @@ class InvitationValidation(BaseModel):
mandateId: Optional[str]
mandateName: Optional[str] = None
featureInstanceId: Optional[str]
featureInstanceName: Optional[str] = None
roleIds: List[str]
roleLabels: List[str] = []
targetUsername: Optional[str] = None
@ -184,14 +196,73 @@ def create_invitation(
# Calculate expiration time
currentTime = getUtcTimestamp()
expiresAt = currentTime + (data.expiresInHours * 3600)
# Create invitation
# Create invitation (targetUsername optional when email provided)
target_username_val = (data.targetUsername or "").strip() or None
email_val = (data.email or "").strip() or None
# Silent duplicate check: only for existing users (identified by username)
# Username is unique; email can exist multiple times, so we do NOT look up by email
target_user_id = None
if target_username_val:
u = rootInterface.getUserByUsername(target_username_val)
if u:
target_user_id = str(u.id)
if target_user_id:
if data.featureInstanceId:
existing_access = rootInterface.getFeatureAccess(target_user_id, data.featureInstanceId)
if existing_access:
logger.info(f"Invitation skipped: user {target_user_id} already has access to feature instance {data.featureInstanceId}")
return InvitationResponse(
id="duplicate-skipped",
token="",
mandateId=mandateId,
featureInstanceId=data.featureInstanceId,
roleIds=data.roleIds,
targetUsername=target_username_val,
email=email_val,
createdBy=str(context.user.id),
createdAt=currentTime,
expiresAt=expiresAt,
usedBy=None,
usedAt=None,
revokedAt=None,
maxUses=data.maxUses,
currentUses=0,
inviteUrl="",
emailSent=False
)
else:
existing_membership = rootInterface.getUserMandate(target_user_id, mandateId)
if existing_membership:
logger.info(f"Invitation skipped: user {target_user_id} already member of mandate {mandateId}")
return InvitationResponse(
id="duplicate-skipped",
token="",
mandateId=mandateId,
featureInstanceId=None,
roleIds=data.roleIds,
targetUsername=target_username_val,
email=email_val,
createdBy=str(context.user.id),
createdAt=currentTime,
expiresAt=expiresAt,
usedBy=None,
usedAt=None,
revokedAt=None,
maxUses=data.maxUses,
currentUses=0,
inviteUrl="",
emailSent=False
)
invitation = Invitation(
mandateId=mandateId,
featureInstanceId=data.featureInstanceId or None,
roleIds=data.roleIds,
targetUsername=data.targetUsername,
email=data.email,
targetUsername=target_username_val,
email=email_val,
createdBy=str(context.user.id),
expiresAt=expiresAt,
maxUses=data.maxUses
@ -207,21 +278,30 @@ def create_invitation(
# Send email if email address is provided
emailSent = False
if data.email:
if email_val:
try:
from modules.connectors.connectorMessagingEmail import ConnectorMessagingEmail
# Get mandate name for the email
mandate = rootInterface.getMandate(str(context.mandateId))
# Get mandate and optionally instance for the email
mandate = rootInterface.getMandate(str(mandateId))
mandateName = (mandate.label or mandate.name) if mandate else "PowerOn"
# Only use username for existing users; new users set it when they register
display_name = target_username_val if target_username_val else "Neuer/e Nutzer/in"
instance = rootInterface.getFeatureInstance(data.featureInstanceId) if data.featureInstanceId else None
instance_label = (instance.label or instance.featureCode) if instance else None
emailConnector = ConnectorMessagingEmail()
emailSubject = f"Einladung zu {mandateName}"
if instance_label:
emailSubject = f"Einladung zur Feature-Instanz {instance_label}"
invite_text = f"der Feature-Instanz <strong>{instance_label}</strong> (Mandant: {mandateName}) beizutreten"
else:
emailSubject = f"Einladung zu {mandateName}"
invite_text = f"dem Mandanten <strong>{mandateName}</strong> beizutreten"
emailBody = f"""
<html>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
<h2>Sie wurden eingeladen!</h2>
<p>Hallo <strong>{data.targetUsername}</strong>,</p>
<p>Sie wurden eingeladen, dem Mandanten <strong>{mandateName}</strong> beizutreten.</p>
<p>Hallo <strong>{display_name}</strong>,</p>
<p>Sie wurden eingeladen, {invite_text}.</p>
<p>Klicken Sie auf den folgenden Link, um die Einladung anzunehmen:</p>
<p style="margin: 20px 0;">
<a href="{inviteUrl}" style="background-color: #007bff; color: white; padding: 12px 24px; text-decoration: none; border-radius: 4px;">
@ -244,14 +324,14 @@ def create_invitation(
"""
emailConnector.send(
recipient=data.email,
recipient=email_val,
subject=emailSubject,
message=emailBody
)
emailSent = True
logger.info(f"Invitation email sent to {data.email} for user {data.targetUsername}")
logger.info(f"Invitation email sent to {email_val} for user {target_username_val or 'email-only'}")
except Exception as emailError:
logger.warning(f"Failed to send invitation email to {data.email}: {emailError}")
logger.warning(f"Failed to send invitation email to {email_val}: {emailError}")
# Don't fail the invitation creation if email fails
# Update the invitation record with emailSent status
@ -263,31 +343,37 @@ def create_invitation(
)
createdRecord["emailSent"] = True
# If the target user already exists, create an in-app notification
# If the target user already exists (identified by username), create an in-app notification
# Only look up by username - email is not used for "existing user" since new users are invited by email
try:
existingUser = rootInterface.getUserByUsername(data.targetUsername)
existingUser = None
if target_username_val:
existingUser = rootInterface.getUserByUsername(target_username_val)
if existingUser:
from modules.routes.routeNotifications import createInvitationNotification
# Get mandate name for notification
mandate = rootInterface.getMandate(str(context.mandateId))
mandate = rootInterface.getMandate(str(mandateId))
mandateName = (mandate.label or mandate.name) if mandate else "PowerOn"
inviterName = context.user.fullName or context.user.username
instance = rootInterface.getFeatureInstance(data.featureInstanceId) if data.featureInstanceId else None
instance_label = (instance.label or instance.featureCode) if instance else None
createInvitationNotification(
userId=str(existingUser.id),
invitationId=str(createdRecord.get("id")),
mandateName=mandateName,
inviterName=inviterName
inviterName=inviterName,
featureInstanceName=instance_label
)
logger.info(f"Created notification for existing user {data.targetUsername}")
logger.info(f"Created notification for existing user {target_username_val or email_val}")
except Exception as notifError:
logger.warning(f"Failed to create notification for user {data.targetUsername}: {notifError}")
logger.warning(f"Failed to create notification for user {target_username_val or email_val}: {notifError}")
# Don't fail the invitation if notification fails
target_desc = f"feature instance {data.featureInstanceId}" if data.featureInstanceId else f"mandate {mandateId}"
logger.info(
f"User {context.user.id} created invitation for user {data.targetUsername} "
f"to mandate {context.mandateId}, expires in {data.expiresInHours}h"
f"User {context.user.id} created invitation for user {target_username_val or email_val} "
f"to {target_desc}, expires in {data.expiresInHours}h"
)
return InvitationResponse(
@ -560,12 +646,20 @@ def validate_invitation(
if role:
roleLabels.append(role.roleLabel)
# Get feature instance label for display
featureInstanceName = None
if invitation.featureInstanceId:
instance = rootInterface.getFeatureInstance(str(invitation.featureInstanceId))
if instance:
featureInstanceName = instance.label or instance.featureCode
return InvitationValidation(
valid=True,
reason=None,
mandateId=str(mandateId) if mandateId else None,
mandateName=mandateName,
featureInstanceId=str(invitation.featureInstanceId) if invitation.featureInstanceId else None,
featureInstanceName=featureInstanceName,
roleIds=roleIds,
roleLabels=roleLabels,
targetUsername=targetUsername,
@ -634,15 +728,33 @@ def accept_invitation(
detail="Invitation has reached maximum uses"
)
# Validate username matches - the invitation is bound to a specific user
# Validate user matches - invitation is bound by username or email
targetUsername = invitation.targetUsername
if targetUsername and currentUser.username != targetUsername:
logger.warning(
f"User {currentUser.username} tried to accept invitation meant for {targetUsername}"
)
invitationEmail = (invitation.email or "").strip().lower() if invitation.email else None
currentUserEmail = (currentUser.email or "").strip().lower() if getattr(currentUser, "email", None) else None
if targetUsername:
if currentUser.username != targetUsername:
logger.warning(
f"User {currentUser.username} tried to accept invitation meant for {targetUsername}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Diese Einladung ist für Benutzer '{targetUsername}' bestimmt"
)
elif invitationEmail:
if not currentUserEmail or currentUserEmail != invitationEmail:
logger.warning(
f"User {currentUser.username} (email: {currentUserEmail}) tried to accept invitation meant for {invitationEmail}"
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Diese Einladung ist für die E-Mail-Adresse '{invitationEmail}' bestimmt"
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Diese Einladung ist für Benutzer '{targetUsername}' bestimmt"
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invitation has no target user or email"
)
mandateId = str(invitation.mandateId) if invitation.mandateId else None

View file

@ -93,17 +93,23 @@ def createInvitationNotification(
userId: str,
invitationId: str,
mandateName: str,
inviterName: str
inviterName: str,
featureInstanceName: Optional[str] = None
) -> UserNotification:
"""
Create a notification for a pending invitation.
Called when an invitation is created for an existing user.
If featureInstanceName is set, the message refers to the feature instance; otherwise to the mandate.
"""
if featureInstanceName:
msg = f"{inviterName} hat Sie zur Feature-Instanz '{featureInstanceName}' eingeladen."
else:
msg = f"{inviterName} hat Sie zu '{mandateName}' eingeladen."
return _createNotification(
userId=userId,
notificationType=NotificationType.INVITATION,
title="Neue Einladung",
message=f"{inviterName} hat Sie zu '{mandateName}' eingeladen.",
message=msg,
referenceType="Invitation",
referenceId=invitationId,
icon="mail",
@ -397,11 +403,26 @@ def _handleInvitationAction(
detail="Invitation not found"
)
# Verify username matches
if invitation.targetUsername != currentUser.username:
# Verify user matches (username or email)
targetUsername = (invitation.targetUsername or "").strip() or None
invitationEmail = (invitation.email or "").strip().lower() if invitation.email else None
currentUserEmail = (currentUser.email or "").strip().lower() if getattr(currentUser, "email", None) else None
if targetUsername:
if currentUser.username != targetUsername:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="This invitation is for a different user"
)
elif invitationEmail:
if not currentUserEmail or currentUserEmail != invitationEmail:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="This invitation is for a different user"
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="This invitation is for a different user"
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invitation has no target user or email"
)
# Check if invitation is still valid
@ -428,37 +449,51 @@ def _handleInvitationAction(
)
if actionId == "accept":
# Accept the invitation - assign roles and mandate access
mandateId = str(invitation.mandateId) if invitation.mandateId else None
roleIds = list(invitation.roleIds or [])
# Ensure user gets the system "user" role for access to public UI elements (e.g. playground)
userRole = rootInterface.getRoleByLabel("user")
if userRole:
userRoleId = str(userRole.id)
if userRoleId and userRoleId not in roleIds:
roleIds = roleIds + [userRoleId]
logger.debug(f"Added system 'user' role {userRoleId} to invitation roles")
# Get mandate name for result message
mandate = rootInterface.getMandate(mandateId) if mandateId else None
mandateName = (mandate.label or mandate.name) if mandate else mandateId
# Check if user already has this mandate
existingMembership = rootInterface.getUserMandate(str(currentUser.id), mandateId) if mandateId else None
if existingMembership:
# Update existing membership with new roles via interface
# Note: roleIds on UserMandate is deprecated - roles should be assigned via UserMandateRole
logger.info(f"User {currentUser.id} already has membership in mandate {mandateId}, adding roles via UserMandateRole")
# Add roles via junction table
for roleId in roleIds:
rootInterface.addRoleToUserMandate(str(existingMembership.id), roleId)
featureInstanceId = str(invitation.featureInstanceId) if invitation.featureInstanceId else None
if featureInstanceId:
# Feature-instance invitation: create FeatureAccess (or add roles to existing)
# Do NOT add system "user" role - instance roles only; createFeatureAccess auto-assigns mandate user via Regel 4
existingAccess = rootInterface.getFeatureAccess(str(currentUser.id), featureInstanceId)
if existingAccess:
for roleId in roleIds:
try:
rootInterface.addRoleToFeatureAccess(str(existingAccess.id), roleId)
except Exception:
pass # Role might already be assigned
logger.info(f"User {currentUser.id} already had feature access, added roles to instance {featureInstanceId}")
else:
rootInterface.createFeatureAccess(
userId=str(currentUser.id),
featureInstanceId=featureInstanceId,
roleIds=roleIds
)
logger.info(f"User {currentUser.id} granted feature access to instance {featureInstanceId}")
instance = rootInterface.getFeatureInstance(featureInstanceId)
displayName = (instance.label or instance.featureCode) if instance else featureInstanceId
resultMessage = f"Einladung angenommen. Sie haben jetzt Zugang zur Feature-Instanz '{displayName}'."
else:
# Create new user-mandate relationship via interface
rootInterface.createUserMandate(str(currentUser.id), mandateId, roleIds)
logger.info(f"Created UserMandate for user {currentUser.id} in mandate {mandateId}")
# Mandate-level invitation: assign roles and mandate access
userRole = rootInterface.getRoleByLabel("user")
if userRole:
userRoleId = str(userRole.id)
if userRoleId and userRoleId not in roleIds:
roleIds = roleIds + [userRoleId]
logger.debug(f"Added system 'user' role {userRoleId} to invitation roles")
mandate = rootInterface.getMandate(mandateId) if mandateId else None
mandateName = (mandate.label or mandate.name) if mandate else mandateId
existingMembership = rootInterface.getUserMandate(str(currentUser.id), mandateId) if mandateId else None
if existingMembership:
logger.info(f"User {currentUser.id} already has membership in mandate {mandateId}, adding roles via UserMandateRole")
for roleId in roleIds:
rootInterface.addRoleToUserMandate(str(existingMembership.id), roleId)
else:
rootInterface.createUserMandate(str(currentUser.id), mandateId, roleIds)
logger.info(f"Created UserMandate for user {currentUser.id} in mandate {mandateId}")
resultMessage = f"Einladung angenommen. Sie haben jetzt Zugang zu '{mandateName}'."
# Mark invitation as used
rootInterface.db.recordModify(
model_class=Invitation,
@ -469,9 +504,9 @@ def _handleInvitationAction(
"currentUses": currentUses + 1
}
)
logger.info(f"User {currentUser.id} accepted invitation {invitationId} for mandate {mandateId}")
return f"Einladung angenommen. Sie haben jetzt Zugang zu '{mandateName}'."
target_desc = f"feature instance {featureInstanceId}" if featureInstanceId else f"mandate {mandateId}"
logger.info(f"User {currentUser.id} accepted invitation {invitationId} for {target_desc}")
return resultMessage
elif actionId == "decline":
# Decline the invitation