# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Event manager for SSE streaming (Layer L0 - shared). Manages event queues for Server-Sent Events (SSE) streaming across features. Generic pub/sub infrastructure with zero internal dependencies. """ import logging import asyncio from typing import Dict, Optional, Any logger = logging.getLogger(__name__) class EventManager: """ Manages event queues for SSE streaming. Each workflow has its own async queue for events. """ def __init__(self): """Initialize the event manager.""" self._queues: Dict[str, asyncio.Queue] = {} self._cleanup_tasks: Dict[str, asyncio.Task] = {} self._agent_tasks: Dict[str, asyncio.Task] = {} self._cancelled: Dict[str, bool] = {} def create_queue(self, workflow_id: str) -> asyncio.Queue: """Create an event queue for a workflow.""" if workflow_id in self._cleanup_tasks: self._cleanup_tasks[workflow_id].cancel() del self._cleanup_tasks[workflow_id] logger.debug(f"Cancelled pending cleanup for workflow {workflow_id}") if workflow_id not in self._queues: self._queues[workflow_id] = asyncio.Queue() logger.debug(f"Created event queue for workflow {workflow_id}") else: old = self._queues[workflow_id] while not old.empty(): try: old.get_nowait() except asyncio.QueueEmpty: break logger.debug(f"Reusing event queue for workflow {workflow_id} (drained stale events)") return self._queues[workflow_id] def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]: """Get the event queue for a workflow.""" return self._queues.get(workflow_id) def has_queue(self, workflow_id: str) -> bool: """Check if a queue exists for a workflow.""" return workflow_id in self._queues def register_agent_task(self, workflow_id: str, task: asyncio.Task) -> None: """Register the asyncio Task running the agent for a workflow.""" self._agent_tasks[workflow_id] = task self._cancelled.pop(workflow_id, None) def is_cancelled(self, workflow_id: str) -> bool: """Check if a workflow has been cancelled.""" return self._cancelled.get(workflow_id, False) async def cancel_agent(self, workflow_id: str) -> bool: """Cancel the running agent task for a workflow. Returns True if cancelled.""" self._cancelled[workflow_id] = True task = self._agent_tasks.pop(workflow_id, None) if task and not task.done(): task.cancel() logger.info(f"Cancelled agent task for workflow {workflow_id}") return True logger.debug(f"No running agent task found for workflow {workflow_id}") return False def _unregister_agent_task(self, workflow_id: str) -> None: """Remove the agent task reference after completion.""" self._agent_tasks.pop(workflow_id, None) self._cancelled.pop(workflow_id, None) async def emit_event( self, context_id: str, event_type: str, data: Dict[str, Any], event_category: str = "chat", message: Optional[str] = None, step: Optional[str] = None ) -> None: """Emit an event to the queue for a workflow.""" queue = self._queues.get(context_id) if not queue: return event = { "type": event_type, "data": data, "category": event_category, "message": message, "step": step } try: await queue.put(event) if event_type not in ("chunk",): logger.debug(f"Emitted {event_type} event for workflow {context_id}") except Exception as e: logger.error(f"Error emitting event for workflow {context_id}: {e}", exc_info=True) async def cleanup(self, workflow_id: str, delay: float = 60.0) -> None: """Schedule cleanup of a queue after a delay.""" if workflow_id in self._cleanup_tasks: self._cleanup_tasks[workflow_id].cancel() async def _cleanup(): try: await asyncio.sleep(delay) if workflow_id in self._queues: queue = self._queues[workflow_id] while not queue.empty(): try: queue.get_nowait() except asyncio.QueueEmpty: break del self._queues[workflow_id] logger.info(f"Cleaned up event queue for workflow {workflow_id}") except asyncio.CancelledError: logger.debug(f"Cleanup cancelled for workflow {workflow_id}") except Exception as e: logger.error(f"Error during cleanup for workflow {workflow_id}: {e}", exc_info=True) finally: if workflow_id in self._cleanup_tasks: del self._cleanup_tasks[workflow_id] task = asyncio.create_task(_cleanup()) self._cleanup_tasks[workflow_id] = task def shutdown(self) -> None: """Cancel all pending cleanup and agent tasks for fast process exit.""" for _wfId, q in list(self._queues.items()): try: q.put_nowait(None) except Exception: pass for wfId, task in list(self._cleanup_tasks.items()): if not task.done(): task.cancel() self._cleanup_tasks.clear() for wfId, task in list(self._agent_tasks.items()): if not task.done(): task.cancel() self._agent_tasks.clear() self._queues.clear() logger.info("EventManager shutdown: all tasks cancelled, queues drained") # Global event manager instance _event_manager: Optional[EventManager] = None def get_event_manager() -> EventManager: """Get the global event manager instance.""" global _event_manager if _event_manager is None: _event_manager = EventManager() return _event_manager