# gateway/modules/serviceCenter/core/serviceStreaming/eventManager.py
#
# 199 lines
# 6.5 KiB
# Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Event manager for SSE streaming.
Manages event queues for Server-Sent Events (SSE) streaming across features.
"""
import logging
import asyncio
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)


class EventManager:
    """
    Manages event queues for SSE streaming.

    Each workflow has its own async queue for events. The manager also keeps
    track of the asyncio task running the agent of a workflow (so it can be
    cancelled) and of the delayed cleanup task that removes a workflow's queue.
    """

    def __init__(self):
        """Initialize the event manager with empty registries."""
        # workflow_id -> queue the events of that workflow are put on
        self._queues: Dict[str, asyncio.Queue] = {}
        # workflow_id -> pending delayed-cleanup task for that queue
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        # workflow_id -> task running the agent for that workflow
        self._agent_tasks: Dict[str, asyncio.Task] = {}
        # workflow_id -> True once cancel_agent() was requested
        self._cancelled: Dict[str, bool] = {}

    @staticmethod
    def _drain(queue: asyncio.Queue) -> None:
        """Discard every event currently waiting in the queue."""
        while not queue.empty():
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                break

    def create_queue(self, workflow_id: str) -> asyncio.Queue:
        """
        Create an event queue for a workflow.

        A pending cleanup for the workflow is cancelled. If a queue already
        exists it is reused after its stale events have been discarded.

        Args:
            workflow_id: Workflow ID
        Returns:
            Async queue for events
        """
        pending_cleanup = self._cleanup_tasks.pop(workflow_id, None)
        if pending_cleanup is not None:
            pending_cleanup.cancel()
            logger.debug(f"Cancelled pending cleanup for workflow {workflow_id}")

        queue = self._queues.get(workflow_id)
        if queue is None:
            queue = asyncio.Queue()
            self._queues[workflow_id] = queue
            logger.debug(f"Created event queue for workflow {workflow_id}")
        else:
            self._drain(queue)
            logger.debug(f"Reusing event queue for workflow {workflow_id} (drained stale events)")
        return queue

    def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]:
        """
        Get the event queue for a workflow.

        Args:
            workflow_id: Workflow ID
        Returns:
            Async queue if exists, None otherwise
        """
        return self._queues.get(workflow_id)

    def has_queue(self, workflow_id: str) -> bool:
        """
        Check if a queue exists for a workflow.

        Args:
            workflow_id: Workflow ID
        Returns:
            True if queue exists, False otherwise
        """
        return workflow_id in self._queues

    def register_agent_task(self, workflow_id: str, task: asyncio.Task) -> None:
        """
        Register the asyncio Task running the agent for a workflow.

        Registering also clears a cancellation flag left from an earlier run.

        Args:
            workflow_id: Workflow ID
            task: Task executing the agent
        """
        self._agent_tasks[workflow_id] = task
        self._cancelled.pop(workflow_id, None)

    def is_cancelled(self, workflow_id: str) -> bool:
        """
        Check if a workflow has been cancelled.

        Args:
            workflow_id: Workflow ID
        Returns:
            True if cancel_agent() flagged the workflow and the flag has not
            been cleared since, False otherwise
        """
        return self._cancelled.get(workflow_id, False)

    async def cancel_agent(self, workflow_id: str) -> bool:
        """
        Cancel the running agent task for a workflow.

        The cancellation flag is set even when no task is registered.

        Args:
            workflow_id: Workflow ID
        Returns:
            True if a task that was still running got cancelled, False otherwise
        """
        self._cancelled[workflow_id] = True
        task = self._agent_tasks.pop(workflow_id, None)
        if task and not task.done():
            task.cancel()
            logger.info(f"Cancelled agent task for workflow {workflow_id}")
            return True
        logger.debug(f"No running agent task found for workflow {workflow_id}")
        return False

    def _unregister_agent_task(self, workflow_id: str) -> None:
        """Remove the agent task reference and cancellation flag after completion."""
        self._agent_tasks.pop(workflow_id, None)
        self._cancelled.pop(workflow_id, None)

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "chat",
        message: Optional[str] = None,
        step: Optional[str] = None
    ) -> None:
        """
        Emit an event to the queue for a workflow.

        The event is silently dropped when the workflow has no queue.

        Args:
            context_id: Workflow ID (context)
            event_type: Type of event (e.g., "chatdata", "complete", "error")
            data: Event data dictionary
            event_category: Category of event (e.g., "chat", "workflow")
            message: Optional message string
            step: Optional step identifier
        """
        queue = self._queues.get(context_id)
        if queue is None:
            # Normal for background workflows without an active SSE listener
            return
        event = {
            "type": event_type,
            "data": data,
            "category": event_category,
            "message": message,
            "step": step
        }
        try:
            await queue.put(event)
            # "chunk" events are not logged
            if event_type not in ("chunk",):
                logger.debug(f"Emitted {event_type} event for workflow {context_id}")
        except Exception as e:
            logger.error(f"Error emitting event for workflow {context_id}: {e}", exc_info=True)

    async def cleanup(self, workflow_id: str, delay: float = 60.0) -> None:
        """
        Schedule cleanup of a queue after a delay.

        A cleanup already pending for the workflow is cancelled and replaced.
        This coroutine returns as soon as the new cleanup is scheduled.

        Args:
            workflow_id: Workflow ID
            delay: Delay in seconds before cleanup
        """
        # Cancel existing cleanup task if any
        previous = self._cleanup_tasks.pop(workflow_id, None)
        if previous is not None:
            previous.cancel()

        async def _cleanup():
            try:
                await asyncio.sleep(delay)
                queue = self._queues.pop(workflow_id, None)
                if queue is not None:
                    # Drain remaining events before dropping the queue
                    self._drain(queue)
                    logger.info(f"Cleaned up event queue for workflow {workflow_id}")
            except asyncio.CancelledError:
                logger.debug(f"Cleanup cancelled for workflow {workflow_id}")
            except Exception as e:
                logger.error(f"Error during cleanup for workflow {workflow_id}: {e}", exc_info=True)
            finally:
                # A cancelled task reaches this point only after its
                # replacement may already be registered under the same key,
                # so remove the entry only while it still refers to this task.
                if self._cleanup_tasks.get(workflow_id) is asyncio.current_task():
                    del self._cleanup_tasks[workflow_id]

        # Schedule cleanup
        self._cleanup_tasks[workflow_id] = asyncio.create_task(_cleanup())
# Module-level EventManager singleton; stays None until first requested
_event_manager: Optional[EventManager] = None


def get_event_manager() -> EventManager:
    """
    Return the shared event manager, creating it on first use.

    Returns:
        The module-level EventManager instance
    """
    global _event_manager
    manager = _event_manager
    if manager is None:
        manager = EventManager()
        _event_manager = manager
    return manager