# Streaming Utility Architecture: Event-Driven Real-Time Updates

## Current Implementation

### Event Manager (`modules/features/chatbot/eventManager.py`)

The `StreamingEventManager` is a **generic, reusable** event manager that provides:

- **Generic Event Queue Management**: Per-context asyncio queues (not just workflows)
- **Event Emission**: `emit_event()` method supporting multiple event types and categories
- **Event Streaming**: `stream_events()` async generator for SSE streaming
- **Automatic Cleanup**: Queue cleanup after a delay (60 seconds by default)
- **Event Categories**: Filtering by category (chat, workflow, document, etc.)
- **Thread-Safe**: Lock-based synchronization for concurrent access

### Architecture Overview

The streaming system uses a **pure event-driven approach**:

1. **Event Emission**: When data changes (messages created, logs written), events are emitted directly
2. **Event Queue**: Events are queued per context (workflow_id, document_id, etc.)
3. **SSE Streaming**: The route endpoint streams events from the queue in real time
4. **No Database Polling**: The SSE endpoint does NOT poll the database for new data - it only streams queued events

### Current Usage: Chatbot Feature

The chatbot feature (`modules/features/chatbot/`) demonstrates the streaming architecture:

- **Event Types**: `chatdata`, `complete`, `stopped`, `error`
- **Event Categories**: `chat`, `workflow`
- **Direct Emission**: Events are emitted when messages/logs are created in `mainChatbot.py`

## Implementation Details

### 1. Event Manager API (`modules/features/chatbot/eventManager.py`)

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set


class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.)
    Thread-safe event emission and queue management.
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids

    def create_queue(self, context_id: str) -> asyncio.Queue:
        """Create a new event queue for a context"""

    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
        """Get existing event queue for a context"""

    def has_queue(self, context_id: str) -> bool:
        """Check if a queue exists for a context"""

    async def emit_event(
        self,
        context_id: str,  # workflow_id, document_id, task_id, etc.
        event_type: str,  # "message", "log", "status", "progress", "complete", "error", "chatdata"
        data: Dict[str, Any],  # Flexible data structure
        event_category: str = "default",  # "chat", "workflow", "document", etc.
        message: Optional[str] = None,  # For backward compatibility
        step: Optional[str] = None  # For backward compatibility
    ):
        """Emit event to the context's event queue"""

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
        timeout: Optional[float] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events from a context"""

    async def cleanup(self, context_id: str, delay: float = 60.0):
        """Schedule cleanup of event queue after delay"""
```

**Global Singleton**: Access via the `get_event_manager()` function

### 2. SSE Route Implementation (`modules/routes/routeChatbot.py`)

The chatbot streaming endpoint (`/api/chatbot/start/stream`) demonstrates the pattern:

```python
@router.post("/start/stream")
async def stream_chatbot_start(...) -> StreamingResponse:
    event_manager = get_event_manager()

    # Start background processing (creates workflow and event queue)
    workflow = await chatProcess(currentUser, userInput, workflowId)

    # Get or create event queue
    queue = event_manager.get_queue(workflow.id) or event_manager.create_queue(workflow.id)

    async def event_stream():
        """Pure event-driven streaming (no database polling)"""
        # 1. Send initial chat data once (from database)
        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
        if chatData.get("items"):
            # filtered_items: subset of chatData["items"] (filtering step not shown here)
            for item in filtered_items:
                yield f"data: {json.dumps(item)}\n\n"

        # 2. Stream events from queue (event-driven)
        while True:
            try:
                # Wait up to 1 second for the next event
                event = await asyncio.wait_for(queue.get(), timeout=1.0)

                # Handle event types
                if event["type"] == "chatdata":
                    # Single quotes inside the f-string keep this valid before Python 3.12
                    yield f"data: {json.dumps(event['data'])}\n\n"
                elif event["type"] == "complete":
                    break  # Close stream
                # ... other event types
            except asyncio.TimeoutError:
                # No event arrived in time: send an SSE comment as keepalive.
                # Simplified: the 30-second keepalive throttling and the
                # 5-second status check listed under Key Points are omitted.
                yield ": keepalive\n\n"
                continue

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
```

**Key Points**:

- **Initial Data**: Fetched once from the database at stream start
- **Event Streaming**: Pure event-driven from the queue (no polling for new data)
- **Keepalive**: Sent every 30 seconds to keep the connection alive
- **Status Check**: The only periodic lookup is a workflow status check (every 5 seconds), used solely to detect stopped workflows

### 3. Event Emission in Processing Code

#### A. Chatbot Message Processing (`modules/features/chatbot/mainChatbot.py`)

Events are emitted **directly when data is created**:

```python
from modules.features.chatbot.eventManager import get_event_manager

event_manager = get_event_manager()

# When creating a user message
userMessage = interfaceDbChat.createMessage(userMessageData)

# Emit event immediately (exact chatData format)
await event_manager.emit_event(
    context_id=workflow.id,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": userMessage.dict()
    },
    event_category="chat"
)

# When creating assistant message
assistantMessage = interfaceDbChat.createMessage(assistantMessageData)

# Emit event immediately
await event_manager.emit_event(
    context_id=workflowId,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": assistantMessage.dict()
    },
    event_category="chat"
)

# When workflow completes
await event_manager.emit_event(
    context_id=workflowId,
    event_type="complete",
    data={"workflowId": workflowId},
    event_category="workflow",
    message="Chatbot-Verarbeitung abgeschlossen",  # German: "Chatbot processing completed"
    step="complete"
)
```

#### B. Log Events

Logs are stored in the database and then emitted as events:

```python
import uuid

# Store log in database
log_data = {
    "id": f"log_{uuid.uuid4()}",
    "workflowId": workflowId,
    "message": "Analysiere Benutzeranfrage...",  # German: "Analyzing user request..."
    "type": "info",
    "timestamp": getUtcTimestamp(),
    "status": "running",
    "roundNumber": round_number
}
interfaceDbChat.createLog(log_data)

# Note: Logs are emitted via the route's periodic chatData fetch mechanism
# OR can be emitted directly as events if needed
```

**Event Format**: Events use the exact `chatData` format: `{type, createdAt, item}`

## Benefits of Event-Driven Streaming

### Performance

- **Reduced Server Load**: No constant database queries every 0.5-3 seconds
- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms)
- **Bandwidth Efficiency**: Data is sent only when it changes, with no empty responses
- **No Database Polling**: The SSE endpoint does NOT poll the database for new data - pure event-driven

### User Experience

- **Real-time Updates**: Users see progress instantly as events occur
- **Better Responsiveness**: No perceived delay from polling intervals
- **Reduced Battery Usage**: Mobile devices consume less power without constant polling
- **Immediate Feedback**: Messages appear as soon as they're created

### Scalability

- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ) in the future
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection vs many HTTP requests
- **Memory Efficient**: Queues are cleaned up automatically after workflow completion
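
### Appendix: Minimal Emit/Stream Sketch

The emit-then-stream cycle described under Implementation Details can be tried in isolation. The sketch below is a simplified stand-in, not the actual `StreamingEventManager`: the class `MiniEventManager`, the function `demo()`, and the context id `wf-1` are hypothetical names chosen for illustration. It also assumes that events for unknown contexts are dropped and that `complete`, `stopped`, and `error` end the stream. Locks, cleanup, and keepalives are left out.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict, List, Optional


class MiniEventManager:
    """Hypothetical, stripped-down stand-in for StreamingEventManager."""

    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}

    def create_queue(self, context_id: str) -> asyncio.Queue:
        # Reuse an existing queue so repeated calls are harmless.
        if context_id not in self._queues:
            self._queues[context_id] = asyncio.Queue()
        return self._queues[context_id]

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "default",
    ) -> None:
        queue = self._queues.get(context_id)
        if queue is None:
            return  # Assumption: events for unknown contexts are dropped.
        await queue.put(
            {"type": event_type, "category": event_category, "data": data}
        )

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        queue = self.create_queue(context_id)
        while True:
            event = await queue.get()  # Waits until an event is available.
            if event_categories is None or event["category"] in event_categories:
                yield event
            # Assumption: these event types terminate the stream.
            if event["type"] in ("complete", "stopped", "error"):
                return


async def demo() -> List[str]:
    """Emit two events and collect them as SSE-formatted frames."""
    manager = MiniEventManager()
    manager.create_queue("wf-1")

    async def producer() -> None:
        await manager.emit_event("wf-1", "chatdata", {"type": "message"}, "chat")
        await manager.emit_event(
            "wf-1", "complete", {"workflowId": "wf-1"}, "workflow"
        )

    task = asyncio.create_task(producer())
    frames: List[str] = []
    async for event in manager.stream_events("wf-1"):
        frames.append(f"data: {json.dumps(event['data'])}\n\n")
    await task
    return frames


if __name__ == "__main__":
    for frame in asyncio.run(demo()):
        print(frame, end="")
```

Because the consumer blocks on `queue.get()` rather than querying on a timer, a frame is produced only when something was actually emitted, which is the property the Benefits section relies on.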