gateway/docs/STREAMING_UTILITY_ARCHITECTURE.md

Streaming Utility Architecture: Transforming eventManager into a Shared Utility

Current State Analysis

Existing Implementation

The eventManager.py in modules/features/chatbot/ currently provides:

  • Event Queue Management: Per-workflow asyncio queues for SSE streaming
  • Event Emission: emit_event() method for chatbot-specific events
  • Cleanup: Automatic queue cleanup after workflow completion
  • SSE Streaming: Used in /api/chatbot/start/stream endpoint

Current Limitations

  1. Chatbot-Specific: Hardcoded for chatbot workflows only
  2. Polling Still Required: Frontend still polls getUnifiedChatData() every 0.5 seconds even with SSE
  3. Not Reusable: Other features (workflows, document generation, etc.) can't use it
  4. Mixed Approach: SSE endpoint still internally polls database instead of pure event-driven streaming

Frontend Polling Pattern

Currently, the frontend uses:

  • useWorkflowPolling.ts - Polls /api/workflow/{id}/chatData every few seconds
  • useWorkflowLifecycle.ts - Manages polling lifecycle and state
  • Rate limit handling and backoff logic for failed polls

Proposed Architecture: Shared Streaming Utility

1. Generic Event Manager (modules/shared/streamingManager.py)

import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set

class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.)
    """
    
    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids
    
    async def emit_event(
        self,
        context_id: str,  # workflow_id, document_id, task_id, etc.
        event_type: str,  # "message", "log", "status", "progress", "complete", "error"
        data: Dict[str, Any],  # Flexible data structure
        event_category: str = "default"  # "chat", "workflow", "document", etc.
    ):
        """Emit event to all subscribers of a context"""
        
    def create_stream(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None  # Filter by category
    ) -> asyncio.Queue:
        """Create a new stream for a context"""
        
    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events"""
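
The skeleton above leaves the method bodies open. A minimal in-memory sketch of how the manager could be implemented follows; names beyond the skeleton (such as `_SENTINEL` and `close()`) are illustrative, not part of the proposal:

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional

_SENTINEL = object()  # illustrative: signals end-of-stream to subscribers

class StreamingEventManager:
    def __init__(self) -> None:
        # context_id -> list of subscriber queues
        self._queues: Dict[str, List[asyncio.Queue]] = {}

    def create_stream(self, context_id: str) -> asyncio.Queue:
        """Register a new subscriber queue for a context."""
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(context_id, []).append(queue)
        return queue

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "default",
    ) -> None:
        """Fan the event out to every subscriber of the context."""
        event = {"type": event_type, "category": event_category, "data": data}
        for queue in self._queues.get(context_id, []):
            await queue.put(event)

    async def close(self, context_id: str) -> None:
        """End all streams for a context (e.g. on workflow completion)."""
        for queue in self._queues.pop(context_id, []):
            await queue.put(_SENTINEL)

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Yield events for a context, optionally filtered by category."""
        queue = self.create_stream(context_id)
        while True:
            event = await queue.get()
            if event is _SENTINEL:
                break
            if event_categories is None or event["category"] in event_categories:
                yield event
```

Category filtering happens on the consumer side here, so one emitter can serve chat and workflow subscribers from the same context without knowing who is listening.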

2. Generic SSE Route Helper (modules/shared/sseUtils.py)

import asyncio
from typing import Callable, List, Optional

from fastapi.responses import StreamingResponse

from modules.shared.streamingManager import get_streaming_manager

def create_sse_stream(
    context_id: str,
    event_categories: Optional[List[str]] = None,
    initial_data_callback: Optional[Callable] = None,
    timeout: float = 300.0
) -> StreamingResponse:
    """
    Create a generic SSE streaming response.
    
    Args:
        context_id: Workflow ID, document ID, or other context identifier
        event_categories: Filter events by category (e.g., ["chat", "workflow"])
        initial_data_callback: Optional function to fetch initial state
        timeout: Stream timeout in seconds
    """
    streaming_manager = get_streaming_manager()
    
    async def event_stream():
        # Send initial data if callback provided
        if initial_data_callback:
            initial_data = await initial_data_callback(context_id)
            yield format_sse_event("initial", initial_data)

        # Enforce the documented stream timeout (asyncio.timeout is 3.11+)
        try:
            async with asyncio.timeout(timeout):
                async for event in streaming_manager.stream_events(context_id, event_categories):
                    yield format_sse_event(event["type"], event["data"])
        except TimeoutError:
            yield format_sse_event("error", {"reason": "stream timeout"})
    
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
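
`format_sse_event` is referenced above but not defined; a plausible sketch following the SSE wire format (the exact JSON envelope is an assumption):

```python
import json
from typing import Any, Dict

def format_sse_event(event_type: str, data: Dict[str, Any]) -> str:
    """Serialize one event in the SSE wire format.

    Each event is an optional "event:" line, one or more "data:" lines,
    and a blank line as terminator.
    """
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
```

Note that a browser `EventSource.onmessage` handler only fires for events without a custom `event:` name; named events must be consumed on the frontend with `addEventListener(name, ...)`.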

3. Integration Points

A. Workflow Processing

# In workflow processing code
from modules.shared.streamingManager import get_streaming_manager

streaming_manager = get_streaming_manager()

# Emit progress updates
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="progress",
    data={"step": "analyzing", "message": "Processing documents..."},
    event_category="workflow"
)

# Emit new messages
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="message",
    data={"role": "assistant", "content": "Response text"},
    event_category="chat"
)

B. Route Endpoints

# Generic streaming endpoint for any context
@router.get("/{contextId}/stream")
async def stream_context_updates(
    contextId: str,
    categories: Optional[str] = Query(None),  # Comma-separated categories
    currentUser: User = Depends(getCurrentUser)
):
    event_categories = categories.split(",") if categories else None
    
    # Optional: Fetch initial state
    async def get_initial_data(ctx_id: str):
        interfaceDbChat = getServiceChat(currentUser)
        return interfaceDbChat.getUnifiedChatData(ctx_id, None)
    
    return create_sse_stream(
        context_id=contextId,
        event_categories=event_categories,
        initial_data_callback=get_initial_data
    )
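
For testing the endpoint outside the browser, any HTTP client that can iterate response lines will do. A standard-library-only sketch; the host/port and the `iter_sse_events` helper are illustrative, not existing code:

```python
import json
from typing import Any, Dict, Iterable, Iterator, Tuple
from urllib.request import urlopen

def iter_sse_events(lines: Iterable[str]) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Parse raw SSE lines into (event_type, data) pairs."""
    event_type, data_lines = "message", []
    for line in lines:
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "":  # a blank line terminates one event
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []

def stream_context(context_id: str, categories: str = "chat"):
    """Consume the /{contextId}/stream endpoint (base URL is an assumption)."""
    url = f"http://localhost:8000/api/{context_id}/stream?categories={categories}"
    with urlopen(url) as resp:
        yield from iter_sse_events(
            line.decode("utf-8").rstrip("\r\n") for line in resp
        )
```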

Benefits of Streaming vs Polling

Performance

  • Reduced Server Load: No constant database queries every 0.5-3 seconds
  • Lower Latency: Events delivered immediately (< 100ms) vs polling delay (500-3000ms)
  • Bandwidth Efficiency: Only send data when it changes, not empty responses

User Experience

  • Real-time Updates: Users see progress instantly
  • Better Responsiveness: No perceived delay from polling intervals
  • Reduced Battery: Mobile devices consume less power without constant polling

Scalability

  • Horizontal Scaling: Event queues can be distributed (Redis, RabbitMQ)
  • Connection Management: Better handling of many concurrent streams
  • Resource Efficiency: One persistent connection vs many HTTP requests

Migration Strategy

Phase 1: Create Shared Utility

  1. Move eventManager.py → modules/shared/streamingManager.py
  2. Generalize for any context type (not just workflows)
  3. Add event categorization and filtering
  4. Create sseUtils.py helper functions

Phase 2: Update Chatbot Feature

  1. Update chatbot to use shared streaming manager
  2. Replace internal polling in SSE endpoint with pure event-driven streaming
  3. Emit events directly when data changes (in database write operations)

Phase 3: Migrate Other Features

  1. Workflows: Add streaming to workflow processing
  2. Document Generation: Stream document creation progress
  3. Data Processing: Stream extraction/transformation progress
  4. Any Long-Running Task: Use streaming for status updates

Phase 4: Frontend Migration

  1. Replace useWorkflowPolling with SSE EventSource connections
  2. Create generic useStreaming hook
  3. Update all components to use streaming instead of polling
  4. Remove polling logic entirely

Implementation Details

Event-Driven Data Emission

Instead of polling getUnifiedChatData(), emit events when data changes:

# In interfaceDbChatObjects.py - when creating a message
def createMessage(self, workflowId: str, message: ChatMessage):
    # ... existing database write ...
    
    # Emit streaming event without blocking the write path.
    # NOTE: asyncio.create_task() requires a running event loop on this
    # thread; if createMessage can run in a worker thread, hand the
    # coroutine to the loop with asyncio.run_coroutine_threadsafe() instead.
    from modules.shared.streamingManager import get_streaming_manager
    streaming_manager = get_streaming_manager()
    asyncio.create_task(streaming_manager.emit_event(
        context_id=workflowId,
        event_type="message",
        data={
            "type": "message",
            "createdAt": message.publishedAt,
            "item": message.dict()
        },
        event_category="chat"
    ))
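
Because database writes may not happen on the event-loop thread, one hedged option is to capture the loop at application startup and hand emit coroutines over thread-safely. `register_event_loop` and `emit_from_sync` are hypothetical helpers, not existing code:

```python
import asyncio
from typing import Coroutine, Optional

_loop: Optional[asyncio.AbstractEventLoop] = None

def register_event_loop() -> None:
    """Call once from async startup code (e.g. a FastAPI startup hook)."""
    global _loop
    _loop = asyncio.get_running_loop()

def emit_from_sync(coro: Coroutine) -> None:
    """Schedule an emit coroutine from synchronous code, on any thread."""
    if _loop is None or _loop.is_closed():
        coro.close()  # streaming unavailable; drop the event
        return
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None
    if running is _loop:
        # Same thread as the loop: schedule directly.
        _loop.create_task(coro)
    else:
        # Worker thread: thread-safe handoff to the loop.
        asyncio.run_coroutine_threadsafe(coro, _loop)
```

With this in place, `createMessage` would call `emit_from_sync(streaming_manager.emit_event(...))` regardless of which thread performed the write.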

Frontend Integration

// Generic streaming hook
function useStreaming<T>(
  contextId: string,
  categories?: string[],
  onEvent?: (event: T) => void
) {
  useEffect(() => {
    // Omit the query string entirely when no categories are requested,
    // so the URL never contains "categories=undefined".
    const query = categories?.length
      ? `?categories=${categories.join(",")}`
      : "";
    const eventSource = new EventSource(`/api/stream/${contextId}${query}`);

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data) as T;
      onEvent?.(event);
    };

    return () => eventSource.close();
    // Depend on the joined string, not the array identity, to avoid
    // reconnecting on every render.
  }, [contextId, categories?.join(",")]);
}

Key Design Decisions

1. Context-Based Streaming

  • Use generic context_id instead of workflow_id
  • Supports workflows, documents, tasks, user sessions, etc.

2. Event Categories

  • Allow filtering by category (chat, workflow, document, etc.)
  • Enables multiple features to stream from same context

3. Backward Compatibility

  • Keep existing polling endpoints during migration
  • Gradually migrate features one at a time
  • Frontend can use both during transition

4. Error Handling

  • Graceful degradation if streaming unavailable
  • Automatic reconnection logic in frontend
  • Fallback to polling if SSE fails

Example: Complete Flow

Backend: Workflow Processing

async def process_workflow(workflow_id: str):
    streaming = get_streaming_manager()
    
    # Emit status update
    await streaming.emit_event(workflow_id, "status", 
        {"status": "running"}, "workflow")
    
    # Process and emit messages
    result = await ai_call(...)
    await streaming.emit_event(workflow_id, "message",
        {"role": "assistant", "content": result}, "chat")
    
    # Emit completion
    await streaming.emit_event(workflow_id, "complete",
        {"status": "completed"}, "workflow")

Frontend: React Hook

function ChatComponent({ workflowId }: { workflowId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);
  
  useStreaming(workflowId, ["chat"], (event) => {
    if (event.type === "message") {
      setMessages(prev => [...prev, event.item]);
    }
  });
  
  return <MessageList messages={messages} />;
}

Conclusion

By transforming eventManager.py into a shared streaming utility:

  1. Eliminates Polling: All features can stream updates in real-time
  2. Improves Performance: Reduces server load and latency
  3. Better UX: Instant updates instead of polling delays
  4. Reusable: Any feature can use streaming with minimal code
  5. Scalable: Foundation for future real-time features

The migration can be done incrementally, feature by feature, without breaking existing functionality.