# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Event manager for SSE streaming. Manages event queues for Server-Sent Events (SSE) streaming across features. """ import logging import asyncio from typing import Dict, Optional, Any logger = logging.getLogger(__name__) class EventManager: """ Manages event queues for SSE streaming. Each workflow has its own async queue for events. """ def __init__(self): """Initialize the event manager.""" self._queues: Dict[str, asyncio.Queue] = {} self._cleanup_tasks: Dict[str, asyncio.Task] = {} def create_queue(self, workflow_id: str) -> asyncio.Queue: """ Create an event queue for a workflow. Args: workflow_id: Workflow ID Returns: Async queue for events """ if workflow_id not in self._queues: self._queues[workflow_id] = asyncio.Queue() logger.debug(f"Created event queue for workflow {workflow_id}") return self._queues[workflow_id] def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]: """ Get the event queue for a workflow. Args: workflow_id: Workflow ID Returns: Async queue if exists, None otherwise """ return self._queues.get(workflow_id) def has_queue(self, workflow_id: str) -> bool: """ Check if a queue exists for a workflow. Args: workflow_id: Workflow ID Returns: True if queue exists, False otherwise """ return workflow_id in self._queues async def emit_event( self, context_id: str, event_type: str, data: Dict[str, Any], event_category: str = "chat", message: Optional[str] = None, step: Optional[str] = None ) -> None: """ Emit an event to the queue for a workflow. Args: context_id: Workflow ID (context) event_type: Type of event (e.g., "chatdata", "complete", "error") data: Event data dictionary event_category: Category of event (e.g., "chat", "workflow") message: Optional message string step: Optional step identifier """ queue = self._queues.get(context_id) if not queue: # DEBUG level: This is normal for background workflows without active SSE listener return event = { "type": event_type, "data": data, "category": event_category, "message": message, "step": step } try: await queue.put(event) logger.debug(f"Emitted {event_type} event for workflow {context_id}") except Exception as e: logger.error(f"Error emitting event for workflow {context_id}: {e}", exc_info=True) async def cleanup(self, workflow_id: str, delay: float = 60.0) -> None: """ Schedule cleanup of a queue after a delay. Args: workflow_id: Workflow ID delay: Delay in seconds before cleanup """ # Cancel existing cleanup task if any if workflow_id in self._cleanup_tasks: self._cleanup_tasks[workflow_id].cancel() async def _cleanup(): try: await asyncio.sleep(delay) if workflow_id in self._queues: # Drain remaining events queue = self._queues[workflow_id] while not queue.empty(): try: queue.get_nowait() except asyncio.QueueEmpty: break # Remove queue del self._queues[workflow_id] logger.info(f"Cleaned up event queue for workflow {workflow_id}") except asyncio.CancelledError: logger.debug(f"Cleanup cancelled for workflow {workflow_id}") except Exception as e: logger.error(f"Error during cleanup for workflow {workflow_id}: {e}", exc_info=True) finally: if workflow_id in self._cleanup_tasks: del self._cleanup_tasks[workflow_id] # Schedule cleanup task = asyncio.create_task(_cleanup()) self._cleanup_tasks[workflow_id] = task # Global event manager instance _event_manager: Optional[EventManager] = None def get_event_manager() -> EventManager: """ Get the global event manager instance. Returns: EventManager instance """ global _event_manager if _event_manager is None: _event_manager = EventManager() return _event_manager