# Streaming Utility Architecture: Event-Driven Real-Time Updates
## Current Implementation
### Event Manager (`modules/features/chatbot/eventManager.py`)
The `StreamingEventManager` is a **generic, reusable** event manager that provides:
- **Generic Event Queue Management**: Per-context asyncio queues (not just workflows)
- **Event Emission**: `emit_event()` method supporting multiple event types and categories
- **Event Streaming**: `stream_events()` async generator for SSE streaming
- **Automatic Cleanup**: Queue cleanup after delay (60 seconds default)
- **Event Categories**: Filtering by category (chat, workflow, document, etc.)
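- **Concurrency-Safe**: Per-context `asyncio.Lock`s serialize access between concurrent coroutines (asyncio locks are not thread-safe; they coordinate coroutines on one event loop)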
### Architecture Overview
The streaming system uses a **pure event-driven approach**:
1. **Event Emission**: When data changes (messages created, logs written), events are emitted directly
2. **Event Queue**: Events are queued per context (workflow_id, document_id, etc.)
3. **SSE Streaming**: Route endpoint streams events from queue in real-time
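4. **No Database Polling for Chat Data**: The SSE endpoint does not poll the database for new messages or logs; apart from a periodic workflow status check used to detect stops, it only streams queued events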
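The four steps above can be illustrated with a minimal, self-contained sketch. It uses a bare `asyncio.Queue` in place of `StreamingEventManager`, and `produce`, `sse_stream` and `main` are hypothetical names for the processing side, the route side and the wiring. It is an illustration of the pattern, not the project's code.

```python
import asyncio
import json
from typing import AsyncIterator, List


async def produce(queue: asyncio.Queue) -> None:
    # Processing side: emit an event right after each piece of data is created.
    await queue.put({"type": "chatdata", "data": {"type": "message", "item": {"text": "Hi"}}})
    await queue.put({"type": "complete", "data": {}})


async def sse_stream(queue: asyncio.Queue) -> AsyncIterator[str]:
    # Route side: block on the queue instead of querying the database.
    while True:
        event = await queue.get()
        if event["type"] == "complete":
            break  # Close the stream.
        yield f"data: {json.dumps(event['data'])}\n\n"


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(produce(queue))
    frames = [frame async for frame in sse_stream(queue)]
    await producer
    return frames
```

Running `asyncio.run(main())` returns a single `data:` frame; the `complete` event closes the stream without producing a frame of its own.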
### Current Usage: Chatbot Feature
The chatbot feature (`modules/features/chatbot/`) demonstrates the streaming architecture:
- **Event Types**: `chatdata`, `complete`, `stopped`, `error`
- **Event Categories**: `chat`, `workflow`
- **Direct Emission**: Events emitted when messages/logs are created in `mainChatbot.py`
## Implementation Details
### 1. Event Manager API (`modules/features/chatbot/eventManager.py`)
```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Set

# Signatures only; method bodies are omitted.


class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.)
    Coroutine-safe event emission and queue management (asyncio locks, one event loop).
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids

    def create_queue(self, context_id: str) -> asyncio.Queue:
        """Create a new event queue for a context"""

    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
        """Get existing event queue for a context"""

    def has_queue(self, context_id: str) -> bool:
        """Check if a queue exists for a context"""

    async def emit_event(
        self,
        context_id: str,  # workflow_id, document_id, task_id, etc.
        event_type: str,  # "message", "log", "status", "progress", "complete", "error", "chatdata"
        data: Dict[str, Any],  # Flexible data structure
        event_category: str = "default",  # "chat", "workflow", "document", etc.
        message: Optional[str] = None,  # For backward compatibility
        step: Optional[str] = None,  # For backward compatibility
    ):
        """Emit event to the context's event queue"""

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
        timeout: Optional[float] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events from a context"""

    async def cleanup(self, context_id: str, delay: float = 60.0):
        """Schedule cleanup of event queue after delay"""
```
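A minimal sketch of how the methods above could be implemented, assuming a single process and a single event loop. It is not the module's actual code: `TERMINAL_TYPES`, the layout of the queued event dict (`type`, `category`, `data`, `message`, `step`, `timestamp`), buffering events before a client connects, and ending `stream_events()` on a terminal event or timeout are all assumptions. `_locks` and `_subscribers` are left out; locks are not needed here because no `await` happens between checking for a queue and creating it, so coroutines cannot interleave at that point.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Dict, List, Optional

TERMINAL_TYPES = ("complete", "stopped", "error")  # Assumption: these end a stream.


class StreamingEventManager:
    """Sketch: one in-memory asyncio.Queue per context_id, single event loop."""

    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}

    def create_queue(self, context_id: str) -> asyncio.Queue:
        # Reuse an existing queue and cancel any cleanup scheduled for it.
        pending = self._cleanup_tasks.pop(context_id, None)
        if pending is not None:
            pending.cancel()
        if context_id not in self._queues:
            self._queues[context_id] = asyncio.Queue()
        return self._queues[context_id]

    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
        return self._queues.get(context_id)

    def has_queue(self, context_id: str) -> bool:
        return context_id in self._queues

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "default",
        message: Optional[str] = None,
        step: Optional[str] = None,
    ) -> None:
        # Buffer the event even if no client is connected yet.
        queue = self._queues.setdefault(context_id, asyncio.Queue())
        await queue.put({
            "type": event_type,
            "category": event_category,
            "data": data,
            "message": message,
            "step": step,
            "timestamp": time.time(),
        })

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
        timeout: Optional[float] = None,
    ) -> AsyncIterator[Dict[str, Any]]:
        queue = self.create_queue(context_id)
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=timeout)
            except asyncio.TimeoutError:
                return  # Nothing arrived within `timeout` seconds.
            # Yield only the requested categories, but always stop on a terminal event.
            if event_categories is None or event["category"] in event_categories:
                yield event
            if event["type"] in TERMINAL_TYPES:
                return

    async def cleanup(self, context_id: str, delay: float = 60.0) -> None:
        # Schedule removal and return immediately; create_queue() can cancel it.
        async def _remove() -> None:
            await asyncio.sleep(delay)
            self._queues.pop(context_id, None)
            self._cleanup_tasks.pop(context_id, None)

        previous = self._cleanup_tasks.pop(context_id, None)
        if previous is not None:
            previous.cancel()
        self._cleanup_tasks[context_id] = asyncio.create_task(_remove())
```

In this sketch a `complete` event in the `workflow` category still ends a stream filtered to `["chat"]`, even though the event itself is not yielded.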
**Global Singleton**: Access via `get_event_manager()` function
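The page only states that `get_event_manager()` is a global singleton accessor. A common way to write one is a lazily initialised module-level instance; the sketch below assumes that pattern, and the empty class is a placeholder for the real `StreamingEventManager`. Because the instance lives in process memory, each worker process gets its own manager, so an event only reaches SSE clients served by the process that emitted it.

```python
from typing import Optional


class StreamingEventManager:
    """Placeholder for the real manager, to keep this sketch self-contained."""


_event_manager: Optional[StreamingEventManager] = None


def get_event_manager() -> StreamingEventManager:
    """Return the process-wide manager, creating it on first use."""
    global _event_manager
    if _event_manager is None:
        _event_manager = StreamingEventManager()
    return _event_manager
```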
### 2. SSE Route Implementation (`modules/routes/routeChatbot.py`)
The chatbot streaming endpoint (`/api/chatbot/start/stream`) demonstrates the pattern:
```python
import asyncio
import json
import time

from fastapi.responses import StreamingResponse  # assuming a FastAPI router


@router.post("/start/stream")
async def stream_chatbot_start(...) -> StreamingResponse:
    event_manager = get_event_manager()
    # Start background processing (creates workflow and event queue)
    workflow = await chatProcess(currentUser, userInput, workflowId)
    # Get or create event queue
    queue = event_manager.get_queue(workflow.id) or event_manager.create_queue(workflow.id)

    async def event_stream():
        """Pure event-driven streaming (no database polling)"""
        # 1. Send initial chat data once (from database)
        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
        filtered_items = chatData.get("items", [])  # any item filtering is omitted in this excerpt
        for item in filtered_items:
            yield f"data: {json.dumps(item)}\n\n"
        # 2. Stream events from queue (event-driven)
        last_sent = time.monotonic()
        while True:
            try:
                # Wait up to 1 second for the next event
                event = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                # No event this second; send a keepalive comment after 30 idle seconds
                if time.monotonic() - last_sent >= 30.0:
                    last_sent = time.monotonic()
                    yield ": keepalive\n\n"
                continue
            # Handle event types
            if event["type"] == "chatdata":
                yield f"data: {json.dumps(event['data'])}\n\n"
                last_sent = time.monotonic()
            elif event["type"] == "complete":
                break  # Close stream
            # ... other event types

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
```
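The route writes two kinds of frames: `data: <json>` followed by a blank line, and `: keepalive` comment lines, which SSE clients ignore. A minimal sketch of both sides of that wire format follows; `format_data_frame`, `parse_sse` and `KEEPALIVE_FRAME` are hypothetical names, and the parser is deliberately simplified (it handles only `data:` fields and `\n` line endings, not `event:`, `id:`, `retry:` or `\r\n`).

```python
import json
from typing import Any, Dict, List

KEEPALIVE_FRAME = ": keepalive\n\n"  # A line starting with ":" is an SSE comment.


def format_data_frame(payload: Dict[str, Any]) -> str:
    """Encode one event: a single "data:" line followed by a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(text: str) -> List[Dict[str, Any]]:
    """Decode the JSON data of each event; comments and other fields are skipped."""
    events = []
    for block in text.split("\n\n"):
        data_lines = []
        for line in block.split("\n"):
            if line.startswith("data:"):
                value = line[len("data:"):]
                # The SSE format strips exactly one leading space after the colon.
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events
```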
**Key Points**:
- **Initial Data**: Fetched once from database at stream start
- **Event Streaming**: Pure event-driven from queue (no polling)
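- **Keepalive**: An SSE comment (`: keepalive`) is sent after 30 seconds without other output to keep the connection alive
- **Status Check**: A periodic workflow status check (every 5 seconds, not shown in the excerpt) is used only to detect stopped workflows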
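The key points combine three timers in one loop: a 1-second wait on the queue, a keepalive after 30 idle seconds, and a stop check every 5 seconds. A minimal sketch of that combination, assuming the status lookup is passed in as an async callable (`is_stopped` is hypothetical, as is ending the stream silently when it returns `True`):

```python
import asyncio
import json
import time
from typing import AsyncIterator, Awaitable, Callable


async def event_stream(
    queue: asyncio.Queue,
    is_stopped: Callable[[], Awaitable[bool]],  # hypothetical workflow status lookup
    poll_timeout: float = 1.0,
    keepalive_interval: float = 30.0,
    status_interval: float = 5.0,
) -> AsyncIterator[str]:
    """Yield SSE frames from the queue, with idle keepalives and periodic stop checks."""
    last_sent = last_status = time.monotonic()
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=poll_timeout)
        except asyncio.TimeoutError:
            event = None  # Idle tick: fall through to the timers below.
        if event is not None:
            if event["type"] == "chatdata":
                yield f"data: {json.dumps(event['data'])}\n\n"
                last_sent = time.monotonic()
            elif event["type"] == "complete":
                return
        now = time.monotonic()
        # Stop detection runs at most once per status_interval.
        if now - last_status >= status_interval:
            last_status = now
            if await is_stopped():
                return
        # Keepalive only after keepalive_interval seconds without any frame.
        if now - last_sent >= keepalive_interval:
            last_sent = now
            yield ": keepalive\n\n"
```

Measuring the keepalive from the last frame sent, rather than on a fixed schedule, means an active stream never carries redundant keepalives.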
### 3. Event Emission in Processing Code
#### A. Chatbot Message Processing (`modules/features/chatbot/mainChatbot.py`)
Events are emitted **directly when data is created**:
```python
from modules.features.chatbot.eventManager import get_event_manager

event_manager = get_event_manager()

# The calls below run inside the async processing code. workflow.id and
# workflowId name the same workflow (the queue's context_id);
# message_timestamp is the message's creation timestamp.

# When creating a user message
userMessage = interfaceDbChat.createMessage(userMessageData)
# Emit event immediately (exact chatData format)
await event_manager.emit_event(
    context_id=workflow.id,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": userMessage.dict()
    },
    event_category="chat"
)

# When creating assistant message
assistantMessage = interfaceDbChat.createMessage(assistantMessageData)
# Emit event immediately
await event_manager.emit_event(
    context_id=workflowId,
    event_type="chatdata",
    data={
        "type": "message",
        "createdAt": message_timestamp,
        "item": assistantMessage.dict()
    },
    event_category="chat"
)

# When workflow completes
await event_manager.emit_event(
    context_id=workflowId,
    event_type="complete",
    data={"workflowId": workflowId},
    event_category="workflow",
    message="Chatbot-Verarbeitung abgeschlossen",  # "Chatbot processing complete"
    step="complete"
)
```
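The two message emissions above repeat the same `emit_event` call with only the item changing. A small helper could build the `{type, createdAt, item}` payload in one place. This is a sketch, not existing project code: `emit_chat_item` and `RecordingEventManager` are hypothetical, and the recorder only mimics the keyword arguments of `emit_event` so the example runs without the real module.

```python
import asyncio
from typing import Any, Dict, List


class RecordingEventManager:
    """Hypothetical stand-in for StreamingEventManager that records emitted events."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    async def emit_event(self, context_id: str, event_type: str, data: Dict[str, Any],
                         event_category: str = "default", **kwargs: Any) -> None:
        self.events.append({"context_id": context_id, "type": event_type,
                            "category": event_category, "data": data})


async def emit_chat_item(manager: Any, workflow_id: str, item_type: str,
                         item: Dict[str, Any], created_at: Any) -> None:
    """Emit one item as a "chatdata" event in the chatData shape {type, createdAt, item}."""
    await manager.emit_event(
        context_id=workflow_id,
        event_type="chatdata",
        data={"type": item_type, "createdAt": created_at, "item": item},
        event_category="chat",
    )
```

With the real manager, each emission above would become `await emit_chat_item(event_manager, workflowId, "message", assistantMessage.dict(), message_timestamp)`.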
#### B. Log Events
Logs are stored in the database first:
```python
import uuid

# Store log in database
log_data = {
    "id": f"log_{uuid.uuid4()}",
    "workflowId": workflowId,
    "message": "Analysiere Benutzeranfrage...",  # "Analyzing user request..."
    "type": "info",
    "timestamp": getUtcTimestamp(),
    "status": "running",
    "roundNumber": round_number
}
interfaceDbChat.createLog(log_data)
# Note: no event is emitted here. The log reaches the client through the
# route's chatData fetch (in the route excerpt, only at stream start) or,
# for live delivery, by emitting it directly as a "chatdata" event.
```
**Event Format**: `chatdata` event payloads use the exact `chatData` item format: `{type, createdAt, item}`
## Benefits of Event-Driven Streaming
### Performance
- **Reduced Server Load**: No repeated database queries every 0.5-3 seconds; the SSE endpoint streams queued events instead of polling
- **Lower Latency**: Events are delivered as soon as they are emitted (< 100ms) instead of after a polling delay of up to 500-3000ms
- **Bandwidth Efficiency**: Data is sent only when it changes, with no empty poll responses
### User Experience
- **Real-time Updates**: Users see progress instantly as events occur
- **Better Responsiveness**: No perceived delay from polling intervals
- **Reduced Battery Drain**: Mobile devices consume less power without constant polling
- **Immediate Feedback**: Messages appear as soon as they're created
### Scalability
- **Horizontal Scaling**: Event queues are currently in-process; they could be moved to a distributed broker (Redis, RabbitMQ) in future
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection vs many HTTP requests
- **Memory Efficient**: Queues are cleaned up automatically after workflow completion (after a 60-second delay by default)