List endpoint (/api/chatbot/threads without workflowId): Normalizes all workflows before returning, converting maxSteps: None to 10 (the default). Single workflow endpoint (/api/chatbot/threads?workflowId=xxx): Normalizes the workflow dict, handling both Pydantic models and plain dicts.
243 lines · 9.1 KiB · Python
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

"""
Generic streaming event manager for real-time updates.

Manages event queues for SSE streaming across all features (chatbot, workflows, documents, etc.).
Supports event-driven streaming instead of polling.
"""

import logging
|
|
import asyncio
|
|
from typing import Dict, Optional, Any, List, AsyncIterator, Set
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StreamingEventManager:
|
|
"""
|
|
Generic event manager for real-time streaming across all features.
|
|
Supports multiple event types and contexts (workflows, documents, tasks, etc.).
|
|
Thread-safe event emission and queue management.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the event manager."""
|
|
self._queues: Dict[str, asyncio.Queue] = {}
|
|
self._locks: Dict[str, asyncio.Lock] = {}
|
|
self._cleanup_tasks: Dict[str, asyncio.Task] = {}
|
|
self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids (for future multi-subscriber support)
|
|
|
|
def create_queue(self, context_id: str) -> asyncio.Queue:
|
|
"""
|
|
Create a new event queue for a context.
|
|
|
|
Args:
|
|
context_id: Context ID (workflow_id, document_id, task_id, etc.)
|
|
|
|
Returns:
|
|
Event queue for the context
|
|
"""
|
|
if context_id not in self._queues:
|
|
self._queues[context_id] = asyncio.Queue()
|
|
self._locks[context_id] = asyncio.Lock()
|
|
self._subscribers[context_id] = set()
|
|
logger.debug(f"Created event queue for context {context_id}")
|
|
return self._queues[context_id]
|
|
|
|
def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
|
|
"""
|
|
Get existing event queue for a context.
|
|
|
|
Args:
|
|
context_id: Context ID
|
|
|
|
Returns:
|
|
Event queue if exists, None otherwise
|
|
"""
|
|
return self._queues.get(context_id)
|
|
|
|
async def emit_event(
|
|
self,
|
|
context_id: str,
|
|
event_type: str,
|
|
data: Dict[str, Any],
|
|
event_category: str = "default",
|
|
message: Optional[str] = None,
|
|
step: Optional[str] = None
|
|
):
|
|
"""
|
|
Emit an event to the context's event queue.
|
|
|
|
Args:
|
|
context_id: Context ID (workflow_id, document_id, etc.)
|
|
event_type: Type of event ("message", "log", "status", "progress", "complete", "error", "chatdata")
|
|
data: Event data dictionary (will be included in event)
|
|
event_category: Category of event for filtering ("chat", "workflow", "document", etc.)
|
|
message: Optional event message (for backward compatibility)
|
|
step: Optional processing step (for backward compatibility)
|
|
"""
|
|
queue = self.get_queue(context_id)
|
|
if not queue:
|
|
logger.debug(f"No event queue found for context {context_id}, skipping event")
|
|
return
|
|
|
|
event = {
|
|
"type": event_type,
|
|
"category": event_category,
|
|
"timestamp": datetime.now().timestamp(),
|
|
"data": data,
|
|
"message": message, # For backward compatibility
|
|
"step": step # For backward compatibility
|
|
}
|
|
|
|
try:
|
|
await queue.put(event)
|
|
logger.debug(f"Emitted {event_type} event (category: {event_category}) for context {context_id}")
|
|
except Exception as e:
|
|
logger.error(f"Error emitting event for context {context_id}: {e}")
|
|
|
|
async def stream_events(
|
|
self,
|
|
context_id: str,
|
|
event_categories: Optional[List[str]] = None,
|
|
timeout: Optional[float] = None
|
|
) -> AsyncIterator[Dict[str, Any]]:
|
|
"""
|
|
Async generator for streaming events from a context.
|
|
|
|
Args:
|
|
context_id: Context ID to stream events from
|
|
event_categories: Optional list of event categories to filter by
|
|
timeout: Optional timeout in seconds (None = no timeout, default: 300s for long-running streams)
|
|
|
|
Yields:
|
|
Event dictionaries
|
|
"""
|
|
queue = self.get_queue(context_id)
|
|
if not queue:
|
|
logger.warning(f"No queue found for context {context_id}")
|
|
return
|
|
|
|
# Default timeout of 5 minutes for long-running streams if not specified
|
|
effective_timeout = timeout if timeout is not None else 300.0
|
|
start_time = asyncio.get_event_loop().time()
|
|
last_event_time = start_time
|
|
heartbeat_interval = 30.0 # Send heartbeat every 30 seconds to keep connection alive
|
|
|
|
while True:
|
|
# Check timeout
|
|
elapsed = asyncio.get_event_loop().time() - start_time
|
|
if elapsed > effective_timeout:
|
|
logger.debug(f"Stream timeout for context {context_id} after {effective_timeout}s")
|
|
break
|
|
|
|
try:
|
|
# Wait for event with longer timeout to avoid premature closure
|
|
wait_timeout = heartbeat_interval # Check every 30 seconds
|
|
if effective_timeout:
|
|
remaining = effective_timeout - elapsed
|
|
if remaining <= 0:
|
|
break
|
|
wait_timeout = min(wait_timeout, remaining)
|
|
|
|
event = await asyncio.wait_for(queue.get(), timeout=wait_timeout)
|
|
last_event_time = asyncio.get_event_loop().time()
|
|
|
|
# Filter by category if specified
|
|
if event_categories and event.get("category") not in event_categories:
|
|
continue
|
|
|
|
yield event
|
|
|
|
except asyncio.TimeoutError:
|
|
# Send heartbeat to keep connection alive if no events
|
|
time_since_last_event = asyncio.get_event_loop().time() - last_event_time
|
|
if time_since_last_event >= heartbeat_interval:
|
|
# Send heartbeat event to keep stream alive
|
|
heartbeat_event = {
|
|
"type": "heartbeat",
|
|
"category": "system",
|
|
"timestamp": datetime.now().timestamp(),
|
|
"data": {"status": "alive"},
|
|
"message": None,
|
|
"step": None
|
|
}
|
|
yield heartbeat_event
|
|
last_event_time = asyncio.get_event_loop().time()
|
|
|
|
# Check if we should continue or timeout
|
|
elapsed = asyncio.get_event_loop().time() - start_time
|
|
if elapsed >= effective_timeout:
|
|
break
|
|
continue
|
|
except Exception as e:
|
|
logger.error(f"Error in stream_events for context {context_id}: {e}")
|
|
break
|
|
|
|
async def cleanup(self, context_id: str, delay: float = 60.0):
|
|
"""
|
|
Schedule cleanup of event queue after delay.
|
|
This allows time for any remaining events to be consumed.
|
|
|
|
Args:
|
|
context_id: Context ID
|
|
delay: Delay in seconds before cleanup (default: 60 seconds)
|
|
"""
|
|
if context_id in self._cleanup_tasks:
|
|
# Cancel existing cleanup task
|
|
self._cleanup_tasks[context_id].cancel()
|
|
|
|
async def _cleanup():
|
|
try:
|
|
await asyncio.sleep(delay)
|
|
if context_id in self._queues:
|
|
# Drain remaining events
|
|
queue = self._queues[context_id]
|
|
while not queue.empty():
|
|
try:
|
|
queue.get_nowait()
|
|
except asyncio.QueueEmpty:
|
|
break
|
|
|
|
del self._queues[context_id]
|
|
if context_id in self._locks:
|
|
del self._locks[context_id]
|
|
if context_id in self._subscribers:
|
|
del self._subscribers[context_id]
|
|
logger.info(f"Cleaned up event queue for context {context_id}")
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception as e:
|
|
logger.error(f"Error during cleanup for context {context_id}: {e}")
|
|
finally:
|
|
if context_id in self._cleanup_tasks:
|
|
del self._cleanup_tasks[context_id]
|
|
|
|
self._cleanup_tasks[context_id] = asyncio.create_task(_cleanup())
|
|
|
|
def has_queue(self, context_id: str) -> bool:
|
|
"""
|
|
Check if a queue exists for a context.
|
|
|
|
Args:
|
|
context_id: Context ID
|
|
|
|
Returns:
|
|
True if queue exists, False otherwise
|
|
"""
|
|
return context_id in self._queues
|
|
|
|
|
|
# Backward compatibility: ChatbotEventManager is an alias
|
|
ChatbotEventManager = StreamingEventManager
|
|
|
|
# Global singleton instance
|
|
_event_manager = StreamingEventManager()
|
|
|
|
|
|
def get_event_manager() -> StreamingEventManager:
    """Get the global event manager instance.

    Returns:
        The process-wide StreamingEventManager singleton created at import time.
    """
    return _event_manager