# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Notification routes for in-app notifications.

Provides user-specific notification inbox with support for actionable notifications.
"""

import logging
from typing import List, Dict, Any, Optional

from fastapi import APIRouter, HTTPException, Depends, Request
from fastapi import status
from pydantic import BaseModel, Field

from modules.auth import limiter, getCurrentUser
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelNotification import (
    UserNotification,
    NotificationType,
    NotificationStatus,
    NotificationAction
)
from modules.datamodels.datamodelRbac import Role
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/notifications",
    tags=["Notifications"],
    responses={404: {"description": "Not found"}}
)


# =============================================================================
# Request/Response Models
# =============================================================================

class NotificationActionRequest(BaseModel):
    """Request model for executing a notification action"""
    actionId: str = Field(..., description="ID of the action to execute (e.g., 'accept', 'decline')")


class UnreadCountResponse(BaseModel):
    """Response model for unread count"""
    count: int


# =============================================================================
# Helper Functions
# =============================================================================

def _enumValue(value: Any) -> Any:
    """
    Return ``value.value`` when the object has such an attribute, else ``value``.

    Lets status/type comparisons work whether the notification model holds enum
    members or their plain string values.
    """
    return getattr(value, "value", value)


def _getOwnedNotification(
    rootInterface,
    notificationId: str,
    currentUser: User,
    forbiddenDetail: str = "Not authorized to access this notification"
):
    """
    Load a notification and verify that it belongs to ``currentUser``.

    Args:
        rootInterface: Interface object providing ``getNotification``.
        notificationId: ID of the notification to load.
        currentUser: The authenticated user making the request.
        forbiddenDetail: Detail text for the 403 response on ownership mismatch.

    Returns:
        The notification object returned by ``rootInterface.getNotification``.

    Raises:
        HTTPException: 404 if no notification is returned, 403 if the
            notification's ``userId`` differs from the current user's ID.
    """
    notification = rootInterface.getNotification(notificationId)
    if not notification:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Notification not found"
        )

    # Compare as strings so differing ID representations still match
    if str(notification.userId) != str(currentUser.id):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=forbiddenDetail
        )

    return notification


def _createNotification(
    userId: str,
    notificationType: NotificationType,
    title: str,
    message: str,
    referenceType: Optional[str] = None,
    referenceId: Optional[str] = None,
    actions: Optional[List[NotificationAction]] = None,
    icon: Optional[str] = None,
    expiresAt: Optional[float] = None
) -> UserNotification:
    """
    Create a notification for a user and store it in the database.

    This is a helper function that can be imported by other modules.

    Args:
        userId: ID of the user who receives the notification.
        notificationType: Type of the notification.
        title: Title of the notification.
        message: Message text of the notification.
        referenceType: Optional type name of the referenced object.
        referenceId: Optional ID of the referenced object.
        actions: Optional list of actions offered on the notification.
        icon: Optional icon identifier.
        expiresAt: Optional expiry value, passed through to the model unchanged.

    Returns:
        The created ``UserNotification`` instance.
    """
    rootInterface = getRootInterface()

    notification = UserNotification(
        userId=userId,
        type=notificationType,
        title=title,
        message=message,
        referenceType=referenceType,
        referenceId=referenceId,
        actions=actions,
        icon=icon,
        expiresAt=expiresAt
    )

    # Store in database
    rootInterface.db.recordCreate(
        model_class=UserNotification,
        record=notification.model_dump()
    )

    logger.info(f"Created notification {notification.id} for user {userId}: {title}")
    return notification


def createInvitationNotification(
    userId: str,
    invitationId: str,
    mandateName: str,
    inviterName: str
) -> UserNotification:
    """
    Create a notification for a pending invitation.

    Called when an invitation is created for an existing user. The notification
    references the invitation and offers "accept" and "decline" actions.

    Args:
        userId: ID of the invited user.
        invitationId: ID of the invitation, stored as the notification reference.
        mandateName: Name of the mandate shown in the message.
        inviterName: Name of the inviting user shown in the message.

    Returns:
        The created ``UserNotification`` instance.
    """
    return _createNotification(
        userId=userId,
        notificationType=NotificationType.INVITATION,
        title="Neue Einladung",
        message=f"{inviterName} hat Sie zu '{mandateName}' eingeladen.",
        referenceType="Invitation",
        referenceId=invitationId,
        icon="mail",
        actions=[
            NotificationAction(actionId="accept", label="Annehmen", style="primary"),
            NotificationAction(actionId="decline", label="Ablehnen", style="danger")
        ]
    )


# =============================================================================
# API Endpoints
# =============================================================================

@router.get("", response_model=List[Dict[str, Any]])
@limiter.limit("60/minute")
def getNotifications(
    request: Request,
    currentUser: User = Depends(getCurrentUser),
    status: Optional[str] = None,
    type: Optional[str] = None,
    limit: int = 50
) -> List[Dict[str, Any]]:
    """
    Get all notifications for the current user.

    Optionally filter by status (unread, read, actioned, dismissed) or type.

    Note: the ``status`` and ``type`` parameters are public query parameter
    names; inside this function ``status`` shadows the ``fastapi.status`` module.

    Raises:
        HTTPException: 500 if loading the notifications fails.
    """
    try:
        rootInterface = getRootInterface()

        # Get notifications (Pydantic models, sorted and limited)
        notifications = rootInterface.getNotificationsByUser(
            userId=str(currentUser.id),
            status=status,
            limit=limit
        )

        # Apply type filter if needed (not common, so filter post-fetch).
        # NOTE(review): the filter runs after `limit` was applied, so fewer than
        # `limit` items may be returned even if more matching ones exist.
        if type:
            notifications = [n for n in notifications if _enumValue(n.type) == type]

        # Convert to dicts for response
        return [n.model_dump() for n in notifications]

    except Exception as e:
        logger.exception(f"Error getting notifications: {e}")
        # NOTE(review): the exception text is returned to the client; confirm
        # this is acceptable for production.
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get notifications: {str(e)}"
        ) from e


@router.get("/unread-count", response_model=UnreadCountResponse)
@limiter.limit("120/minute")
def getUnreadCount(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> UnreadCountResponse:
    """
    Get the count of unread notifications for the current user.

    Used for the notification badge in the header.

    Raises:
        HTTPException: 500 if loading the notifications fails.
    """
    try:
        rootInterface = getRootInterface()

        # Get unread notifications (Pydantic models)
        # NOTE(review): no `limit` is passed here; if getNotificationsByUser
        # applies a default limit, the count is capped by it - confirm.
        notifications = rootInterface.getNotificationsByUser(
            userId=str(currentUser.id),
            status=NotificationStatus.UNREAD.value
        )

        return UnreadCountResponse(count=len(notifications))

    except Exception as e:
        logger.exception(f"Error getting unread count: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get unread count: {str(e)}"
        ) from e


@router.put("/{notificationId}/read", response_model=Dict[str, Any])
@limiter.limit("60/minute")
def markAsRead(
    request: Request,
    notificationId: str,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Mark a notification as read.

    Notifications that are already actioned or dismissed are left untouched, so
    that marking them as read cannot reset their status.

    Raises:
        HTTPException: 404 if the notification does not exist, 403 if it belongs
            to another user, 500 on any other failure.
    """
    try:
        rootInterface = getRootInterface()

        notification = _getOwnedNotification(rootInterface, notificationId, currentUser)

        # Overwriting ACTIONED would bypass the "already actioned" guard in
        # executeAction; overwriting DISMISSED would un-dismiss the notification.
        terminalStatuses = (
            NotificationStatus.ACTIONED.value,
            NotificationStatus.DISMISSED.value
        )
        if _enumValue(notification.status) not in terminalStatuses:
            rootInterface.db.recordModify(
                model_class=UserNotification,
                recordId=notificationId,
                record={
                    "status": NotificationStatus.READ.value,
                    "readAt": getUtcTimestamp()
                }
            )

        return {"message": "Notification marked as read", "id": notificationId}

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error marking notification as read: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to mark notification as read: {str(e)}"
        ) from e


@router.put("/mark-all-read", response_model=Dict[str, Any])
@limiter.limit("10/minute")
def markAllAsRead(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Mark all unread notifications as read for the current user.

    Returns:
        A dict with a message and the number of updated notifications.

    Raises:
        HTTPException: 500 if loading or updating the notifications fails.
    """
    try:
        rootInterface = getRootInterface()

        # Get all unread notifications (Pydantic models)
        notifications = rootInterface.getNotificationsByUser(
            userId=str(currentUser.id),
            status=NotificationStatus.UNREAD.value
        )

        # One timestamp for the whole batch
        currentTime = getUtcTimestamp()
        updatedCount = 0

        for notification in notifications:
            rootInterface.db.recordModify(
                model_class=UserNotification,
                recordId=str(notification.id),
                record={
                    "status": NotificationStatus.READ.value,
                    "readAt": currentTime
                }
            )
            updatedCount += 1

        return {"message": f"Marked {updatedCount} notifications as read", "count": updatedCount}

    except Exception as e:
        logger.exception(f"Error marking all notifications as read: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to mark notifications as read: {str(e)}"
        ) from e


@router.post("/{notificationId}/action", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def executeAction(
    request: Request,
    notificationId: str,
    actionRequest: NotificationActionRequest,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Execute an action on a notification (e.g., accept/decline invitation).

    Returns:
        A dict with the action result message, the action ID and the
        notification ID.

    Raises:
        HTTPException: 404 if the notification does not exist, 403 if it belongs
            to another user, 400 if it was already actioned or the action ID is
            not offered by the notification, 500 on any other failure. Errors
            raised by the invitation handler are passed through unchanged.
    """
    try:
        rootInterface = getRootInterface()

        notification = _getOwnedNotification(rootInterface, notificationId, currentUser)

        # Check if already actioned. The recorded action is checked as well,
        # because the status field can later be overwritten (e.g. by dismissing).
        alreadyActioned = (
            _enumValue(notification.status) == NotificationStatus.ACTIONED.value
            or bool(getattr(notification, "actionTaken", None))
        )
        if alreadyActioned:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Notification has already been actioned"
            )

        # Validate action exists (actions may be dicts or model instances)
        actions = notification.actions or []
        validActionIds = [
            a.get("actionId") if isinstance(a, dict) else a.actionId
            for a in actions
        ]
        if actionRequest.actionId not in validActionIds:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid action. Valid actions: {validActionIds}"
            )

        # Execute action based on notification type
        if _enumValue(notification.type) == NotificationType.INVITATION.value:
            actionResult = _handleInvitationAction(
                notification=notification,
                actionId=actionRequest.actionId,
                currentUser=currentUser,
                rootInterface=rootInterface
            )
        else:
            # Generic action handling
            actionResult = f"Action '{actionRequest.actionId}' executed"

        # Update notification status
        rootInterface.db.recordModify(
            model_class=UserNotification,
            recordId=notificationId,
            record={
                "status": NotificationStatus.ACTIONED.value,
                "actionTaken": actionRequest.actionId,
                "actionResult": actionResult,
                "actionedAt": getUtcTimestamp()
            }
        )

        return {
            "message": actionResult,
            "action": actionRequest.actionId,
            "notificationId": notificationId
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error executing notification action: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to execute action: {str(e)}"
        ) from e


def _acceptInvitation(
    invitation,
    invitationId: str,
    currentUser: User,
    rootInterface,
    currentTime,
    currentUses: int
) -> str:
    """
    Grant the invitation's mandate and roles to the user and mark it as used.

    Args:
        invitation: The already validated invitation object.
        invitationId: ID of the invitation record to update.
        currentUser: The user accepting the invitation.
        rootInterface: Interface object used for all lookups and updates.
        currentTime: Timestamp stored as ``usedAt``.
        currentUses: Use count before this acceptance.

    Returns:
        The user-facing result message.
    """
    # Local import, as in the rest of this module's invitation handling
    from modules.datamodels.datamodelInvitation import Invitation

    # Accept the invitation - assign roles and mandate access
    mandateId = str(invitation.mandateId) if invitation.mandateId else None
    roleIds = list(invitation.roleIds or [])

    # Ensure user gets the system "user" role for access to public UI elements (e.g. playground)
    userRole = rootInterface.getRoleByLabel("user")
    if userRole:
        userRoleId = str(userRole.id)
        if userRoleId and userRoleId not in roleIds:
            roleIds = roleIds + [userRoleId]
            logger.debug(f"Added system 'user' role {userRoleId} to invitation roles")

    # Get mandate name for result message
    mandate = rootInterface.getMandate(mandateId) if mandateId else None
    mandateName = (mandate.label or mandate.name) if mandate else mandateId

    # Check if user already has this mandate
    existingMembership = (
        rootInterface.getUserMandate(str(currentUser.id), mandateId) if mandateId else None
    )

    if existingMembership:
        # Update existing membership with new roles via interface
        # Note: roleIds on UserMandate is deprecated - roles should be assigned via UserMandateRole
        logger.info(f"User {currentUser.id} already has membership in mandate {mandateId}, adding roles via UserMandateRole")
        # Add roles via junction table
        for roleId in roleIds:
            rootInterface.addRoleToUserMandate(str(existingMembership.id), roleId)
    else:
        # Create new user-mandate relationship via interface
        # NOTE(review): mandateId may be None here if the invitation has no
        # mandate; confirm createUserMandate handles that case.
        rootInterface.createUserMandate(str(currentUser.id), mandateId, roleIds)
        logger.info(f"Created UserMandate for user {currentUser.id} in mandate {mandateId}")

    # Mark invitation as used
    rootInterface.db.recordModify(
        model_class=Invitation,
        recordId=invitationId,
        record={
            "usedBy": str(currentUser.id),
            "usedAt": currentTime,
            "currentUses": currentUses + 1
        }
    )

    logger.info(f"User {currentUser.id} accepted invitation {invitationId} for mandate {mandateId}")
    return f"Einladung angenommen. Sie haben jetzt Zugang zu '{mandateName}'."


def _handleInvitationAction(
    notification: UserNotification,
    actionId: str,
    currentUser: User,
    rootInterface
) -> str:
    """
    Handle accept/decline actions for invitation notifications.

    The invitation is validated (exists, addressed to the current user, not
    expired, not revoked, uses left) before either action is processed.

    Args:
        notification: The invitation notification; its ``referenceId`` is the
            invitation ID.
        actionId: ``"accept"`` or ``"decline"``.
        currentUser: The user executing the action.
        rootInterface: Interface object used for all lookups and updates.

    Returns:
        The user-facing result message.

    Raises:
        HTTPException: 400 if the reference is missing, the invitation is
            expired, revoked or used up, or the action is unknown; 404 if the
            invitation does not exist; 403 if it targets a different username.
    """
    invitationId = notification.referenceId
    if not invitationId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No invitation reference found"
        )

    # Get the invitation (Pydantic model)
    invitation = rootInterface.getInvitation(invitationId)
    if not invitation:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Invitation not found"
        )

    # Verify username matches
    if invitation.targetUsername != currentUser.username:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="This invitation is for a different user"
        )

    # Check if invitation is still valid.
    # A missing expiresAt is treated as 0, i.e. as already expired.
    currentTime = getUtcTimestamp()
    expiresAt = invitation.expiresAt or 0
    if expiresAt < currentTime:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invitation has expired"
        )

    if invitation.revokedAt:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invitation has been revoked"
        )

    currentUses = invitation.currentUses or 0
    maxUses = invitation.maxUses or 1
    if currentUses >= maxUses:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invitation has reached maximum uses"
        )

    if actionId == "accept":
        return _acceptInvitation(
            invitation=invitation,
            invitationId=invitationId,
            currentUser=currentUser,
            rootInterface=rootInterface,
            currentTime=currentTime,
            currentUses=currentUses
        )

    if actionId == "decline":
        # Decline the invitation
        # We don't revoke it, just mark the notification as declined
        logger.info(f"User {currentUser.id} declined invitation {invitationId}")
        return "Einladung abgelehnt."

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail=f"Unknown action: {actionId}"
    )


@router.delete("/{notificationId}", response_model=Dict[str, Any])
@limiter.limit("30/minute")
def deleteNotification(
    request: Request,
    notificationId: str,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """
    Delete/dismiss a notification.

    The record is not removed; its status is set to dismissed (soft delete).

    Raises:
        HTTPException: 404 if the notification does not exist, 403 if it belongs
            to another user, 500 on any other failure.
    """
    try:
        rootInterface = getRootInterface()

        _getOwnedNotification(
            rootInterface,
            notificationId,
            currentUser,
            forbiddenDetail="Not authorized to delete this notification"
        )

        # Mark as dismissed (soft delete)
        rootInterface.db.recordModify(
            model_class=UserNotification,
            recordId=notificationId,
            record={
                "status": NotificationStatus.DISMISSED.value
            }
        )

        return {"message": "Notification dismissed", "id": notificationId}

    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Error deleting notification: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete notification: {str(e)}"
        ) from e