# Streaming Utility Architecture: Event-Driven Real-Time Updates

## Current Implementation

### Event Manager (`modules/features/chatbot/eventManager.py`)

The `StreamingEventManager` is a **generic, reusable** event manager that provides:

- **Generic Event Queue Management**: Per-context asyncio queues (not just workflows)
- **Event Emission**: `emit_event()` method supporting multiple event types and categories
- **Event Streaming**: `stream_events()` async generator for SSE streaming
- **Automatic Cleanup**: Queue cleanup after a delay (60 seconds by default)
- **Event Categories**: Filtering by category (chat, workflow, document, etc.)
- **Thread-Safe**: Lock-based synchronization for concurrent access (the locks are `asyncio.Lock` instances, which coordinate coroutines on one event loop rather than OS threads)

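
The automatic cleanup listed above could work roughly as follows. This is a minimal sketch, not the actual `StreamingEventManager` code: the module-level `queues` and `cleanup_tasks` dictionaries and the `schedule_cleanup()` helper are hypothetical names chosen for illustration.

```python
import asyncio
from typing import Dict

# Hypothetical module-level state standing in for the manager's private dicts.
queues: Dict[str, asyncio.Queue] = {}        # context_id -> event queue
cleanup_tasks: Dict[str, asyncio.Task] = {}  # context_id -> pending cleanup task


async def _remove_after(context_id: str, delay: float) -> None:
    """Wait for `delay` seconds, then drop the queue of the given context."""
    await asyncio.sleep(delay)
    queues.pop(context_id, None)
    cleanup_tasks.pop(context_id, None)


def schedule_cleanup(context_id: str, delay: float = 60.0) -> asyncio.Task:
    """Schedule queue removal; a newer call cancels a still-pending older one."""
    previous = cleanup_tasks.pop(context_id, None)
    if previous is not None:
        previous.cancel()
    task = asyncio.create_task(_remove_after(context_id, delay))
    cleanup_tasks[context_id] = task
    return task


async def demo() -> tuple:
    """Create a queue, schedule a short cleanup, and report presence before/after."""
    queues["wf-1"] = asyncio.Queue()
    task = schedule_cleanup("wf-1", delay=0.01)
    exists_before = "wf-1" in queues
    await task
    return exists_before, "wf-1" in queues
```

Delaying the removal gives a client that connects late a chance to drain events that were emitted shortly before the workflow finished.
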
### Architecture Overview

The streaming system uses a **pure event-driven approach**:

1. **Event Emission**: When data changes (messages created, logs written), events are emitted directly
2. **Event Queue**: Events are queued per context (workflow_id, document_id, etc.)
3. **SSE Streaming**: The route endpoint streams events from the queue in real time
4. **No Database Polling**: The SSE endpoint does NOT poll the database; it only streams queued events

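
The four steps above can be condensed into a few lines. The sketch below is illustrative only: `queues`, `emit()` and `sse_frames()` are hypothetical stand-ins for the real manager and route, and the event dictionary shape (`type`, `data`) is an assumption.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict, List

# Hypothetical stand-in for the real manager: one asyncio.Queue per context_id.
queues: Dict[str, asyncio.Queue] = {}


async def emit(context_id: str, event_type: str, data: Dict[str, Any]) -> None:
    """Steps 1 and 2: emit an event into the queue of its context."""
    queue = queues.setdefault(context_id, asyncio.Queue())
    await queue.put({"type": event_type, "data": data})


async def sse_frames(context_id: str) -> AsyncIterator[str]:
    """Steps 3 and 4: turn queued events into SSE frames, with no database access."""
    queue = queues.setdefault(context_id, asyncio.Queue())
    while True:
        event = await queue.get()
        if event["type"] == "complete":
            break  # the stream ends when the workflow signals completion
        yield f"data: {json.dumps(event['data'])}\n\n"


async def demo() -> List[str]:
    """Emit one chat event and a completion event, then collect the SSE frames."""
    queues.clear()
    await emit("wf-1", "chatdata", {"type": "message", "item": {"text": "hi"}})
    await emit("wf-1", "complete", {"workflowId": "wf-1"})
    return [frame async for frame in sse_frames("wf-1")]
```
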
### Current Usage: Chatbot Feature

The chatbot feature (`modules/features/chatbot/`) demonstrates the streaming architecture:

- **Event Types**: `chatdata`, `complete`, `stopped`, `error`
- **Event Categories**: `chat`, `workflow`
- **Direct Emission**: Events are emitted when messages/logs are created in `mainChatbot.py`

## Implementation Details

### 1. Event Manager API (`modules/features/chatbot/eventManager.py`)

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set


class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.)
    Thread-safe event emission and queue management.
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids

    def create_queue(self, context_id: str) -> asyncio.Queue:
        """Create a new event queue for a context"""

    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
        """Get existing event queue for a context"""

    def has_queue(self, context_id: str) -> bool:
        """Check if a queue exists for a context"""

    async def emit_event(
        self,
        context_id: str,  # workflow_id, document_id, task_id, etc.
        event_type: str,  # "message", "log", "status", "progress", "complete", "error", "chatdata"
        data: Dict[str, Any],  # Flexible data structure
        event_category: str = "default",  # "chat", "workflow", "document", etc.
        message: Optional[str] = None,  # For backward compatibility
        step: Optional[str] = None,  # For backward compatibility
    ):
        """Emit event to the context's event queue"""

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
        timeout: Optional[float] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events from a context"""

    async def cleanup(self, context_id: str, delay: float = 60.0):
        """Schedule cleanup of event queue after delay"""
```
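
The signature of `stream_events()` suggests category filtering plus an optional timeout. One way this could behave is sketched below as a free function over a single queue. It is not the project's implementation: the `category` and `type` keys on the event dictionary, ending the stream on timeout, and treating `complete`, `stopped` and `error` as terminal are all assumptions made for this example.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional


async def stream_events(
    queue: asyncio.Queue,
    event_categories: Optional[List[str]] = None,
    timeout: Optional[float] = None,
) -> AsyncIterator[Dict[str, Any]]:
    """Yield queued events, skipping categories that were not requested."""
    while True:
        try:
            # timeout=None waits indefinitely for the next event
            event = await asyncio.wait_for(queue.get(), timeout=timeout)
        except asyncio.TimeoutError:
            return  # assumption: an idle stream simply ends after `timeout` seconds
        if event_categories and event.get("category") not in event_categories:
            continue  # filtered out, keep waiting for the next event
        yield event
        if event.get("type") in ("complete", "stopped", "error"):
            return  # assumption: these event types terminate the stream


async def demo() -> List[str]:
    """Queue three events and stream only the chat and workflow categories."""
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put({"type": "chatdata", "category": "chat", "data": {"n": 1}})
    await queue.put({"type": "status", "category": "document", "data": {"n": 2}})
    await queue.put({"type": "complete", "category": "workflow", "data": {}})
    wanted = ["chat", "workflow"]
    return [e["type"] async for e in stream_events(queue, wanted, timeout=0.1)]
```

Note that with this design a consumer that filters out the `workflow` category never sees `complete`, so it relies on the timeout to end the stream.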

**Global Singleton**: Access via the `get_event_manager()` function

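
A common way to provide such an accessor is a lazily created module-level instance. The sketch below assumes that pattern; the `_event_manager` variable is hypothetical and the class body is a placeholder, since the actual accessor is not shown in this document.

```python
from typing import Optional


class StreamingEventManager:
    """Placeholder for the real class; only instance identity matters here."""


# Hypothetical module-level cache holding the single shared instance.
_event_manager: Optional[StreamingEventManager] = None


def get_event_manager() -> StreamingEventManager:
    """Create the manager on first use and return the same instance afterwards."""
    global _event_manager
    if _event_manager is None:
        _event_manager = StreamingEventManager()
    return _event_manager
```

Because the route and the processing code both call `get_event_manager()`, they end up sharing the same queues within one process.
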
### 2. SSE Route Implementation (`modules/routes/routeChatbot.py`)

The chatbot streaming endpoint (`/api/chatbot/start/stream`) demonstrates the pattern:

```python
import asyncio
import json
import time


@router.post("/start/stream")
async def stream_chatbot_start(...) -> StreamingResponse:  # parameters elided in the source
    event_manager = get_event_manager()

    # Start background processing (creates workflow and event queue)
    workflow = await chatProcess(currentUser, userInput, workflowId)

    # Get or create event queue
    queue = event_manager.get_queue(workflow.id) or event_manager.create_queue(workflow.id)

    async def event_stream():
        """Pure event-driven streaming (no database polling)"""
        # 1. Send initial chat data once (from database)
        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
        if chatData.get("items"):
            filtered_items = chatData["items"]  # any further filtering is not shown in this excerpt
            for item in filtered_items:
                yield f"data: {json.dumps(item)}\n\n"

        # 2. Stream events from queue (event-driven)
        last_keepalive = time.monotonic()
        while True:
            try:
                # Wait up to 1 second for the next event
                event = await asyncio.wait_for(queue.get(), timeout=1.0)

                # Handle event types
                if event["type"] == "chatdata":
                    yield f"data: {json.dumps(event['data'])}\n\n"
                elif event["type"] == "complete":
                    break  # Close stream
                # ... other event types

            except asyncio.TimeoutError:
                # Send keepalive every 30 seconds (not on every 1-second timeout)
                if time.monotonic() - last_keepalive >= 30.0:
                    yield ": keepalive\n\n"
                    last_keepalive = time.monotonic()
                continue

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

**Key Points**:
- **Initial Data**: Fetched once from the database at stream start
- **Event Streaming**: Pure event-driven from the queue (no polling for chat data)
- **Keepalive**: Sent every 30 seconds to keep the connection alive
- **Status Check**: Periodic workflow status check (every 5 seconds), used only to detect a stopped workflow; it is not part of the code excerpt above

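
The wire format behind these points is small enough to sketch. In Server-Sent Events, a `data:` line followed by a blank line forms one message, and a line starting with `:` is a comment that clients ignore, which is why it works as a keepalive. The helper names below (`sse_data`, `parse_frames`, `KEEPALIVE_FRAME`) are hypothetical, and the parser is a deliberately simplified view of what a client does.

```python
import json
from typing import Any, Dict, List

KEEPALIVE_FRAME = ": keepalive\n\n"  # a line starting with ":" is an SSE comment


def sse_data(payload: Dict[str, Any]) -> str:
    """Serialize one payload as a single-line SSE `data:` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_frames(stream: str) -> List[Dict[str, Any]]:
    """Simplified client-side view: keep `data:` payloads, ignore comment frames."""
    payloads = []
    for frame in stream.split("\n\n"):
        if frame.startswith("data: "):
            payloads.append(json.loads(frame[len("data: "):]))
    return payloads
```

A stream such as `sse_data({"type": "message"}) + KEEPALIVE_FRAME + sse_data({"type": "log"})` therefore yields exactly two payloads on the client side.
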
### 3. Event Emission in Processing Code

#### A. Chatbot Message Processing (`modules/features/chatbot/mainChatbot.py`)

Events are emitted **directly when data is created**:

```python
from modules.features.chatbot.eventManager import get_event_manager

event_manager = get_event_manager()

# When creating a user message
userMessage = interfaceDbChat.createMessage(userMessageData)

# Emit event immediately (exact chatData format)
await event_manager.emit_event(
    context_id=workflow.id,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": userMessage.dict(),
    },
    event_category="chat",
)

# When creating assistant message
assistantMessage = interfaceDbChat.createMessage(assistantMessageData)

# Emit event immediately
await event_manager.emit_event(
    context_id=workflowId,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": assistantMessage.dict(),
    },
    event_category="chat",
)

# When workflow completes
await event_manager.emit_event(
    context_id=workflowId,
    event_type="complete",
    data={"workflowId": workflowId},
    event_category="workflow",
    message="Chatbot-Verarbeitung abgeschlossen",  # German: "Chatbot processing completed"
    step="complete",
)
```

#### B. Log Events

Logs are stored in the database and then emitted as events:

```python
import uuid

# Store log in database
log_data = {
    "id": f"log_{uuid.uuid4()}",
    "workflowId": workflowId,
    "message": "Analysiere Benutzeranfrage...",  # German: "Analyzing user request..."
    "type": "info",
    "timestamp": getUtcTimestamp(),
    "status": "running",
    "roundNumber": round_number,
}
interfaceDbChat.createLog(log_data)

# Note: Logs are emitted via the route's periodic chatData fetch mechanism
# OR can be emitted directly as events if needed
```
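
If logs are emitted directly, the call would presumably mirror the message case. The sketch below shows that variant under stated assumptions: `RecordingEventManager` is a hypothetical test double that only records calls, `emit_log()` is an illustrative helper rather than a function from the codebase, and the database write is reduced to a comment.

```python
import asyncio
import uuid
from typing import Any, Dict, List


class RecordingEventManager:
    """Hypothetical stand-in that records emitted events instead of queueing them."""

    def __init__(self) -> None:
        self.emitted: List[Dict[str, Any]] = []

    async def emit_event(self, context_id: str, event_type: str,
                         data: Dict[str, Any], event_category: str = "default") -> None:
        self.emitted.append({"context_id": context_id, "type": event_type,
                             "data": data, "category": event_category})


async def emit_log(manager: RecordingEventManager, workflow_id: str, message: str,
                   timestamp: float, round_number: int = 1) -> Dict[str, Any]:
    """Build a log record and emit it as a chatData-style event of type "log"."""
    log_data = {
        "id": f"log_{uuid.uuid4()}",
        "workflowId": workflow_id,
        "message": message,
        "type": "info",
        "timestamp": timestamp,
        "status": "running",
        "roundNumber": round_number,
    }
    # The real code would call interfaceDbChat.createLog(log_data) at this point.
    await manager.emit_event(
        context_id=workflow_id,
        event_type="chatdata",
        data={"type": "log", "createdAt": timestamp, "item": log_data},
        event_category="chat",
    )
    return log_data
```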

**Event Format**: Events use the exact `chatData` format: `{type, createdAt, item}`

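
A small helper can keep that envelope consistent across emitters. The function below is a hypothetical convenience, not part of the described codebase, and it assumes `createdAt` is a Unix timestamp in seconds; the real value comes from the project's own timestamp helper, whose format is not specified here.

```python
import time
from typing import Any, Dict, Optional


def make_chat_data(item_type: str, item: Dict[str, Any],
                   created_at: Optional[float] = None) -> Dict[str, Any]:
    """Wrap an item in the {type, createdAt, item} envelope."""
    return {
        "type": item_type,  # e.g. "message" or "log"
        "createdAt": time.time() if created_at is None else created_at,
        "item": item,
    }
```

For example, `make_chat_data("message", {"id": "m1"}, created_at=1700000000.0)` produces a dictionary with exactly the three keys named above.
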
## Benefits of Event-Driven Streaming

### Performance
- **Reduced Server Load**: No constant database queries every 0.5-3 seconds
- **Lower Latency**: Events are delivered immediately (< 100 ms) instead of after a polling delay (500-3000 ms)
- **Bandwidth Efficiency**: Data is sent only when it changes; no empty responses
- **No Database Polling**: The SSE endpoint does NOT poll the database; it is purely event-driven

### User Experience
- **Real-time Updates**: Users see progress instantly as events occur
- **Better Responsiveness**: No perceived delay from polling intervals
- **Reduced Battery Usage**: Mobile devices consume less power without constant polling
- **Immediate Feedback**: Messages appear as soon as they're created

### Scalability
- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ) in the future
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection instead of many HTTP requests
- **Memory Efficient**: Queues are cleaned up automatically after workflow completion