Streaming Utility Architecture: Transforming eventManager into a Shared Utility
Current State Analysis
Existing Implementation
The eventManager.py in modules/features/chatbot/ currently provides:
- Event Queue Management: Per-workflow asyncio queues for SSE streaming
- Event Emission: emit_event() method for chatbot-specific events
- Cleanup: Automatic queue cleanup after workflow completion
- SSE Streaming: Used in the /api/chatbot/start/stream endpoint
Current Limitations
- Chatbot-Specific: Hardcoded for chatbot workflows only
- Polling Still Required: Frontend still polls getUnifiedChatData() every 0.5 seconds even with SSE
- Not Reusable: Other features (workflows, document generation, etc.) can't use it
- Mixed Approach: SSE endpoint still internally polls database instead of pure event-driven streaming
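The last limitation is the crux. The contrast can be sketched as follows (a hypothetical illustration — fetch_state is a stand-in for the real database read, not a name from the codebase): the current SSE endpoint's internal loop re-reads state on a timer, while a pure event-driven source only wakes when a writer pushes to its queue.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict

# Current anti-pattern: the SSE generator itself polls the database.
async def polling_sse_source(
    fetch_state: Callable[[], Awaitable[Dict[str, Any]]],  # hypothetical DB read
    interval: float = 0.5,
) -> AsyncIterator[Dict[str, Any]]:
    last = None
    while True:
        state = await fetch_state()   # hits the database every tick
        if state != last:             # most iterations yield nothing new
            last = state
            yield state
        if state.get("status") == "completed":
            return
        await asyncio.sleep(interval)

# Event-driven alternative: the generator just waits on a queue that
# writers push to, so the database is only read when data changes.
async def event_driven_sse_source(
    queue: "asyncio.Queue[Dict[str, Any]]",
) -> AsyncIterator[Dict[str, Any]]:
    while True:
        event = await queue.get()     # blocks until someone emits
        yield event
        if event.get("status") == "completed":
            return
```

The polling variant burns a database round-trip per tick even when nothing changed; the event-driven variant costs nothing between events.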
Frontend Polling Pattern
Currently, the frontend uses:
- useWorkflowPolling.ts: Polls /api/workflow/{id}/chatData every few seconds
- useWorkflowLifecycle.ts: Manages polling lifecycle and state
- Rate limit handling and backoff logic for failed polls
Proposed Architecture: Shared Streaming Utility
1. Generic Event Manager (modules/shared/streamingManager.py)
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set

class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.)
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids

    async def emit_event(
        self,
        context_id: str,                 # workflow_id, document_id, task_id, etc.
        event_type: str,                 # "message", "log", "status", "progress", "complete", "error"
        data: Dict[str, Any],            # Flexible data structure
        event_category: str = "default"  # "chat", "workflow", "document", etc.
    ):
        """Emit event to all subscribers of a context"""
        ...

    def create_stream(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None  # Filter by category
    ) -> asyncio.Queue:
        """Create a new stream for a context"""
        ...

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events"""
        ...
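The method stubs leave the fan-out logic implied. A minimal in-memory sketch of that logic is shown below — one queue per subscriber, events copied to every queue registered under a context_id, with category filtering at emit time. This is an illustration, not the production version: locks, cleanup tasks, and backpressure handling are deliberately omitted, and the class name is hypothetical.

```python
import asyncio
import uuid
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple

class InMemoryStreamingManager:
    """Sketch of the fan-out core: emit copies an event into every
    subscriber queue whose category filter matches."""

    def __init__(self) -> None:
        # context_id -> {queue_id: (queue, allowed_categories or None)}
        self._subscribers: Dict[
            str, Dict[str, Tuple[asyncio.Queue, Optional[List[str]]]]
        ] = {}

    async def emit_event(self, context_id: str, event_type: str,
                         data: Dict[str, Any],
                         event_category: str = "default") -> None:
        event = {"type": event_type, "data": data, "category": event_category}
        for queue, categories in self._subscribers.get(context_id, {}).values():
            if categories is None or event_category in categories:
                await queue.put(event)

    def create_stream(self, context_id: str,
                      event_categories: Optional[List[str]] = None
                      ) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(context_id, {})[uuid.uuid4().hex] = (
            queue, event_categories)
        return queue

    async def stream_events(self, context_id: str,
                            event_categories: Optional[List[str]] = None
                            ) -> AsyncIterator[Dict[str, Any]]:
        queue = self.create_stream(context_id, event_categories)
        while True:
            event = await queue.get()
            yield event
            if event["type"] in ("complete", "error"):
                return
```

Terminating the generator on "complete"/"error" events is one possible convention; the real implementation would also need to unregister the queue on client disconnect.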
2. Generic SSE Route Helper (modules/shared/sseUtils.py)
from typing import Callable, List, Optional
from fastapi.responses import StreamingResponse
from modules.shared.streamingManager import get_streaming_manager

def create_sse_stream(
    context_id: str,
    event_categories: Optional[List[str]] = None,
    initial_data_callback: Optional[Callable] = None,
    timeout: float = 300.0
) -> StreamingResponse:
    """
    Create a generic SSE streaming response.

    Args:
        context_id: Workflow ID, document ID, or other context identifier
        event_categories: Filter events by category (e.g., ["chat", "workflow"])
        initial_data_callback: Optional function to fetch initial state
        timeout: Stream timeout in seconds
    """
    streaming_manager = get_streaming_manager()

    async def event_stream():
        # Send initial data if callback provided
        if initial_data_callback:
            initial_data = await initial_data_callback(context_id)
            yield format_sse_event("initial", initial_data)
        # Stream events from manager
        async for event in streaming_manager.stream_events(context_id, event_categories):
            yield format_sse_event(event["type"], event["data"])

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
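The helper above calls a format_sse_event function whose implementation isn't shown in this document. One plausible sketch, following the text/event-stream wire format (an optional event: field, a data: field, and a blank-line terminator):

```python
import json
from typing import Any, Dict

def format_sse_event(event_type: str, data: Dict[str, Any]) -> str:
    """Serialize one event into the text/event-stream wire format.
    json.dumps emits a single line by default, so one data: field
    is sufficient; multi-line payloads would need one data: line
    per line of the payload, per the SSE specification."""
    payload = json.dumps(data)
    return f"event: {event_type}\ndata: {payload}\n\n"
```

Using a named event: field means the frontend must register addEventListener("message", ...) etc. per type; events emitted without an event: field arrive via EventSource.onmessage instead.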
3. Integration Points
A. Workflow Processing
# In workflow processing code
from modules.shared.streamingManager import get_streaming_manager

streaming_manager = get_streaming_manager()

# Emit progress updates
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="progress",
    data={"step": "analyzing", "message": "Processing documents..."},
    event_category="workflow"
)

# Emit new messages
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="message",
    data={"role": "assistant", "content": "Response text"},
    event_category="chat"
)
B. Route Endpoints
# Generic streaming endpoint for any context
@router.get("/{contextId}/stream")
async def stream_context_updates(
    contextId: str,
    categories: Optional[str] = Query(None),  # Comma-separated categories
    currentUser: User = Depends(getCurrentUser)
):
    event_categories = categories.split(",") if categories else None

    # Optional: Fetch initial state
    async def get_initial_data(ctx_id: str):
        interfaceDbChat = getServiceChat(currentUser)
        return interfaceDbChat.getUnifiedChatData(ctx_id, None)

    return create_sse_stream(
        context_id=contextId,
        event_categories=event_categories,
        initial_data_callback=get_initial_data
    )
Benefits of Streaming vs Polling
Performance
- Reduced Server Load: No constant database queries every 0.5-3 seconds
- Lower Latency: Events delivered immediately (< 100ms) vs polling delay (500-3000ms)
- Bandwidth Efficiency: Only send data when it changes, not empty responses
User Experience
- Real-time Updates: Users see progress instantly
- Better Responsiveness: No perceived delay from polling intervals
- Reduced Battery: Mobile devices consume less power without constant polling
Scalability
- Horizontal Scaling: Event queues can be distributed (Redis, RabbitMQ)
- Connection Management: Better handling of many concurrent streams
- Resource Efficiency: One persistent connection vs many HTTP requests
Migration Strategy
Phase 1: Create Shared Utility
- Move eventManager.py → modules/shared/streamingManager.py
- Generalize for any context type (not just workflows)
- Add event categorization and filtering
- Create sseUtils.py helper functions
Phase 2: Update Chatbot Feature
- Update chatbot to use shared streaming manager
- Replace internal polling in SSE endpoint with pure event-driven streaming
- Emit events directly when data changes (in database write operations)
Phase 3: Migrate Other Features
- Workflows: Add streaming to workflow processing
- Document Generation: Stream document creation progress
- Data Processing: Stream extraction/transformation progress
- Any Long-Running Task: Use streaming for status updates
Phase 4: Frontend Migration
- Replace useWorkflowPolling with SSE EventSource connections
- Create a generic useStreaming hook
- Update all components to use streaming instead of polling
- Remove polling logic entirely
Implementation Details
Event-Driven Data Emission
Instead of polling getUnifiedChatData(), emit events when data changes:
# In interfaceDbChatObjects.py - when creating a message
def createMessage(self, workflowId: str, message: ChatMessage):
    # ... existing database write ...

    # Emit streaming event. Note: asyncio.create_task requires a running
    # event loop; from a purely synchronous context, use
    # asyncio.run_coroutine_threadsafe against the server's loop instead.
    from modules.shared.streamingManager import get_streaming_manager
    streaming_manager = get_streaming_manager()
    asyncio.create_task(streaming_manager.emit_event(
        context_id=workflowId,
        event_type="message",
        data={
            "type": "message",
            "createdAt": message.publishedAt,
            "item": message.dict()
        },
        event_category="chat"
    ))
Frontend Integration
// Generic streaming hook
function useStreaming<T>(
  contextId: string,
  categories?: string[],
  onEvent?: (event: T) => void
) {
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/stream/${contextId}?categories=${categories?.join(',')}`
    );
    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent?.(event);
    };
    return () => eventSource.close();
    // Depend on the joined string so a fresh array literal on each
    // render doesn't tear down and re-open the connection
  }, [contextId, categories?.join(',')]);
}
Key Design Decisions
1. Context-Based Streaming
- Use a generic context_id instead of workflow_id
- Supports workflows, documents, tasks, user sessions, etc.
2. Event Categories
- Allow filtering by category (chat, workflow, document, etc.)
- Enables multiple features to stream from same context
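The category filter itself is small. A sketch of the two helpers the route and manager could share (names are illustrative, not from the codebase):

```python
from typing import List, Optional

def parse_categories(raw: Optional[str]) -> Optional[List[str]]:
    """Turn the `?categories=chat,workflow` query value into a filter
    list. None or an empty string means "no filter" (receive all)."""
    if not raw:
        return None
    return [c.strip() for c in raw.split(",") if c.strip()]

def category_allowed(event_category: str,
                     allowed: Optional[List[str]]) -> bool:
    """An event passes if no filter is set or its category is listed."""
    return allowed is None or event_category in allowed
```

Treating "no filter" as None rather than an empty list keeps the default subscription (everything) distinct from an explicit empty filter.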
3. Backward Compatibility
- Keep existing polling endpoints during migration
- Gradually migrate features one at a time
- Frontend can use both during transition
4. Error Handling
- Graceful degradation if streaming unavailable
- Automatic reconnection logic in frontend
- Fallback to polling if SSE fails
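On the backend side, graceful degradation is easier if the stream carries periodic SSE comment lines: intermediaries keep the idle connection open, and a client that stops receiving them can conclude the stream is dead and reconnect or fall back to polling. A sketch of such a wrapper (the function name is illustrative):

```python
import asyncio
from typing import AsyncIterator

async def with_keepalive(
    queue: "asyncio.Queue[str]",
    interval: float = 15.0,
) -> AsyncIterator[str]:
    """Wrap a stream of pre-formatted SSE strings, emitting an SSE
    comment line (lines starting with ':') whenever no real event
    arrives within `interval` seconds. EventSource ignores comment
    lines, so clients see only real events, but the bytes keep
    proxies and the TCP connection warm."""
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=interval)
        except asyncio.TimeoutError:
            yield ": keep-alive\n\n"
            continue
        yield event
```

The event_stream generator in create_sse_stream could yield through this wrapper instead of reading the manager's queue directly.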
Example: Complete Flow
Backend: Workflow Processing
async def process_workflow(workflow_id: str):
    streaming = get_streaming_manager()

    # Emit status update
    await streaming.emit_event(workflow_id, "status",
                               {"status": "running"}, "workflow")

    # Process and emit messages
    result = await ai_call(...)
    await streaming.emit_event(workflow_id, "message",
                               {"role": "assistant", "content": result}, "chat")

    # Emit completion
    await streaming.emit_event(workflow_id, "complete",
                               {"status": "completed"}, "workflow")
Frontend: React Hook
function ChatComponent({ workflowId }: { workflowId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);

  useStreaming(workflowId, ["chat"], (event) => {
    if (event.type === "message") {
      setMessages(prev => [...prev, event.item]);
    }
  });

  return <MessageList messages={messages} />;
}
Conclusion
Transforming eventManager.py into a shared streaming utility:
- Eliminates Polling: All features can stream updates in real-time
- Improves Performance: Reduces server load and latency
- Better UX: Instant updates instead of polling delays
- Reusable: Any feature can use streaming with minimal code
- Scalable: Foundation for future real-time features
The migration can be done incrementally, feature by feature, without breaking existing functionality.