# Streaming Utility Architecture: Transforming eventManager into a Shared Utility

## Current State Analysis

### Existing Implementation

The `eventManager.py` in `modules/features/chatbot/` currently provides:

- **Event Queue Management**: Per-workflow asyncio queues for SSE streaming
- **Event Emission**: `emit_event()` method for chatbot-specific events
- **Cleanup**: Automatic queue cleanup after workflow completion
- **SSE Streaming**: Used in the `/api/chatbot/start/stream` endpoint

### Current Limitations

1. **Chatbot-Specific**: Hardcoded for chatbot workflows only
2. **Polling Still Required**: The frontend still polls `getUnifiedChatData()` every 0.5 seconds even with SSE
3. **Not Reusable**: Other features (workflows, document generation, etc.) cannot use it
4. **Mixed Approach**: The SSE endpoint still polls the database internally instead of streaming purely from events

### Frontend Polling Pattern

Currently, the frontend uses:

- `useWorkflowPolling.ts` - Polls `/api/workflow/{id}/chatData` every few seconds
- `useWorkflowLifecycle.ts` - Manages polling lifecycle and state
- Rate-limit handling and backoff logic for failed polls

## Proposed Architecture: Shared Streaming Utility

### 1. Generic Event Manager (`modules/shared/streamingManager.py`)

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set


class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.)
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids

    async def emit_event(
        self,
        context_id: str,       # workflow_id, document_id, task_id, etc.
        event_type: str,       # "message", "log", "status", "progress", "complete", "error"
        data: Dict[str, Any],  # Flexible data structure
        event_category: str = "default"  # "chat", "workflow", "document", etc.
    ):
        """Emit event to all subscribers of a context"""

    def create_stream(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None  # Filter by category
    ) -> asyncio.Queue:
        """Create a new stream for a context"""

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events"""
```

### 2. Generic SSE Route Helper (`modules/shared/sseUtils.py`)

```python
from typing import Callable, List, Optional

from fastapi.responses import StreamingResponse


def create_sse_stream(
    context_id: str,
    event_categories: Optional[List[str]] = None,
    initial_data_callback: Optional[Callable] = None,
    timeout: float = 300.0
) -> StreamingResponse:
    """
    Create a generic SSE streaming response.

    Args:
        context_id: Workflow ID, document ID, or other context identifier
        event_categories: Filter events by category (e.g., ["chat", "workflow"])
        initial_data_callback: Optional function to fetch initial state
        timeout: Stream timeout in seconds
    """
    streaming_manager = get_streaming_manager()

    async def event_stream():
        # Send initial data if a callback is provided
        if initial_data_callback:
            initial_data = await initial_data_callback(context_id)
            yield format_sse_event("initial", initial_data)

        # Stream events from the manager
        async for event in streaming_manager.stream_events(context_id, event_categories):
            yield format_sse_event(event["type"], event["data"])

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
```

### 3. Integration Points

#### A. Workflow Processing

```python
# In workflow processing code
from modules.shared.streamingManager import get_streaming_manager

streaming_manager = get_streaming_manager()

# Emit progress updates
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="progress",
    data={"step": "analyzing", "message": "Processing documents..."},
    event_category="workflow"
)

# Emit new messages
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="message",
    data={"role": "assistant", "content": "Response text"},
    event_category="chat"
)
```

#### B. Route Endpoints

```python
# Generic streaming endpoint for any context
@router.get("/{contextId}/stream")
async def stream_context_updates(
    contextId: str,
    categories: Optional[str] = Query(None),  # Comma-separated categories
    currentUser: User = Depends(getCurrentUser)
):
    event_categories = categories.split(",") if categories else None

    # Optional: fetch initial state
    async def get_initial_data(ctx_id: str):
        interfaceDbChat = getServiceChat(currentUser)
        return interfaceDbChat.getUnifiedChatData(ctx_id, None)

    return create_sse_stream(
        context_id=contextId,
        event_categories=event_categories,
        initial_data_callback=get_initial_data
    )
```

## Benefits of Streaming vs Polling

### Performance

- **Reduced Server Load**: No constant database queries every 0.5-3 seconds
- **Lower Latency**: Events delivered immediately (< 100 ms) vs polling delay (500-3000 ms)
- **Bandwidth Efficiency**: Data is sent only when it changes, with no empty responses

### User Experience

- **Real-time Updates**: Users see progress instantly
- **Better Responsiveness**: No perceived delay from polling intervals
- **Reduced Battery Drain**: Mobile devices consume less power without constant polling

### Scalability

- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ)
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection vs many HTTP requests

## Migration Strategy

### Phase 1: Create Shared Utility

1. Move `eventManager.py` → `modules/shared/streamingManager.py`
2. Generalize for any context type (not just workflows)
3. Add event categorization and filtering
4. Create `sseUtils.py` helper functions

### Phase 2: Update Chatbot Feature

1. Update the chatbot to use the shared streaming manager
2. Replace internal polling in the SSE endpoint with pure event-driven streaming
3. Emit events directly when data changes (in database write operations)

### Phase 3: Migrate Other Features

1. **Workflows**: Add streaming to workflow processing
2. **Document Generation**: Stream document creation progress
3. **Data Processing**: Stream extraction/transformation progress
4. **Any Long-Running Task**: Use streaming for status updates

### Phase 4: Frontend Migration

1. Replace `useWorkflowPolling` with SSE EventSource connections
2. Create a generic `useStreaming` hook
3. Update all components to use streaming instead of polling
4. Remove polling logic entirely

## Implementation Details

### Event-Driven Data Emission

Instead of polling `getUnifiedChatData()`, emit events when data changes:

```python
# In interfaceDbChatObjects.py - when creating a message
def createMessage(self, workflowId: str, message: ChatMessage):
    # ... existing database write ...

    # Emit streaming event without blocking the write.
    # Note: asyncio.create_task() requires a running event loop on this
    # thread; from a plain worker thread, hand the coroutine to the loop
    # with asyncio.run_coroutine_threadsafe() instead.
    from modules.shared.streamingManager import get_streaming_manager

    streaming_manager = get_streaming_manager()
    asyncio.create_task(streaming_manager.emit_event(
        context_id=workflowId,
        event_type="message",
        data={
            "type": "message",
            "createdAt": message.publishedAt,
            "item": message.dict()
        },
        event_category="chat"
    ))
```

### Frontend Integration

```typescript
// Generic streaming hook
function useStreaming<T>(
  contextId: string,
  categories?: string[],
  onEvent?: (event: T) => void
) {
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/stream/${contextId}?categories=${categories?.join(',') ?? ''}`
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data) as T;
      onEvent?.(event);
    };

    return () => eventSource.close();
    // Note: memoize `categories` upstream, or the changing array identity
    // will tear down and reopen the stream on every render.
  }, [contextId, categories]);
}
```

## Key Design Decisions

### 1. Context-Based Streaming

- Use a generic `context_id` instead of `workflow_id`
- Supports workflows, documents, tasks, user sessions, etc.

### 2. Event Categories

- Allow filtering by category (chat, workflow, document, etc.)
- Enables multiple features to stream from the same context

### 3. Backward Compatibility

- Keep existing polling endpoints during the migration
- Gradually migrate features one at a time
- The frontend can use both during the transition

### 4. Error Handling

- Graceful degradation if streaming is unavailable
- Automatic reconnection logic in the frontend
- Fallback to polling if SSE fails

## Example: Complete Flow

### Backend: Workflow Processing

```python
async def process_workflow(workflow_id: str):
    streaming = get_streaming_manager()

    # Emit status update
    await streaming.emit_event(workflow_id, "status", {"status": "running"}, "workflow")

    # Process and emit messages
    result = await ai_call(...)
    await streaming.emit_event(workflow_id, "message", {"role": "assistant", "content": result}, "chat")

    # Emit completion
    await streaming.emit_event(workflow_id, "complete", {"status": "completed"}, "workflow")
```

### Frontend: React Hook

```typescript
function ChatComponent({ workflowId }: { workflowId: string }) {
  const [messages, setMessages] = useState([]);

  useStreaming(workflowId, ["chat"], (event) => {
    if (event.type === "message") {
      setMessages(prev => [...prev, event.item]);
    }
  });

  // Render the streamed messages (markup elided in the original).
  return <div>{messages.map((m, i) => <div key={i}>{m.content}</div>)}</div>;
}
```

## Conclusion

By transforming `eventManager.py` into a shared streaming utility:

1. **Eliminates Polling**: All features can stream updates in real-time
2. **Improves Performance**: Reduces server load and latency
3. **Better UX**: Instant updates instead of polling delays
4. **Reusable**: Any feature can adopt streaming with minimal code
5. **Scalable**: Foundation for future real-time features

The migration can be done incrementally, feature by feature, without breaking existing functionality.
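The `create_sse_stream` helper yields events through a `format_sse_event` function that the proposal references but never defines. A minimal sketch of one way to write it, assuming JSON payloads and following the Server-Sent Events wire format (an `event:` field, one `data:` line per payload line, and a blank line terminating each event):

```python
import json
from typing import Any, Dict


def format_sse_event(event_type: str, data: Dict[str, Any]) -> str:
    """Frame a payload for the SSE wire format (sketch, assumes JSON data)."""
    payload = json.dumps(data)
    # If the payload ever spans multiple lines, SSE requires one
    # `data:` line per line of the payload.
    data_lines = "".join(f"data: {line}\n" for line in payload.splitlines())
    return f"event: {event_type}\n{data_lines}\n"


print(format_sse_event("status", {"status": "running"}))
```

On the client, `EventSource.onmessage` only fires for unnamed events, so a frontend consuming named events like these would register listeners via `addEventListener("status", ...)` instead.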
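The `StreamingEventManager` above deliberately shows only signatures. To make the design concrete, here is a hedged, minimal sketch of how `emit_event` and `stream_events` could fit together using one queue per subscriber and category filtering; the class name, the `close()` method, and the `None` end-of-stream sentinel are illustrative assumptions, not part of the proposed API:

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


class MinimalStreamingEventManager:
    """Sketch: per-context subscriber queues with category filtering."""

    def __init__(self) -> None:
        # context_id -> list of subscriber queues
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}

    async def emit_event(self, context_id: str, event_type: str,
                         data: Dict[str, Any],
                         event_category: str = "default") -> None:
        event = {"type": event_type, "category": event_category, "data": data}
        for queue in self._subscribers.get(context_id, []):
            await queue.put(event)

    async def close(self, context_id: str) -> None:
        # A None sentinel tells each stream_events() generator to stop.
        for queue in self._subscribers.get(context_id, []):
            await queue.put(None)

    async def stream_events(self, context_id: str,
                            event_categories: Optional[List[str]] = None
                            ) -> AsyncIterator[Dict[str, Any]]:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(context_id, []).append(queue)
        try:
            while True:
                event = await queue.get()
                if event is None:  # sentinel: context finished
                    break
                if event_categories and event["category"] not in event_categories:
                    continue  # subscriber did not request this category
                yield event
        finally:
            self._subscribers[context_id].remove(queue)


async def demo() -> List[Dict[str, Any]]:
    manager = MinimalStreamingEventManager()
    received: List[Dict[str, Any]] = []

    async def consume() -> None:
        async for event in manager.stream_events("wf-1", ["chat"]):
            received.append(event)

    task = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the consumer subscribe first
    await manager.emit_event("wf-1", "progress", {"step": 1}, "workflow")
    await manager.emit_event("wf-1", "message", {"content": "hi"}, "chat")
    await manager.close("wf-1")
    await task
    return received


print(asyncio.run(demo()))  # only the "chat" event survives the filter
```

Filtering on the subscriber side, as here, keeps `emit_event` a simple broadcast; a production version would add the locks and timed cleanup tasks listed in the proposed `__init__`.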
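The Event-Driven Data Emission section schedules `emit_event` from a synchronous database method with `asyncio.create_task()`, which only works on the thread running the event loop. A hedged sketch of a scheduling helper that works from either context; the helper name `fire_and_forget` is hypothetical:

```python
import asyncio
from typing import Any, Coroutine, Optional


def fire_and_forget(coro: Coroutine[Any, Any, Any],
                    loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
    """Schedule a coroutine without awaiting it, from sync or async code.

    Hypothetical helper: on the event-loop thread we can use
    asyncio.create_task(); from a plain worker thread the coroutine must be
    handed to the loop with asyncio.run_coroutine_threadsafe().
    """
    try:
        asyncio.get_running_loop()   # raises RuntimeError if no loop here
        # Sketch only: a real version should keep a reference to the task
        # so it is not garbage-collected mid-flight.
        asyncio.create_task(coro)
    except RuntimeError:
        if loop is not None:
            asyncio.run_coroutine_threadsafe(coro, loop)
        else:
            # Last resort: run the coroutine to completion synchronously.
            asyncio.run(coro)
```

With this in place, `createMessage` could call `fire_and_forget(streaming_manager.emit_event(...), loop)` regardless of which thread performs the database write, with the loop reference captured at startup.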