# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Event manager for chatbot streaming. Manages event queues for SSE streaming of chatbot progress updates. """ import logging import asyncio from typing import Dict, Optional, Any from datetime import datetime logger = logging.getLogger(__name__) class ChatbotEventManager: """ Manages event queues for chatbot streaming. Thread-safe event emission and queue management. """ def __init__(self): """Initialize the event manager.""" self._queues: Dict[str, asyncio.Queue] = {} self._locks: Dict[str, asyncio.Lock] = {} self._cleanup_tasks: Dict[str, asyncio.Task] = {} def create_queue(self, workflow_id: str) -> asyncio.Queue: """ Create a new event queue for a workflow. Args: workflow_id: Workflow ID Returns: Event queue for the workflow """ if workflow_id not in self._queues: self._queues[workflow_id] = asyncio.Queue() self._locks[workflow_id] = asyncio.Lock() logger.debug(f"Created event queue for workflow {workflow_id}") return self._queues[workflow_id] def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]: """ Get existing event queue for a workflow. Args: workflow_id: Workflow ID Returns: Event queue if exists, None otherwise """ return self._queues.get(workflow_id) async def emit_event( self, workflow_id: str, event_type: str, message: str, step: Optional[str] = None, data: Optional[Dict[str, Any]] = None ): """ Emit an event to the workflow's event queue. Args: workflow_id: Workflow ID event_type: Type of event ("status", "progress", "complete", "error") message: Event message step: Current processing step (optional) data: Additional event data (optional) """ queue = self.get_queue(workflow_id) if not queue: logger.debug(f"No event queue found for workflow {workflow_id}, skipping event") return event = { "type": event_type, "message": message, "timestamp": datetime.now().timestamp(), "step": step, "data": data or {} } try: await queue.put(event) logger.debug(f"Emitted {event_type} event for workflow {workflow_id}: {message[:50]}") except Exception as e: logger.error(f"Error emitting event for workflow {workflow_id}: {e}") async def cleanup(self, workflow_id: str, delay: float = 60.0): """ Schedule cleanup of event queue after delay. This allows time for any remaining events to be consumed. Args: workflow_id: Workflow ID delay: Delay in seconds before cleanup (default: 60 seconds) """ if workflow_id in self._cleanup_tasks: # Cancel existing cleanup task self._cleanup_tasks[workflow_id].cancel() async def _cleanup(): try: await asyncio.sleep(delay) if workflow_id in self._queues: # Drain remaining events queue = self._queues[workflow_id] while not queue.empty(): try: queue.get_nowait() except asyncio.QueueEmpty: break del self._queues[workflow_id] del self._locks[workflow_id] logger.info(f"Cleaned up event queue for workflow {workflow_id}") except asyncio.CancelledError: pass except Exception as e: logger.error(f"Error during cleanup for workflow {workflow_id}: {e}") finally: if workflow_id in self._cleanup_tasks: del self._cleanup_tasks[workflow_id] self._cleanup_tasks[workflow_id] = asyncio.create_task(_cleanup()) def has_queue(self, workflow_id: str) -> bool: """ Check if a queue exists for a workflow. Args: workflow_id: Workflow ID Returns: True if queue exists, False otherwise """ return workflow_id in self._queues # Global singleton instance _event_manager = ChatbotEventManager() def get_event_manager() -> ChatbotEventManager: """Get the global event manager instance.""" return _event_manager