159 lines
4.9 KiB
Python
159 lines
4.9 KiB
Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Event manager for chatbot streaming.

Manages event queues for Server-Sent Events (SSE) streaming.
"""
|
|
|
|
import logging
|
|
import asyncio
|
|
from typing import Dict, Optional, Any
|
|
from collections import defaultdict
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EventManager:
    """
    Manages event queues for chatbot streaming.

    Each workflow has its own async queue for events. Queues are created
    on demand with ``create_queue`` and removed by a delayed task that
    ``cleanup`` schedules; at most one such task is tracked per workflow.
    """

    def __init__(self):
        """Initialize the event manager with no queues and no pending cleanups."""
        # workflow_id -> queue carrying that workflow's events
        self._queues: Dict[str, asyncio.Queue] = {}
        # workflow_id -> most recently scheduled delayed-cleanup task
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}

    def create_queue(self, workflow_id: str) -> asyncio.Queue:
        """
        Create an event queue for a workflow.

        If a queue is already registered for ``workflow_id`` it is reused
        rather than replaced, so events already queued are kept.

        Args:
            workflow_id: Workflow ID

        Returns:
            Async queue for events
        """
        if workflow_id not in self._queues:
            self._queues[workflow_id] = asyncio.Queue()
            logger.debug("Created event queue for workflow %s", workflow_id)
        return self._queues[workflow_id]

    def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]:
        """
        Get the event queue for a workflow.

        Args:
            workflow_id: Workflow ID

        Returns:
            Async queue if exists, None otherwise
        """
        return self._queues.get(workflow_id)

    def has_queue(self, workflow_id: str) -> bool:
        """
        Check if a queue exists for a workflow.

        Args:
            workflow_id: Workflow ID

        Returns:
            True if queue exists, False otherwise
        """
        return workflow_id in self._queues

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "chat",
        message: Optional[str] = None,
        step: Optional[str] = None
    ) -> None:
        """
        Emit an event to the queue for a workflow.

        The event is silently dropped when no queue is registered for
        ``context_id``. Errors raised while enqueuing are logged and not
        propagated to the caller.

        Args:
            context_id: Workflow ID (context)
            event_type: Type of event (e.g., "chatdata", "complete", "error")
            data: Event data dictionary
            event_category: Category of event (e.g., "chat", "workflow")
            message: Optional message string
            step: Optional step identifier
        """
        queue = self._queues.get(context_id)
        if queue is None:
            # Not an error: background workflows may run without an active
            # SSE listener, in which case there is no queue to publish to.
            return

        event = {
            "type": event_type,
            "data": data,
            "category": event_category,
            "message": message,
            "step": step,
        }

        try:
            await queue.put(event)
            logger.debug("Emitted %s event for workflow %s", event_type, context_id)
        except Exception as e:
            # Best effort: a failed emit must not break the emitting workflow.
            logger.error(
                "Error emitting event for workflow %s: %s", context_id, e, exc_info=True
            )

    async def cleanup(self, workflow_id: str, delay: float = 60.0) -> None:
        """
        Schedule cleanup of a queue after a delay.

        Returns as soon as the cleanup task is scheduled; it does not wait
        for the delay. Calling it again for the same workflow cancels the
        pending cleanup and restarts the delay.

        Args:
            workflow_id: Workflow ID
            delay: Delay in seconds before cleanup
        """
        # Cancel existing cleanup task if any
        previous = self._cleanup_tasks.pop(workflow_id, None)
        if previous is not None:
            previous.cancel()

        async def _cleanup():
            try:
                await asyncio.sleep(delay)
                if workflow_id in self._queues:
                    # Drain remaining events
                    queue = self._queues[workflow_id]
                    while not queue.empty():
                        try:
                            queue.get_nowait()
                        except asyncio.QueueEmpty:
                            break

                    # Remove queue
                    del self._queues[workflow_id]
                    logger.info("Cleaned up event queue for workflow %s", workflow_id)
            except asyncio.CancelledError:
                logger.debug("Cleanup cancelled for workflow %s", workflow_id)
                # Propagate so the task is correctly marked as cancelled.
                raise
            except Exception as e:
                logger.error(
                    "Error during cleanup for workflow %s: %s", workflow_id, e, exc_info=True
                )
            finally:
                # Only unregister our own entry. A cancelled task runs this
                # after its replacement has been registered under the same
                # key; deleting by key alone would drop the replacement and
                # leave it impossible to cancel later.
                if self._cleanup_tasks.get(workflow_id) is asyncio.current_task():
                    del self._cleanup_tasks[workflow_id]

        # Schedule cleanup; the dict holds the strong reference that keeps
        # the task alive until it finishes.
        self._cleanup_tasks[workflow_id] = asyncio.create_task(_cleanup())
|
|
|
|
|
|
# Module-wide EventManager singleton; stays None until first requested.
_event_manager: Optional[EventManager] = None


def get_event_manager() -> EventManager:
    """
    Return the shared EventManager, creating it on first use.

    The instance is stored in the module-level ``_event_manager`` variable,
    so every later call returns the same object.

    Returns:
        EventManager instance
    """
    global _event_manager
    manager = _event_manager
    if manager is None:
        # First call: build the instance and remember it for later callers.
        manager = EventManager()
        _event_manager = manager
    return manager
|