fix: generalized event manager for message streaming

Ida Dittrich 2026-01-09 12:14:47 +01:00
parent b327e6dc03
commit 8147f3f7c8
5 changed files with 523 additions and 179 deletions

View file

@@ -0,0 +1,314 @@
# Streaming Utility Architecture: Transforming eventManager into a Shared Utility
## Current State Analysis
### Existing Implementation
The `eventManager.py` in `modules/features/chatbot/` currently provides:
- **Event Queue Management**: Per-workflow asyncio queues for SSE streaming
- **Event Emission**: `emit_event()` method for chatbot-specific events
- **Cleanup**: Automatic queue cleanup after workflow completion
- **SSE Streaming**: Used in `/api/chatbot/start/stream` endpoint
### Current Limitations
1. **Chatbot-Specific**: Hardcoded for chatbot workflows only
2. **Polling Still Required**: Frontend still polls `getUnifiedChatData()` every 0.5 seconds even with SSE
3. **Not Reusable**: Other features (workflows, document generation, etc.) can't use it
4. **Mixed Approach**: The SSE endpoint still internally polls the database instead of using pure event-driven streaming (condensed below)
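For concreteness, the mixed approach in point 4 reduces to roughly this shape (a condensed illustration of the current endpoint, not the verbatim code, which appears in the route diff later in this commit):

```python
# Condensed sketch of the current mixed approach: an SSE generator that
# still polls the database on a fixed interval while the stream is open.
import asyncio
import json

async def event_stream(workflow_id: str, interfaceDbChat):
    last_timestamp = None
    while True:
        # Internal polling: hit the database every 0.5 seconds
        chatData = interfaceDbChat.getUnifiedChatData(workflow_id, last_timestamp)
        for item in chatData.get("items", []):
            yield f"data: {json.dumps(item)}\n\n"
            last_timestamp = item.get("createdAt")
        await asyncio.sleep(0.5)  # the polling interval this proposal eliminates
```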
### Frontend Polling Pattern
Currently, the frontend uses:
- `useWorkflowPolling.ts` - Polls `/api/workflow/{id}/chatData` every few seconds
- `useWorkflowLifecycle.ts` - Manages polling lifecycle and state
- Rate limit handling and backoff logic for failed polls
## Proposed Architecture: Shared Streaming Utility
### 1. Generic Event Manager (`modules/shared/streamingManager.py`)
```python
class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.).
    """

    def __init__(self):
        self._queues: Dict[str, asyncio.Queue] = {}
        self._locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids

    async def emit_event(
        self,
        context_id: str,                 # workflow_id, document_id, task_id, etc.
        event_type: str,                 # "message", "log", "status", "progress", "complete", "error"
        data: Dict[str, Any],            # flexible data structure
        event_category: str = "default"  # "chat", "workflow", "document", etc.
    ):
        """Emit event to all subscribers of a context."""
        ...

    def create_stream(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None  # filter by category
    ) -> asyncio.Queue:
        """Create a new stream for a context."""
        ...

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        """Async generator for streaming events."""
        ...
```
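For orientation, a minimal round-trip through this interface (a sketch of the proposed API, assuming the semantics of the existing implementation: events emitted for a context with no queue are dropped, so the stream must be created first):

```python
import asyncio

async def demo():
    manager = StreamingEventManager()
    # Create the stream before any events are emitted; emit_event drops
    # events for contexts that have no queue yet.
    manager.create_stream("wf-123", event_categories=["chat"])

    async def produce():
        await manager.emit_event("wf-123", "message",
                                 {"role": "assistant", "content": "Hello"}, "chat")
        await manager.emit_event("wf-123", "complete", {}, "chat")

    async def consume():
        async for event in manager.stream_events("wf-123", ["chat"]):
            print(event["type"], event["data"])
            if event["type"] == "complete":
                break

    await asyncio.gather(consume(), produce())

asyncio.run(demo())
```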
### 2. Generic SSE Route Helper (`modules/shared/sseUtils.py`)
```python
def create_sse_stream(
    context_id: str,
    event_categories: Optional[List[str]] = None,
    initial_data_callback: Optional[Callable] = None,
    timeout: float = 300.0
) -> StreamingResponse:
    """
    Create a generic SSE streaming response.

    Args:
        context_id: Workflow ID, document ID, or other context identifier
        event_categories: Filter events by category (e.g., ["chat", "workflow"])
        initial_data_callback: Optional function to fetch initial state
        timeout: Stream timeout in seconds
    """
    streaming_manager = get_streaming_manager()

    async def event_stream():
        # Send initial data if a callback is provided
        if initial_data_callback:
            initial_data = await initial_data_callback(context_id)
            yield format_sse_event("initial", initial_data)
        # Stream events from the manager
        async for event in streaming_manager.stream_events(context_id, event_categories):
            yield format_sse_event(event["type"], event["data"])

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
```
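`format_sse_event` is referenced above but not shown; a minimal sketch, assuming JSON-serializable payloads and the standard `event:`/`data:` wire format:

```python
import json
from typing import Any, Dict

def format_sse_event(event_type: str, data: Dict[str, Any]) -> str:
    # One SSE frame: an event name line, a data line, and the blank line
    # that terminates the frame per the SSE specification.
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
```

Note that frames carrying an `event:` name only reach the browser through `EventSource.addEventListener(event_type, ...)`; the `onmessage` handler used in the frontend hook below fires for unnamed `data:`-only frames, so both sides need to agree on one style.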
### 3. Integration Points
#### A. Workflow Processing
```python
# In workflow processing code
from modules.shared.streamingManager import get_streaming_manager

streaming_manager = get_streaming_manager()

# Emit progress updates
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="progress",
    data={"step": "analyzing", "message": "Processing documents..."},
    event_category="workflow"
)

# Emit new messages
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="message",
    data={"role": "assistant", "content": "Response text"},
    event_category="chat"
)
```
#### B. Route Endpoints
```python
# Generic streaming endpoint for any context
@router.get("/{contextId}/stream")
async def stream_context_updates(
    contextId: str,
    categories: Optional[str] = Query(None),  # comma-separated categories
    currentUser: User = Depends(getCurrentUser)
):
    event_categories = categories.split(",") if categories else None

    # Optional: fetch initial state
    async def get_initial_data(ctx_id: str):
        interfaceDbChat = getServiceChat(currentUser)
        return interfaceDbChat.getUnifiedChatData(ctx_id, None)

    return create_sse_stream(
        context_id=contextId,
        event_categories=event_categories,
        initial_data_callback=get_initial_data
    )
```
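The endpoint can be smoke-tested without a browser using any HTTP client that exposes streamed lines; a sketch with `httpx` (the base URL and `/api` route prefix are assumptions):

```python
import asyncio
import httpx

async def consume_stream(context_id: str):
    url = f"http://localhost:8000/api/{context_id}/stream?categories=chat"
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])  # one event's JSON payload

asyncio.run(consume_stream("wf-123"))
```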
## Benefits of Streaming vs Polling
### Performance
- **Reduced Server Load**: No constant database queries every 0.5-3 seconds
- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms)
- **Bandwidth Efficiency**: Only send data when it changes, not empty responses
### User Experience
- **Real-time Updates**: Users see progress instantly
- **Better Responsiveness**: No perceived delay from polling intervals
- **Reduced Battery Drain**: Mobile devices consume less power without constant polling
### Scalability
- **Horizontal Scaling**: Event queues can be distributed across instances via a broker such as Redis or RabbitMQ (see the sketch after this list)
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection vs many HTTP requests
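The in-process `asyncio.Queue` only reaches subscribers inside one worker; for the horizontal-scaling point above, a broker has to relay events between instances. A minimal sketch using `redis.asyncio` pub/sub (the broker choice and channel naming are assumptions, not part of this proposal):

```python
import json
import redis.asyncio as redis

r = redis.Redis()

async def emit_event_distributed(context_id: str, event: dict) -> None:
    # Publish to a per-context channel so every worker subscribed to this
    # context receives the event, regardless of which process holds the
    # open SSE connection.
    await r.publish(f"events:{context_id}", json.dumps(event))

async def subscribe_events(context_id: str):
    # Async generator mirroring stream_events(), but fed from Redis.
    pubsub = r.pubsub()
    await pubsub.subscribe(f"events:{context_id}")
    async for message in pubsub.listen():
        if message["type"] == "message":  # skip subscribe confirmations
            yield json.loads(message["data"])
```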
## Migration Strategy
### Phase 1: Create Shared Utility
1. Move `eventManager.py` to `modules/shared/streamingManager.py`
2. Generalize for any context type (not just workflows)
3. Add event categorization and filtering
4. Create `sseUtils.py` helper functions
### Phase 2: Update Chatbot Feature
1. Update chatbot to use shared streaming manager
2. Replace internal polling in SSE endpoint with pure event-driven streaming
3. Emit events directly when data changes (in database write operations)
### Phase 3: Migrate Other Features
1. **Workflows**: Add streaming to workflow processing
2. **Document Generation**: Stream document creation progress
3. **Data Processing**: Stream extraction/transformation progress
4. **Any Long-Running Task**: Use streaming for status updates
### Phase 4: Frontend Migration
1. Replace `useWorkflowPolling` with SSE EventSource connections
2. Create generic `useStreaming` hook
3. Update all components to use streaming instead of polling
4. Remove polling logic entirely
## Implementation Details
### Event-Driven Data Emission
Instead of polling `getUnifiedChatData()`, emit events when data changes:
```python
# In interfaceDbChatObjects.py - when creating a message
def createMessage(self, workflowId: str, message: ChatMessage):
    # ... existing database write ...

    # Emit streaming event (fire-and-forget; requires a running event loop)
    from modules.shared.streamingManager import get_streaming_manager
    streaming_manager = get_streaming_manager()
    asyncio.create_task(streaming_manager.emit_event(
        context_id=workflowId,
        event_type="message",
        data={
            "type": "message",
            "createdAt": message.publishedAt,
            "item": message.dict()
        },
        event_category="chat"
    ))
```
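One caveat with the snippet above: `asyncio.create_task()` raises `RuntimeError` when no event loop is running, which is possible if `createMessage` is ever called from purely synchronous code. A defensive wrapper (a sketch; `schedule_emit` is a hypothetical helper, not part of this commit):

```python
import asyncio
import logging
from typing import Coroutine

logger = logging.getLogger(__name__)

def schedule_emit(coro: Coroutine) -> None:
    """Schedule an emit coroutine if a loop is running; otherwise drop it.

    Streaming is best-effort here: the message is already persisted, so a
    dropped event only means subscribers see it on the next fetch.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        logger.debug("No running event loop; skipping streaming emit")
        return
    loop.create_task(coro)
```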
### Frontend Integration
```typescript
// Generic streaming hook
function useStreaming<T>(
  contextId: string,
  categories?: string[],
  onEvent?: (event: T) => void
) {
  useEffect(() => {
    // Only append the query string when categories are given,
    // avoiding "?categories=undefined" in the URL
    const query = categories?.length ? `?categories=${categories.join(",")}` : "";
    const eventSource = new EventSource(`/api/stream/${contextId}${query}`);
    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent?.(event);
    };
    return () => eventSource.close();
  }, [contextId, categories]);
}
```
## Key Design Decisions
### 1. Context-Based Streaming
- Use generic `context_id` instead of `workflow_id`
- Supports workflows, documents, tasks, user sessions, etc.
### 2. Event Categories
- Allow filtering by category (chat, workflow, document, etc.)
- Enables multiple features to stream from same context
### 3. Backward Compatibility
- Keep existing polling endpoints during migration
- Gradually migrate features one at a time
- Frontend can use both during transition
### 4. Error Handling
- Graceful degradation if streaming unavailable
- Automatic reconnection logic in frontend
- Fallback to polling if SSE fails
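Note that `EventSource` already retries dropped connections on its own (and the server can tune the delay with a `retry:` field in the stream), so the custom reconnection logic mainly has to cover terminal failures, where the hook tears the stream down and falls back to the existing polling endpoints.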
## Example: Complete Flow
### Backend: Workflow Processing
```python
async def process_workflow(workflow_id: str):
    streaming = get_streaming_manager()

    # Emit status update
    await streaming.emit_event(workflow_id, "status",
                               {"status": "running"}, "workflow")

    # Process and emit messages
    result = await ai_call(...)
    await streaming.emit_event(workflow_id, "message",
                               {"role": "assistant", "content": result}, "chat")

    # Emit completion
    await streaming.emit_event(workflow_id, "complete",
                               {"status": "completed"}, "workflow")
```
### Frontend: React Hook
```typescript
function ChatComponent({ workflowId }: { workflowId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);

  useStreaming(workflowId, ["chat"], (event) => {
    if (event.type === "message") {
      setMessages(prev => [...prev, event.item]);
    }
  });

  return <MessageList messages={messages} />;
}
```
## Conclusion
Transforming `eventManager.py` into a shared streaming utility:
1. **Eliminates Polling**: All features can stream updates in real-time
2. **Improves Performance**: Reduces server load and latency
3. **Better UX**: Instant updates instead of polling delays
4. **Reusable**: Any feature can use streaming with minimal code
5. **Scalable**: Foundation for future real-time features
The migration can be done incrementally, feature by feature, without breaking existing functionality.

View file

@@ -1,21 +1,23 @@
 # Copyright (c) 2025 Patrick Motsch
 # All rights reserved.
 """
-Event manager for chatbot streaming.
-Manages event queues for SSE streaming of chatbot progress updates.
+Generic streaming event manager for real-time updates.
+Manages event queues for SSE streaming across all features (chatbot, workflows, documents, etc.).
+Supports event-driven streaming instead of polling.
 """
 import logging
 import asyncio
-from typing import Dict, Optional, Any
+from typing import Dict, Optional, Any, List, AsyncIterator, Set
 from datetime import datetime
 logger = logging.getLogger(__name__)
-class ChatbotEventManager:
+class StreamingEventManager:
     """
-    Manages event queues for chatbot streaming.
+    Generic event manager for real-time streaming across all features.
+    Supports multiple event types and contexts (workflows, documents, tasks, etc.).
     Thread-safe event emission and queue management.
     """
@@ -24,128 +26,199 @@ class ChatbotEventManager:
         self._queues: Dict[str, asyncio.Queue] = {}
         self._locks: Dict[str, asyncio.Lock] = {}
         self._cleanup_tasks: Dict[str, asyncio.Task] = {}
+        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids (for future multi-subscriber support)
-    def create_queue(self, workflow_id: str) -> asyncio.Queue:
+    def create_queue(self, context_id: str) -> asyncio.Queue:
         """
-        Create a new event queue for a workflow.
+        Create a new event queue for a context.
         Args:
-            workflow_id: Workflow ID
+            context_id: Context ID (workflow_id, document_id, task_id, etc.)
         Returns:
-            Event queue for the workflow
+            Event queue for the context
         """
-        if workflow_id not in self._queues:
-            self._queues[workflow_id] = asyncio.Queue()
-            self._locks[workflow_id] = asyncio.Lock()
-            logger.debug(f"Created event queue for workflow {workflow_id}")
-        return self._queues[workflow_id]
+        if context_id not in self._queues:
+            self._queues[context_id] = asyncio.Queue()
+            self._locks[context_id] = asyncio.Lock()
+            self._subscribers[context_id] = set()
+            logger.debug(f"Created event queue for context {context_id}")
+        return self._queues[context_id]
-    def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]:
+    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
         """
-        Get existing event queue for a workflow.
+        Get existing event queue for a context.
         Args:
-            workflow_id: Workflow ID
+            context_id: Context ID
         Returns:
             Event queue if exists, None otherwise
         """
-        return self._queues.get(workflow_id)
+        return self._queues.get(context_id)
     async def emit_event(
         self,
-        workflow_id: str,
+        context_id: str,
         event_type: str,
-        message: str,
-        step: Optional[str] = None,
-        data: Optional[Dict[str, Any]] = None
+        data: Dict[str, Any],
+        event_category: str = "default",
+        message: Optional[str] = None,
+        step: Optional[str] = None
     ):
         """
-        Emit an event to the workflow's event queue.
+        Emit an event to the context's event queue.
         Args:
-            workflow_id: Workflow ID
-            event_type: Type of event ("status", "progress", "complete", "error")
-            message: Event message
-            step: Current processing step (optional)
-            data: Additional event data (optional)
+            context_id: Context ID (workflow_id, document_id, etc.)
+            event_type: Type of event ("message", "log", "status", "progress", "complete", "error", "chatdata")
+            data: Event data dictionary (will be included in event)
+            event_category: Category of event for filtering ("chat", "workflow", "document", etc.)
+            message: Optional event message (for backward compatibility)
+            step: Optional processing step (for backward compatibility)
         """
-        queue = self.get_queue(workflow_id)
+        queue = self.get_queue(context_id)
         if not queue:
-            logger.debug(f"No event queue found for workflow {workflow_id}, skipping event")
+            logger.debug(f"No event queue found for context {context_id}, skipping event")
             return
         event = {
             "type": event_type,
-            "message": message,
+            "category": event_category,
             "timestamp": datetime.now().timestamp(),
-            "step": step,
-            "data": data or {}
+            "data": data,
+            "message": message,  # For backward compatibility
+            "step": step  # For backward compatibility
         }
         try:
             await queue.put(event)
-            logger.debug(f"Emitted {event_type} event for workflow {workflow_id}: {message[:50]}")
+            logger.debug(f"Emitted {event_type} event (category: {event_category}) for context {context_id}")
         except Exception as e:
-            logger.error(f"Error emitting event for workflow {workflow_id}: {e}")
+            logger.error(f"Error emitting event for context {context_id}: {e}")
-    async def cleanup(self, workflow_id: str, delay: float = 60.0):
+    async def stream_events(
+        self,
+        context_id: str,
+        event_categories: Optional[List[str]] = None,
+        timeout: Optional[float] = None
+    ) -> AsyncIterator[Dict[str, Any]]:
+        """
+        Async generator for streaming events from a context.
+        Args:
+            context_id: Context ID to stream events from
+            event_categories: Optional list of event categories to filter by
+            timeout: Optional timeout in seconds (None = no timeout)
+        Yields:
+            Event dictionaries
+        """
+        queue = self.get_queue(context_id)
+        if not queue:
+            logger.warning(f"No queue found for context {context_id}")
+            return
+        start_time = asyncio.get_event_loop().time() if timeout else None
+        while True:
+            # Check timeout
+            if timeout and start_time:
+                elapsed = asyncio.get_event_loop().time() - start_time
+                if elapsed > timeout:
+                    logger.debug(f"Stream timeout for context {context_id}")
+                    break
+            try:
+                # Wait for event with timeout
+                wait_timeout = 1.0  # Check timeout every second
+                if timeout and start_time:
+                    remaining = timeout - (asyncio.get_event_loop().time() - start_time)
+                    if remaining <= 0:
+                        break
+                    wait_timeout = min(wait_timeout, remaining)
+                event = await asyncio.wait_for(queue.get(), timeout=wait_timeout)
+                # Filter by category if specified
+                if event_categories and event.get("category") not in event_categories:
+                    continue
+                yield event
+            except asyncio.TimeoutError:
+                # Check if we should continue or timeout
+                if timeout and start_time:
+                    elapsed = asyncio.get_event_loop().time() - start_time
+                    if elapsed >= timeout:
+                        break
+                continue
+            except Exception as e:
+                logger.error(f"Error in stream_events for context {context_id}: {e}")
+                break
+    async def cleanup(self, context_id: str, delay: float = 60.0):
         """
         Schedule cleanup of event queue after delay.
         This allows time for any remaining events to be consumed.
         Args:
-            workflow_id: Workflow ID
+            context_id: Context ID
             delay: Delay in seconds before cleanup (default: 60 seconds)
         """
-        if workflow_id in self._cleanup_tasks:
+        if context_id in self._cleanup_tasks:
             # Cancel existing cleanup task
-            self._cleanup_tasks[workflow_id].cancel()
+            self._cleanup_tasks[context_id].cancel()
         async def _cleanup():
             try:
                 await asyncio.sleep(delay)
-                if workflow_id in self._queues:
+                if context_id in self._queues:
                     # Drain remaining events
-                    queue = self._queues[workflow_id]
+                    queue = self._queues[context_id]
                     while not queue.empty():
                         try:
                             queue.get_nowait()
                         except asyncio.QueueEmpty:
                             break
-                    del self._queues[workflow_id]
-                    del self._locks[workflow_id]
-                    logger.info(f"Cleaned up event queue for workflow {workflow_id}")
+                    del self._queues[context_id]
+                    if context_id in self._locks:
+                        del self._locks[context_id]
+                    if context_id in self._subscribers:
+                        del self._subscribers[context_id]
+                    logger.info(f"Cleaned up event queue for context {context_id}")
             except asyncio.CancelledError:
                 pass
             except Exception as e:
-                logger.error(f"Error during cleanup for workflow {workflow_id}: {e}")
+                logger.error(f"Error during cleanup for context {context_id}: {e}")
             finally:
-                if workflow_id in self._cleanup_tasks:
-                    del self._cleanup_tasks[workflow_id]
+                if context_id in self._cleanup_tasks:
+                    del self._cleanup_tasks[context_id]
-        self._cleanup_tasks[workflow_id] = asyncio.create_task(_cleanup())
+        self._cleanup_tasks[context_id] = asyncio.create_task(_cleanup())
-    def has_queue(self, workflow_id: str) -> bool:
+    def has_queue(self, context_id: str) -> bool:
         """
-        Check if a queue exists for a workflow.
+        Check if a queue exists for a context.
         Args:
-            workflow_id: Workflow ID
+            context_id: Context ID
         Returns:
             True if queue exists, False otherwise
         """
-        return workflow_id in self._queues
+        return context_id in self._queues
+# Backward compatibility: ChatbotEventManager is an alias
+ChatbotEventManager = StreamingEventManager
 # Global singleton instance
-_event_manager = ChatbotEventManager()
+_event_manager = StreamingEventManager()
-def get_event_manager() -> ChatbotEventManager:
+def get_event_manager() -> StreamingEventManager:
     """Get the global event manager instance."""
     return _event_manager

View file

@@ -158,15 +158,14 @@ async def chatProcess(
     # Emit message event for streaming (exact chatData format)
     message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp())
     await event_manager.emit_event(
-        workflow.id,
-        "chatdata",
-        "New message",
-        "message",
-        {
+        context_id=workflow.id,
+        event_type="chatdata",
+        data={
             "type": "message",
             "createdAt": message_timestamp,
             "item": userMessage.dict()
-        }
+        },
+        event_category="chat"
     )
     # Update workflow status
@@ -538,7 +537,14 @@ async def _processChatbotMessage(
     workflow = interfaceDbChat.getWorkflow(workflowId)
     if not workflow:
         logger.error(f"Workflow {workflowId} not found during processing")
-        await event_manager.emit_event(workflowId, "error", f"Workflow {workflowId} nicht gefunden", "error")
+        await event_manager.emit_event(
+            context_id=workflowId,
+            event_type="error",
+            data={"error": f"Workflow {workflowId} nicht gefunden"},
+            event_category="workflow",
+            message=f"Workflow {workflowId} nicht gefunden",
+            step="error"
+        )
         return
     # Check if workflow was stopped before starting
@@ -896,15 +902,14 @@ Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "
     # Emit message event for streaming (exact chatData format)
     message_timestamp = parseTimestamp(assistantMessage.publishedAt, default=getUtcTimestamp())
     await event_manager.emit_event(
-        workflowId,
-        "chatdata",
-        "New message",
-        "message",
-        {
+        context_id=workflowId,
+        event_type="chatdata",
+        data={
             "type": "message",
             "createdAt": message_timestamp,
             "item": assistantMessage.dict()
-        }
+        },
+        event_category="chat"
     )
     # Update workflow status to completed (only if not stopped)
@@ -921,11 +926,12 @@ Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "
     # Emit completion event only if workflow wasn't stopped
     if not await _check_workflow_stopped(interfaceDbChat, workflowId):
         await event_manager.emit_event(
-            workflowId,
-            "complete",
-            "Chatbot-Verarbeitung abgeschlossen",
-            "complete",
-            {"workflowId": workflowId}
+            context_id=workflowId,
+            event_type="complete",
+            data={"workflowId": workflowId},
+            event_category="workflow",
+            message="Chatbot-Verarbeitung abgeschlossen",
+            step="complete"
         )
     # Schedule cleanup
@@ -968,15 +974,14 @@ Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "
     # Emit message event for streaming (exact chatData format)
     message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp())
     await event_manager.emit_event(
-        workflowId,
-        "chatdata",
-        "New message",
-        "message",
-        {
+        context_id=workflowId,
+        event_type="chatdata",
+        data={
             "type": "message",
             "createdAt": message_timestamp,
             "item": errorMessage.dict()
-        }
+        },
+        event_category="chat"
     )
     # Update workflow status to error (only if not stopped)

View file

@@ -1058,6 +1058,26 @@ class ChatObjects:
             actionName=createdMessage.get("actionName")
         )
+        # Emit message event for streaming (if event manager is available)
+        try:
+            from modules.features.chatbot.eventManager import get_event_manager
+            event_manager = get_event_manager()
+            message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp())
+            # Emit message event in exact chatData format: {type, createdAt, item}
+            asyncio.create_task(event_manager.emit_event(
+                context_id=workflowId,
+                event_type="chatdata",
+                data={
+                    "type": "message",
+                    "createdAt": message_timestamp,
+                    "item": chat_message.dict()
+                },
+                event_category="chat"
+            ))
+        except Exception as e:
+            # Event manager not available or error - continue without emitting
+            logger.debug(f"Could not emit message event: {e}")
         # Debug: Store message and documents for debugging - only if debug enabled
         storeDebugMessageAndDocuments(chat_message, self.currentUser)
@@ -1419,15 +1439,14 @@ class ChatObjects:
             log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp())
             # Emit log event in exact chatData format: {type, createdAt, item}
             asyncio.create_task(event_manager.emit_event(
-                workflowId,
-                "chatdata",
-                "New log",
-                "log",
-                {
+                context_id=workflowId,
+                event_type="chatdata",
+                data={
                     "type": "log",
                     "createdAt": log_timestamp,
                     "item": ChatLog(**createdLog).dict()
-                }
+                },
+                event_category="chat"
             ))
         except Exception as e:
             # Event manager not available or error - continue without emitting

View file

@@ -80,9 +80,9 @@ async def stream_chatbot_start(
     queue = event_manager.create_queue(workflow.id)
     async def event_stream():
-        """Async generator for SSE events."""
+        """Async generator for SSE events - pure event-driven streaming (no polling)."""
         try:
-            # Get interface for status checks and chat data
+            # Get interface for initial data and status checks
             interfaceDbChat = getServiceChat(currentUser)
             # Get current workflow to check if resuming and get current round
@@ -90,7 +90,7 @@ async def stream_chatbot_start(
             current_round = current_workflow.currentRound if current_workflow else None
             is_resuming = final_workflow_id is not None and current_round and current_round > 1
-            # Send initial chat data (exact format as chatData endpoint)
+            # Send initial chat data (exact format as chatData endpoint) - only once at start
             try:
                 chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
                 if chatData.get("items"):
@@ -126,32 +126,18 @@ async def stream_chatbot_start(
                         }
                         # Emit item directly in exact chatData format: {type, createdAt, item}
                         yield f"data: {json.dumps(serializable_item)}\n\n"
-                    # Set initial timestamp for incremental fetching
-                    if filtered_items:
-                        timestamps = [parseTimestamp(item.get("createdAt"), default=0) for item in filtered_items]
-                        last_chatdata_timestamp = max(timestamps) if timestamps else None
-                    else:
-                        last_chatdata_timestamp = None
-                else:
-                    last_chatdata_timestamp = None
             except Exception as e:
                 logger.warning(f"Error fetching initial chat data: {e}")
-                last_chatdata_timestamp = None
             # Keepalive interval (30 seconds)
             keepalive_interval = 30.0
             last_keepalive = asyncio.get_event_loop().time()
-            # Status check interval (check workflow status every 3 seconds)
-            status_check_interval = 3.0
+            # Status check interval (check workflow status every 5 seconds - less frequent since we're event-driven)
+            status_check_interval = 5.0
             last_status_check = asyncio.get_event_loop().time()
-            # Chat data fetch interval (fetch chat data every 0.5 seconds for real-time updates)
-            chatdata_fetch_interval = 0.5
-            last_chatdata_fetch = asyncio.get_event_loop().time()
-            # Stream events until completion or timeout
+            # Stream events until completion or timeout - pure event-driven (no polling)
             timeout = 300.0  # 5 minutes max
             start_time = asyncio.get_event_loop().time()
@@ -159,7 +145,6 @@ async def stream_chatbot_start(
                 # Check timeout
                 elapsed = asyncio.get_event_loop().time() - start_time
                 if elapsed > timeout:
-                    # Timeout - just close stream, don't emit non-chatData format events
                     logger.info(f"Stream timeout for workflow {workflow.id}")
                     break
@@ -170,76 +155,29 @@ async def stream_chatbot_start(
                 current_time = asyncio.get_event_loop().time()
-                # Periodically check workflow status and fetch chat data
+                # Periodically check workflow status (less frequent since we're event-driven)
                 if current_time - last_status_check >= status_check_interval:
                     try:
                         current_workflow = interfaceDbChat.getWorkflow(workflow.id)
                         if current_workflow and current_workflow.status == "stopped":
                             logger.info(f"Workflow {workflow.id} was stopped, closing stream")
                             # Don't emit stopped event - just close stream
                             break
                     except Exception as e:
                         logger.warning(f"Error checking workflow status: {e}")
                     last_status_check = current_time
-                # Periodically fetch and emit chat data
-                if current_time - last_chatdata_fetch >= chatdata_fetch_interval:
-                    try:
-                        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, last_chatdata_timestamp)
-                        if chatData.get("items"):
-                            # Filter items by round number if resuming
-                            filtered_items = []
-                            for item in chatData["items"]:
-                                if is_resuming and current_round:
-                                    # Get round number from item
-                                    item_round = None
-                                    item_data = item.get("item")
-                                    if item_data:
-                                        # Handle both dict and object access
-                                        if isinstance(item_data, dict):
-                                            item_round = item_data.get("roundNumber")
-                                        elif hasattr(item_data, "roundNumber"):
-                                            item_round = item_data.roundNumber
-                                    # When resuming, only include items from current round onwards
-                                    # Exclude items without roundNumber (they're from old rounds before roundNumber was added)
-                                    # Exclude items with roundNumber < current_round (from previous rounds)
-                                    if item_round is None or item_round < current_round:
-                                        continue  # Skip items from previous rounds or without round info
-                                filtered_items.append(item)
-                            # Emit filtered items directly in exact chatData format: {type, createdAt, item}
-                            for item in filtered_items:
-                                # Convert Pydantic models to dicts for JSON serialization
-                                serializable_item = {
-                                    "type": item.get("type"),
-                                    "createdAt": item.get("createdAt"),
-                                    "item": item.get("item").dict() if hasattr(item.get("item"), "dict") else item.get("item")
-                                }
-                                yield f"data: {json.dumps(serializable_item)}\n\n"
-                            # Update timestamp to only get new items next time
-                            if chatData["items"]:
-                                # Parse timestamps and get the maximum
-                                timestamps = []
-                                for item in chatData["items"]:
-                                    ts = parseTimestamp(item.get("createdAt"), default=0)
-                                    timestamps.append(ts)
-                                if timestamps:
-                                    last_chatdata_timestamp = max(timestamps)
-                    except Exception as e:
-                        logger.warning(f"Error fetching chat data: {e}")
-                    last_chatdata_fetch = current_time
-                # Try to get event with timeout
+                # Get event from queue (pure event-driven - no polling database)
                 try:
                     event = await asyncio.wait_for(queue.get(), timeout=1.0)
-                    # Only emit chatdata events (messages, logs, stats) in exact chatData format
-                    # Ignore status/progress/complete/stopped/error events that don't match the format
-                    if event.get("type") == "chatdata" and event.get("data"):
+                    # Handle different event types
+                    event_type = event.get("type")
+                    event_data = event.get("data", {})
+                    # Emit chatdata events (messages, logs, stats) in exact chatData format
+                    if event_type == "chatdata" and event_data:
                         # Emit item directly in exact chatData format: {type, createdAt, item}
-                        chatdata_item = event.get("data")
+                        chatdata_item = event_data
                         # Ensure item field is serializable (convert Pydantic models to dicts)
                         if isinstance(chatdata_item, dict) and "item" in chatdata_item:
                             item_obj = chatdata_item.get("item")
@@ -247,26 +185,21 @@ async def stream_chatbot_start(
                                 chatdata_item = chatdata_item.copy()
                                 chatdata_item["item"] = item_obj.dict()
                         yield f"data: {json.dumps(chatdata_item)}\n\n"
-                        # Update timestamp for incremental fetching
-                        if chatdata_item.get("createdAt"):
-                            last_chatdata_timestamp = parseTimestamp(chatdata_item["createdAt"], default=None)
-                    # Check if this is a completion/stopped event to close stream
-                    if event.get("type") == "complete":
+                    # Handle completion/stopped events to close stream
+                    elif event_type == "complete":
                         logger.info(f"Workflow {workflow.id} completed, closing stream")
                         break
-                    elif event.get("type") == "stopped":
-                        # Workflow was stopped, close stream
+                    elif event_type == "stopped":
                        logger.info(f"Workflow {workflow.id} stopped, closing stream")
                        break
-                    elif event.get("type") == "error" and event.get("step") == "error":
-                        # Final error, close stream
+                    elif event_type == "error" and event.get("step") == "error":
                         logger.warning(f"Workflow {workflow.id} error, closing stream")
                         break
-                    last_keepalive = asyncio.get_event_loop().time()
+                    last_keepalive = current_time
                 except asyncio.TimeoutError:
-                    # Send keepalive if needed
+                    # Send keepalive if needed (no events received, but keep connection alive)
                     current_time = asyncio.get_event_loop().time()
                     if current_time - last_keepalive >= keepalive_interval:
                         yield f": keepalive\n\n"
@@ -274,14 +207,12 @@ async def stream_chatbot_start(
                     continue
                 except Exception as e:
                     logger.error(f"Error in event stream: {e}")
-                    yield f"data: {json.dumps({'type': 'error', 'message': f'Stream error: {str(e)}'})}\n\n"
                     break
         except Exception as e:
             logger.error(f"Error in event stream generator: {e}", exc_info=True)
             # Don't emit error events that don't match chatData format
         finally:
-            # Stream ends - no final event needed as it doesn't match chatData format
+            # Stream ends - cleanup handled by event manager
             pass
     return StreamingResponse(
@@ -317,10 +248,12 @@ async def stop_chatbot(
     # Emit stopped event to active streams
     event_manager = get_event_manager()
     await event_manager.emit_event(
-        workflowId,
-        "stopped",
-        "Workflow stopped by user",
-        "stopped"
+        context_id=workflowId,
+        event_type="stopped",
+        data={"workflowId": workflowId},
+        event_category="workflow",
+        message="Workflow stopped by user",
+        step="stopped"
     )
     logger.info(f"Emitted stopped event for workflow {workflowId}")