Streaming Utility Architecture: Event-Driven Real-Time Updates
Current Implementation
Event Manager (modules/features/chatbot/eventManager.py)
The StreamingEventManager is a generic, reusable event manager that provides:
- Generic Event Queue Management: Per-context asyncio queues (not just workflows)
- Event Emission: emit_event() method supporting multiple event types and categories
- Event Streaming: stream_events() async generator for SSE streaming
- Automatic Cleanup: Queue cleanup after a delay (60 seconds by default)
- Event Categories: Filtering by category (chat, workflow, document, etc.)
- Concurrency-Safe: asyncio.Lock-based synchronization for concurrent coroutines on one event loop (asyncio locks are not thread-safe)
Architecture Overview
The streaming system uses a pure event-driven approach:
- Event Emission: When data changes (messages created, logs written), events are emitted directly
- Event Queue: Events are queued per context (workflow_id, document_id, etc.)
- SSE Streaming: Route endpoint streams events from queue in real-time
- No Database Polling for Updates: Apart from the initial chat-data load and a periodic stopped-status check, the SSE endpoint does not query the database; it only streams queued events
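The four steps above amount to a producer/consumer pair around an asyncio.Queue. The following is a minimal illustration, not code from the project. The names producer, consumer and the event dicts are hypothetical stand-ins for the processing code and the SSE endpoint. The consumer awaits queue.get(), so it stays suspended until an event is put on the queue. It does not re-query the database on a timer.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict, List

Event = Dict[str, Any]


async def producer(queue: "asyncio.Queue[Event]") -> None:
    # Processing side (hypothetical): push an event the moment each piece of data is created
    for text in ("hello", "world"):
        await queue.put({"type": "chatdata", "data": {"type": "message", "item": {"text": text}}})
    await queue.put({"type": "complete", "data": {}})


async def consumer(queue: "asyncio.Queue[Event]") -> AsyncIterator[str]:
    # SSE side: suspended on queue.get() until an event exists; no timer, no re-query
    while True:
        event = await queue.get()
        if event["type"] == "complete":
            return  # end of stream
        yield f"data: {json.dumps(event['data'])}\n\n"


async def main() -> List[str]:
    queue: "asyncio.Queue[Event]" = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    frames = [frame async for frame in consumer(queue)]
    await producer_task
    return frames


if __name__ == "__main__":
    for frame in asyncio.run(main()):
        print(frame, end="")
```

Each frame is a "data:" line followed by a blank line, which is the framing the SSE endpoint also uses.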
Current Usage: Chatbot Feature
The chatbot feature (modules/features/chatbot/) demonstrates the streaming architecture:
- Event Types: chatdata, complete, stopped, error
- Event Categories: chat, workflow
- Direct Emission: Events are emitted when messages/logs are created in mainChatbot.py
Implementation Details
1. Event Manager API (modules/features/chatbot/eventManager.py)
```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set


class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.)
    Coroutine-safe event emission and queue management (asyncio locks, one event loop).
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids

    # API overview: signatures and docstrings only, method bodies omitted.
    def create_queue(self, context_id: str) -> asyncio.Queue:
        """Create a new event queue for a context"""

    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
        """Get existing event queue for a context"""

    def has_queue(self, context_id: str) -> bool:
        """Check if a queue exists for a context"""

    async def emit_event(
        self,
        context_id: str,  # workflow_id, document_id, task_id, etc.
        event_type: str,  # "message", "log", "status", "progress", "complete", "error", "chatdata"
        data: Dict[str, Any],  # Flexible data structure
        event_category: str = "default",  # "chat", "workflow", "document", etc.
        message: Optional[str] = None,  # For backward compatibility
        step: Optional[str] = None,  # For backward compatibility
    ):
        """Emit event to the context's event queue"""

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
        timeout: Optional[float] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events from a context"""

    async def cleanup(self, context_id: str, delay: float = 60.0):
        """Schedule cleanup of event queue after delay"""
```
Global Singleton: Access via get_event_manager() function
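The listing above gives only signatures. The code below shows one possible shape for the method bodies. It is a minimal sketch that assumes a single asyncio event loop and one consumer per context; it is not the project's actual implementation.

Several details are assumptions:
- the event dict keys beyond type and data (those two are what the route reads),
- the set of terminal event types,
- the lazily created module-level singleton.

The per-context locks and subscriber map from the original are left out. On a single event loop the dictionary updates here cannot interleave, because there is no await between them.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional

TERMINAL_TYPES = ("complete", "error", "stopped")  # assumption: these end a stream


class StreamingEventManager:
    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}

    def create_queue(self, context_id: str) -> asyncio.Queue:
        # Reuse an existing queue so events emitted before the client connects are kept
        if context_id not in self._queues:
            self._queues[context_id] = asyncio.Queue()
        return self._queues[context_id]

    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
        return self._queues.get(context_id)

    def has_queue(self, context_id: str) -> bool:
        return context_id in self._queues

    async def emit_event(self, context_id: str, event_type: str, data: Dict[str, Any],
                         event_category: str = "default", message: Optional[str] = None,
                         step: Optional[str] = None) -> None:
        # Assumed envelope; only "type" and "data" are confirmed by the route code
        await self.create_queue(context_id).put({
            "type": event_type, "category": event_category,
            "data": data, "message": message, "step": step,
        })

    async def stream_events(self, context_id: str,
                            event_categories: Optional[List[str]] = None,
                            timeout: Optional[float] = None) -> AsyncIterator[Dict[str, Any]]:
        queue = self.create_queue(context_id)
        while True:
            try:
                # timeout=None waits indefinitely for the next event
                event = await asyncio.wait_for(queue.get(), timeout=timeout)
            except asyncio.TimeoutError:
                return  # nothing arrived within `timeout` seconds
            if event_categories is None or event["category"] in event_categories:
                yield event
            if event["type"] in TERMINAL_TYPES:
                return  # terminal events end the stream even when filtered out

    async def cleanup(self, context_id: str, delay: float = 60.0) -> None:
        # Remove the queue after `delay` seconds so late reconnects can still drain it
        async def _remove() -> None:
            await asyncio.sleep(delay)
            self._queues.pop(context_id, None)
            self._cleanup_tasks.pop(context_id, None)

        previous = self._cleanup_tasks.pop(context_id, None)
        if previous is not None:
            previous.cancel()  # a newer cleanup request replaces the older one
        self._cleanup_tasks[context_id] = asyncio.create_task(_remove())


_event_manager: Optional[StreamingEventManager] = None


def get_event_manager() -> StreamingEventManager:
    # Module-level singleton, created on first use
    global _event_manager
    if _event_manager is None:
        _event_manager = StreamingEventManager()
    return _event_manager
```

Emitting before the SSE client connects is safe in this sketch, because emit_event creates the queue on demand and the events wait there until stream_events drains them.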
2. SSE Route Implementation (modules/routes/routeChatbot.py)
The chatbot streaming endpoint (/api/chatbot/start/stream) demonstrates the pattern:
```python
import asyncio
import json

from fastapi.responses import StreamingResponse

from modules.features.chatbot.eventManager import get_event_manager


@router.post("/start/stream")
async def stream_chatbot_start(...) -> StreamingResponse:
    event_manager = get_event_manager()

    # Start background processing (creates workflow and event queue)
    workflow = await chatProcess(currentUser, userInput, workflowId)

    # Get or create event queue
    queue = event_manager.get_queue(workflow.id) or event_manager.create_queue(workflow.id)

    async def event_stream():
        """Pure event-driven streaming (no database polling)"""
        # 1. Send initial chat data once (from database); item filtering omitted here
        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
        for item in chatData.get("items") or []:
            yield f"data: {json.dumps(item)}\n\n"

        # 2. Stream events from queue (event-driven)
        loop = asyncio.get_running_loop()
        last_keepalive = loop.time()
        while True:
            try:
                # Wait at most 1 s so the loop can do periodic work between events
                event = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                # Idle: send an SSE comment as keepalive every 30 seconds
                if loop.time() - last_keepalive >= 30.0:
                    yield ": keepalive\n\n"
                    last_keepalive = loop.time()
                continue

            # Handle event types
            if event["type"] == "chatdata":
                yield f"data: {json.dumps(event['data'])}\n\n"
            elif event["type"] == "complete":
                break  # Close stream
            # ... other event types

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
Key Points:
- Initial Data: Fetched once from the database at stream start
- Event Streaming: Purely event-driven from the queue (no polling)
- Keepalive: An SSE comment line sent every 30 seconds to keep the connection alive
- Status Check: Periodic workflow status check (every 5 seconds) only for stopped detection
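These key points describe three timers:
- block on the queue in short slices,
- send a keepalive comment only after 30 seconds without output,
- check the workflow status every 5 seconds.

They can be combined in one standalone async generator. This is a sketch under assumptions. sse_frames is a hypothetical function, and is_stopped stands in for whatever status lookup the route actually performs. The interval parameter names are illustrative.

SSE lines that start with ":" are comments that clients ignore, which is why they work as keepalives.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Awaitable, Callable, Dict


async def sse_frames(
    queue: "asyncio.Queue[Dict[str, Any]]",
    is_stopped: Callable[[], Awaitable[bool]],  # hypothetical workflow-status lookup
    keepalive_interval: float = 30.0,
    status_interval: float = 5.0,
    poll_timeout: float = 1.0,
) -> AsyncIterator[str]:
    """Turn queued events into SSE frames with idle keepalives and a periodic stop check."""
    loop = asyncio.get_running_loop()
    last_keepalive = last_status = loop.time()
    while True:
        try:
            # Block for at most poll_timeout so the timers below are re-evaluated regularly
            event = await asyncio.wait_for(queue.get(), timeout=poll_timeout)
        except asyncio.TimeoutError:
            event = None
        now = loop.time()

        if event is not None:
            if event["type"] == "complete":
                return  # close the stream
            if event["type"] == "chatdata":
                last_keepalive = now  # real data also keeps the connection alive
                yield f"data: {json.dumps(event['data'])}\n\n"
        elif now - last_keepalive >= keepalive_interval:
            last_keepalive = now
            yield ": keepalive\n\n"  # lines starting with ':' are SSE comments

        # The status check runs on its own timer, even while events keep arriving
        if now - last_status >= status_interval:
            last_status = now
            if await is_stopped():
                return  # workflow was stopped: close the stream
```

Checking the status outside the except branch means a busy stream still detects a stop within roughly status_interval seconds.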
3. Event Emission in Processing Code
A. Chatbot Message Processing (modules/features/chatbot/mainChatbot.py)
Events are emitted directly when data is created:
```python
from modules.features.chatbot.eventManager import get_event_manager

# Excerpt: the awaits below run inside the async chatbot processing function
event_manager = get_event_manager()

# When creating a user message
userMessage = interfaceDbChat.createMessage(userMessageData)
# Emit event immediately (exact chatData format)
await event_manager.emit_event(
    context_id=workflow.id,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": userMessage.dict(),
    },
    event_category="chat",
)

# When creating assistant message
assistantMessage = interfaceDbChat.createMessage(assistantMessageData)
# Emit event immediately
await event_manager.emit_event(
    context_id=workflowId,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": assistantMessage.dict(),
    },
    event_category="chat",
)

# When workflow completes
await event_manager.emit_event(
    context_id=workflowId,
    event_type="complete",
    data={"workflowId": workflowId},
    event_category="workflow",
    message="Chatbot-Verarbeitung abgeschlossen",  # German: "Chatbot processing completed"
    step="complete",
)
```
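Both message emissions above repeat the same {type, createdAt, item} envelope. A hypothetical refactoring could build that envelope in one place. In the code below, chatdata_payload and emit_message are illustrative names, not existing project functions. The event_manager argument can be any object with an emit_event coroutine that accepts these keyword arguments.

```python
from typing import Any, Dict


def chatdata_payload(item_type: str, item: Dict[str, Any], created_at: Any) -> Dict[str, Any]:
    """Build the {type, createdAt, item} envelope carried by "chatdata" events."""
    return {"type": item_type, "createdAt": created_at, "item": item}


async def emit_message(event_manager: Any, workflow_id: str,
                       item: Dict[str, Any], created_at: Any) -> None:
    """Hypothetical helper: emit one chat message as a "chatdata" event in the "chat" category."""
    await event_manager.emit_event(
        context_id=workflow_id,
        event_type="chatdata",
        data=chatdata_payload("message", item, created_at),
        event_category="chat",
    )
```

With this helper, each emission in the processing code becomes one call. An example is await emit_message(event_manager, workflowId, assistantMessage.dict(), message_timestamp).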
B. Log Events
Logs are stored in the database and then emitted as events:
```python
import uuid

# Store log in database
log_data = {
    "id": f"log_{uuid.uuid4()}",
    "workflowId": workflowId,
    "message": "Analysiere Benutzeranfrage...",  # German: "Analyzing user request..."
    "type": "info",
    "timestamp": getUtcTimestamp(),
    "status": "running",
    "roundNumber": round_number,
}
interfaceDbChat.createLog(log_data)
# Note: Logs are emitted via the route's periodic chatData fetch mechanism
# OR can be emitted directly as events if needed
```
Event Format: chatdata events use the exact chatData format: {type, createdAt, item}
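The log note leaves direct emission as an option. The code below sketches that option under several assumptions:
- store_and_emit_log is a hypothetical helper;
- db stands for interfaceDbChat;
- the "log" item type and the "chat" category are guesses;
- a UTC timestamp from datetime stands in for getUtcTimestamp().

The log is written to the database first, then pushed to the queue in the chatData envelope.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


async def store_and_emit_log(db: Any, event_manager: Any, workflow_id: str,
                             message: str, round_number: int) -> Dict[str, Any]:
    """Hypothetical helper: persist a log entry, then push it to SSE subscribers."""
    log_data = {
        "id": f"log_{uuid.uuid4()}",
        "workflowId": workflow_id,
        "message": message,
        "type": "info",
        "timestamp": datetime.now(timezone.utc).timestamp(),  # stand-in for getUtcTimestamp()
        "status": "running",
        "roundNumber": round_number,
    }
    db.createLog(log_data)  # persist first so the database remains the source of truth
    await event_manager.emit_event(
        context_id=workflow_id,
        event_type="chatdata",
        # Assumed item type "log" inside the {type, createdAt, item} envelope
        data={"type": "log", "createdAt": log_data["timestamp"], "item": log_data},
        event_category="chat",
    )
    return log_data
```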
Benefits of Event-Driven Streaming
Performance
- Reduced Server Load: No constant database queries every 0.5-3 seconds
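- Lower Latency: Events are delivered immediately (< 100 ms) instead of waiting up to one polling interval (500-3000 ms, about half that on average)
- Bandwidth Efficiency: Data is sent only when it changes, instead of repeated empty responses
- No Database Polling for Updates: The SSE endpoint does not poll the database for new data; updates are purely event-driven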
User Experience
- Real-time Updates: Users see progress instantly as events occur
- Better Responsiveness: No perceived delay from polling intervals
- Reduced Battery Drain: Mobile devices consume less power without constant polling
- Immediate Feedback: Messages appear as soon as they're created
Scalability
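- Horizontal Scaling: The in-process asyncio queues only reach clients connected to the same server process; they could be moved to a distributed broker (Redis, RabbitMQ) in the future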
- Connection Management: Better handling of many concurrent streams
- Resource Efficiency: One persistent connection vs many HTTP requests
- Memory Efficient: Queues cleaned up automatically after workflow completion