gateway/modules/features/chatbot/eventManager.py
2026-01-05 07:34:45 +01:00

152 lines
4.7 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Event manager for chatbot streaming.
Manages event queues for SSE streaming of chatbot progress updates.
"""
import logging
import asyncio
from typing import Dict, Optional, Any
from datetime import datetime
logger = logging.getLogger(__name__)
class ChatbotEventManager:
"""
Manages event queues for chatbot streaming.
Thread-safe event emission and queue management.
"""
def __init__(self):
"""Initialize the event manager."""
self._queues: Dict[str, asyncio.Queue] = {}
self._locks: Dict[str, asyncio.Lock] = {}
self._cleanup_tasks: Dict[str, asyncio.Task] = {}
def create_queue(self, workflow_id: str) -> asyncio.Queue:
"""
Create a new event queue for a workflow.
Args:
workflow_id: Workflow ID
Returns:
Event queue for the workflow
"""
if workflow_id not in self._queues:
self._queues[workflow_id] = asyncio.Queue()
self._locks[workflow_id] = asyncio.Lock()
logger.debug(f"Created event queue for workflow {workflow_id}")
return self._queues[workflow_id]
def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]:
"""
Get existing event queue for a workflow.
Args:
workflow_id: Workflow ID
Returns:
Event queue if exists, None otherwise
"""
return self._queues.get(workflow_id)
async def emit_event(
self,
workflow_id: str,
event_type: str,
message: str,
step: Optional[str] = None,
data: Optional[Dict[str, Any]] = None
):
"""
Emit an event to the workflow's event queue.
Args:
workflow_id: Workflow ID
event_type: Type of event ("status", "progress", "complete", "error")
message: Event message
step: Current processing step (optional)
data: Additional event data (optional)
"""
queue = self.get_queue(workflow_id)
if not queue:
logger.debug(f"No event queue found for workflow {workflow_id}, skipping event")
return
event = {
"type": event_type,
"message": message,
"timestamp": datetime.now().timestamp(),
"step": step,
"data": data or {}
}
try:
await queue.put(event)
logger.debug(f"Emitted {event_type} event for workflow {workflow_id}: {message[:50]}")
except Exception as e:
logger.error(f"Error emitting event for workflow {workflow_id}: {e}")
async def cleanup(self, workflow_id: str, delay: float = 60.0):
"""
Schedule cleanup of event queue after delay.
This allows time for any remaining events to be consumed.
Args:
workflow_id: Workflow ID
delay: Delay in seconds before cleanup (default: 60 seconds)
"""
if workflow_id in self._cleanup_tasks:
# Cancel existing cleanup task
self._cleanup_tasks[workflow_id].cancel()
async def _cleanup():
try:
await asyncio.sleep(delay)
if workflow_id in self._queues:
# Drain remaining events
queue = self._queues[workflow_id]
while not queue.empty():
try:
queue.get_nowait()
except asyncio.QueueEmpty:
break
del self._queues[workflow_id]
del self._locks[workflow_id]
logger.info(f"Cleaned up event queue for workflow {workflow_id}")
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error during cleanup for workflow {workflow_id}: {e}")
finally:
if workflow_id in self._cleanup_tasks:
del self._cleanup_tasks[workflow_id]
self._cleanup_tasks[workflow_id] = asyncio.create_task(_cleanup())
def has_queue(self, workflow_id: str) -> bool:
"""
Check if a queue exists for a workflow.
Args:
workflow_id: Workflow ID
Returns:
True if queue exists, False otherwise
"""
return workflow_id in self._queues
# Global singleton instance
_event_manager = ChatbotEventManager()
def get_event_manager() -> ChatbotEventManager:
"""Get the global event manager instance."""
return _event_manager