From 8147f3f7c89c14c459f67eda3490e3b6d3f02a84 Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Fri, 9 Jan 2026 12:14:47 +0100 Subject: [PATCH] fix: generalized event manager for message streaming --- docs/STREAMING_UTILITY_ARCHITECTURE.md | 314 +++++++++++++++++++ modules/features/chatbot/eventManager.py | 181 +++++++---- modules/features/chatbot/mainChatbot.py | 53 ++-- modules/interfaces/interfaceDbChatObjects.py | 31 +- modules/routes/routeChatbot.py | 123 ++------ 5 files changed, 523 insertions(+), 179 deletions(-) create mode 100644 docs/STREAMING_UTILITY_ARCHITECTURE.md diff --git a/docs/STREAMING_UTILITY_ARCHITECTURE.md b/docs/STREAMING_UTILITY_ARCHITECTURE.md new file mode 100644 index 00000000..368fb374 --- /dev/null +++ b/docs/STREAMING_UTILITY_ARCHITECTURE.md @@ -0,0 +1,314 @@ +# Streaming Utility Architecture: Transforming eventManager into a Shared Utility + +## Current State Analysis + +### Existing Implementation +The `eventManager.py` in `modules/features/chatbot/` currently provides: +- **Event Queue Management**: Per-workflow asyncio queues for SSE streaming +- **Event Emission**: `emit_event()` method for chatbot-specific events +- **Cleanup**: Automatic queue cleanup after workflow completion +- **SSE Streaming**: Used in `/api/chatbot/start/stream` endpoint + +### Current Limitations + +1. **Chatbot-Specific**: Hardcoded for chatbot workflows only +2. **Polling Still Required**: Frontend still polls `getUnifiedChatData()` every 0.5 seconds even with SSE +3. **Not Reusable**: Other features (workflows, document generation, etc.) can't use it +4. **Mixed Approach**: SSE endpoint still internally polls database instead of pure event-driven streaming + +### Frontend Polling Pattern +Currently, the frontend uses: +- `useWorkflowPolling.ts` - Polls `/api/workflow/{id}/chatData` every few seconds +- `useWorkflowLifecycle.ts` - Manages polling lifecycle and state +- Rate limit handling and backoff logic for failed polls + +## Proposed Architecture: Shared Streaming Utility + +### 1. Generic Event Manager (`modules/shared/streamingManager.py`) + +```python +class StreamingEventManager: + """ + Generic event manager for real-time streaming across all features. + Supports multiple event types and contexts (workflows, documents, tasks, etc.) + """ + + def __init__(self): + self._queues: Dict[str, asyncio.Queue] = {} + self._locks: Dict[str, asyncio.Lock] = {} + self._cleanup_tasks: Dict[str, asyncio.Task] = {} + self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids + + async def emit_event( + self, + context_id: str, # workflow_id, document_id, task_id, etc. + event_type: str, # "message", "log", "status", "progress", "complete", "error" + data: Dict[str, Any], # Flexible data structure + event_category: str = "default" # "chat", "workflow", "document", etc. + ): + """Emit event to all subscribers of a context""" + + def create_stream( + self, + context_id: str, + event_categories: Optional[List[str]] = None # Filter by category + ) -> asyncio.Queue: + """Create a new stream for a context""" + + async def stream_events( + self, + context_id: str, + event_categories: Optional[List[str]] = None + ) -> AsyncIterator[Dict[str, Any]]: + """Async generator for streaming events""" +``` + +### 2. 
Generic SSE Route Helper (`modules/shared/sseUtils.py`)

```python
def create_sse_stream(
    context_id: str,
    event_categories: Optional[List[str]] = None,
    initial_data_callback: Optional[Callable] = None,
    timeout: float = 300.0
) -> StreamingResponse:
    """
    Create a generic SSE streaming response.

    Args:
        context_id: Workflow ID, document ID, or other context identifier
        event_categories: Filter events by category (e.g., ["chat", "workflow"])
        initial_data_callback: Optional function to fetch initial state
        timeout: Stream timeout in seconds
    """
    streaming_manager = get_streaming_manager()

    async def event_stream():
        # Send initial data if callback provided
        if initial_data_callback:
            initial_data = await initial_data_callback(context_id)
            yield format_sse_event("initial", initial_data)

        # Stream events from the manager until the timeout elapses
        # (stream_events accepts a timeout; see the eventManager implementation below)
        async for event in streaming_manager.stream_events(
            context_id, event_categories, timeout=timeout
        ):
            yield format_sse_event(event["type"], event["data"])

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"
        }
    )
```

### 3. Integration Points

#### A. Workflow Processing
```python
# In workflow processing code
from modules.shared.streamingManager import get_streaming_manager

streaming_manager = get_streaming_manager()

# Emit progress updates
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="progress",
    data={"step": "analyzing", "message": "Processing documents..."},
    event_category="workflow"
)

# Emit new messages
await streaming_manager.emit_event(
    context_id=workflow_id,
    event_type="message",
    data={"role": "assistant", "content": "Response text"},
    event_category="chat"
)
```

#### B. Route Endpoints
```python
# Generic streaming endpoint for any context
@router.get("/{contextId}/stream")
async def stream_context_updates(
    contextId: str,
    categories: Optional[str] = Query(None),  # Comma-separated categories
    currentUser: User = Depends(getCurrentUser)
):
    event_categories = categories.split(",") if categories else None

    # Optional: Fetch initial state
    async def get_initial_data(ctx_id: str):
        interfaceDbChat = getServiceChat(currentUser)
        return interfaceDbChat.getUnifiedChatData(ctx_id, None)

    return create_sse_stream(
        context_id=contextId,
        event_categories=event_categories,
        initial_data_callback=get_initial_data
    )
```
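The sketches above assume a `format_sse_event` helper that this document does not spell out. A minimal version, assuming events are serialized as JSON `data:` payloads tagged with an SSE `event:` name, could look like this:

```python
import json
from typing import Any


def format_sse_event(event_type: str, data: Any) -> str:
    """Format a payload as a single SSE frame: an "event:" name line,
    a "data:" line, and the blank line that terminates the frame."""
    payload = json.dumps(data, default=str)  # default=str covers datetimes etc.
    return f"event: {event_type}\ndata: {payload}\n\n"
```

Note that the existing chatbot endpoint emits bare `data:` lines without an `event:` name, so during migration the helper would need to make the event name optional (or be bypassed for `chatdata` items) to preserve that wire format.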
## Benefits of Streaming vs Polling

### Performance
- **Reduced Server Load**: No constant database queries every 0.5-3 seconds
- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms)
- **Bandwidth Efficiency**: Only send data when it changes, not empty responses

### User Experience
- **Real-time Updates**: Users see progress instantly
- **Better Responsiveness**: No perceived delay from polling intervals
- **Lower Battery Drain**: Mobile devices consume less power without constant polling

### Scalability
- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ)
- **Connection Management**: Better handling of many concurrent streams
- **Resource Efficiency**: One persistent connection vs many HTTP requests

## Migration Strategy

### Phase 1: Create Shared Utility
1. Move `eventManager.py` → `modules/shared/streamingManager.py`
2. Generalize for any context type (not just workflows)
3. Add event categorization and filtering
4. Create `sseUtils.py` helper functions

### Phase 2: Update Chatbot Feature
1. Update the chatbot to use the shared streaming manager
2. Replace internal polling in the SSE endpoint with pure event-driven streaming
3. Emit events directly when data changes (in database write operations)

### Phase 3: Migrate Other Features
1. **Workflows**: Add streaming to workflow processing
2. **Document Generation**: Stream document creation progress
3. **Data Processing**: Stream extraction/transformation progress
4. **Any Long-Running Task**: Use streaming for status updates

### Phase 4: Frontend Migration
1. Replace `useWorkflowPolling` with SSE EventSource connections
2. Create a generic `useStreaming` hook
3. Update all components to use streaming instead of polling
4. Remove polling logic entirely

## Implementation Details

### Event-Driven Data Emission

Instead of polling `getUnifiedChatData()`, emit events when data changes:

```python
# In interfaceDbChatObjects.py - when creating a message
def createMessage(self, workflowId: str, message: ChatMessage):
    # ... existing database write ...

    # Emit streaming event
    from modules.shared.streamingManager import get_streaming_manager
    streaming_manager = get_streaming_manager()
    asyncio.create_task(streaming_manager.emit_event(
        context_id=workflowId,
        event_type="message",
        data={
            "type": "message",
            "createdAt": message.publishedAt,
            "item": message.dict()
        },
        event_category="chat"
    ))
```

### Frontend Integration

```typescript
import { useEffect } from "react";

// Generic streaming hook
function useStreaming<T = any>(
  contextId: string,
  categories?: string[],
  onEvent?: (event: T) => void
) {
  useEffect(() => {
    const eventSource = new EventSource(
      `/api/stream/${contextId}?categories=${categories?.join(',')}`
    );

    eventSource.onmessage = (e) => {
      const event = JSON.parse(e.data);
      onEvent?.(event);
    };

    return () => eventSource.close();
  }, [contextId, categories]);
}
```

## Key Design Decisions

### 1. Context-Based Streaming
- Use a generic `context_id` instead of `workflow_id`
- Supports workflows, documents, tasks, user sessions, etc.

### 2. Event Categories
- Allow filtering by category (chat, workflow, document, etc.)
- Enables multiple features to stream from the same context (see the usage sketch below)

### 3. Backward Compatibility
- Keep existing polling endpoints during migration
- Gradually migrate features one at a time
- Frontend can use both during the transition

### 4. Error Handling
- Graceful degradation if streaming is unavailable
- Automatic reconnection logic in the frontend
- Fallback to polling if SSE fails
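To make decisions 1 and 2 concrete, here is a minimal, self-contained usage sketch against the manager as implemented in this patch; the context ID `wf-123` is illustrative, and the import path is the current module location rather than the proposed shared one:

```python
import asyncio

# Current location; proposed: modules.shared.streamingManager
from modules.features.chatbot.eventManager import get_event_manager


async def main():
    manager = get_event_manager()
    manager.create_queue("wf-123")  # "wf-123" is a hypothetical context ID

    # Two producers on the same context, using different categories
    await manager.emit_event("wf-123", "progress", {"step": "analyzing"}, event_category="workflow")
    await manager.emit_event("wf-123", "message", {"role": "assistant", "content": "Hi"}, event_category="chat")

    # A consumer that only cares about chat events; the timeout keeps the demo finite
    async for event in manager.stream_events("wf-123", event_categories=["chat"], timeout=2.0):
        print(event["type"], event["data"])  # -> message {'role': 'assistant', 'content': 'Hi'}


asyncio.run(main())
```

Note that with a single queue per context, an event filtered out by one consumer is still dequeued and dropped; this is why the `_subscribers` map is reserved for future multi-subscriber fan-out.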
## Example: Complete Flow

### Backend: Workflow Processing
```python
async def process_workflow(workflow_id: str):
    streaming = get_streaming_manager()

    # Emit status update
    await streaming.emit_event(workflow_id, "status",
        {"status": "running"}, "workflow")

    # Process and emit messages
    result = await ai_call(...)
    await streaming.emit_event(workflow_id, "message",
        {"role": "assistant", "content": result}, "chat")

    # Emit completion
    await streaming.emit_event(workflow_id, "complete",
        {"status": "completed"}, "workflow")
```

### Frontend: React Hook
```typescript
import { useState } from "react";

function ChatComponent({ workflowId }: { workflowId: string }) {
  const [messages, setMessages] = useState<any[]>([]);

  useStreaming(workflowId, ["chat"], (event) => {
    if (event.type === "message") {
      setMessages(prev => [...prev, event.item]);
    }
  });

  return <MessageList messages={messages} />; // MessageList: placeholder rendering component
}
```

## Conclusion

By transforming `eventManager.py` into a shared streaming utility:

1. **Eliminates Polling**: All features can stream updates in real-time
2. **Improves Performance**: Reduces server load and latency
3. **Better UX**: Instant updates instead of polling delays
4. **Reusable**: Any feature can use streaming with minimal code
5. **Scalable**: Foundation for future real-time features

The migration can be done incrementally, feature by feature, without breaking existing functionality.

diff --git a/modules/features/chatbot/eventManager.py b/modules/features/chatbot/eventManager.py
index 48d1601b..8bc4ff94 100644
--- a/modules/features/chatbot/eventManager.py
+++ b/modules/features/chatbot/eventManager.py
@@ -1,21 +1,23 @@
 # Copyright (c) 2025 Patrick Motsch
 # All rights reserved.
 """
-Event manager for chatbot streaming.
-Manages event queues for SSE streaming of chatbot progress updates.
+Generic streaming event manager for real-time updates.
+Manages event queues for SSE streaming across all features (chatbot, workflows, documents, etc.).
+Supports event-driven streaming instead of polling.
 """
 import logging
 import asyncio
-from typing import Dict, Optional, Any
+from typing import Dict, Optional, Any, List, AsyncIterator, Set
 from datetime import datetime
 
 logger = logging.getLogger(__name__)
 
 
-class ChatbotEventManager:
+class StreamingEventManager:
     """
-    Manages event queues for chatbot streaming.
+    Generic event manager for real-time streaming across all features.
+    Supports multiple event types and contexts (workflows, documents, tasks, etc.).
     Thread-safe event emission and queue management.
     """
 
@@ -24,128 +26,199 @@ class ChatbotEventManager:
         self._queues: Dict[str, asyncio.Queue] = {}
         self._locks: Dict[str, asyncio.Lock] = {}
         self._cleanup_tasks: Dict[str, asyncio.Task] = {}
+        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids (for future multi-subscriber support)
 
-    def create_queue(self, workflow_id: str) -> asyncio.Queue:
+    def create_queue(self, context_id: str) -> asyncio.Queue:
         """
-        Create a new event queue for a workflow.
+        Create a new event queue for a context.
 
         Args:
-            workflow_id: Workflow ID
+            context_id: Context ID (workflow_id, document_id, task_id, etc.)
Returns: - Event queue for the workflow + Event queue for the context """ - if workflow_id not in self._queues: - self._queues[workflow_id] = asyncio.Queue() - self._locks[workflow_id] = asyncio.Lock() - logger.debug(f"Created event queue for workflow {workflow_id}") - return self._queues[workflow_id] + if context_id not in self._queues: + self._queues[context_id] = asyncio.Queue() + self._locks[context_id] = asyncio.Lock() + self._subscribers[context_id] = set() + logger.debug(f"Created event queue for context {context_id}") + return self._queues[context_id] - def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]: + def get_queue(self, context_id: str) -> Optional[asyncio.Queue]: """ - Get existing event queue for a workflow. + Get existing event queue for a context. Args: - workflow_id: Workflow ID + context_id: Context ID Returns: Event queue if exists, None otherwise """ - return self._queues.get(workflow_id) + return self._queues.get(context_id) async def emit_event( self, - workflow_id: str, + context_id: str, event_type: str, - message: str, - step: Optional[str] = None, - data: Optional[Dict[str, Any]] = None + data: Dict[str, Any], + event_category: str = "default", + message: Optional[str] = None, + step: Optional[str] = None ): """ - Emit an event to the workflow's event queue. + Emit an event to the context's event queue. Args: - workflow_id: Workflow ID - event_type: Type of event ("status", "progress", "complete", "error") - message: Event message - step: Current processing step (optional) - data: Additional event data (optional) + context_id: Context ID (workflow_id, document_id, etc.) + event_type: Type of event ("message", "log", "status", "progress", "complete", "error", "chatdata") + data: Event data dictionary (will be included in event) + event_category: Category of event for filtering ("chat", "workflow", "document", etc.) + message: Optional event message (for backward compatibility) + step: Optional processing step (for backward compatibility) """ - queue = self.get_queue(workflow_id) + queue = self.get_queue(context_id) if not queue: - logger.debug(f"No event queue found for workflow {workflow_id}, skipping event") + logger.debug(f"No event queue found for context {context_id}, skipping event") return event = { "type": event_type, - "message": message, + "category": event_category, "timestamp": datetime.now().timestamp(), - "step": step, - "data": data or {} + "data": data, + "message": message, # For backward compatibility + "step": step # For backward compatibility } try: await queue.put(event) - logger.debug(f"Emitted {event_type} event for workflow {workflow_id}: {message[:50]}") + logger.debug(f"Emitted {event_type} event (category: {event_category}) for context {context_id}") except Exception as e: - logger.error(f"Error emitting event for workflow {workflow_id}: {e}") + logger.error(f"Error emitting event for context {context_id}: {e}") - async def cleanup(self, workflow_id: str, delay: float = 60.0): + async def stream_events( + self, + context_id: str, + event_categories: Optional[List[str]] = None, + timeout: Optional[float] = None + ) -> AsyncIterator[Dict[str, Any]]: + """ + Async generator for streaming events from a context. 
+ + Args: + context_id: Context ID to stream events from + event_categories: Optional list of event categories to filter by + timeout: Optional timeout in seconds (None = no timeout) + + Yields: + Event dictionaries + """ + queue = self.get_queue(context_id) + if not queue: + logger.warning(f"No queue found for context {context_id}") + return + + start_time = asyncio.get_event_loop().time() if timeout else None + + while True: + # Check timeout + if timeout and start_time: + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed > timeout: + logger.debug(f"Stream timeout for context {context_id}") + break + + try: + # Wait for event with timeout + wait_timeout = 1.0 # Check timeout every second + if timeout and start_time: + remaining = timeout - (asyncio.get_event_loop().time() - start_time) + if remaining <= 0: + break + wait_timeout = min(wait_timeout, remaining) + + event = await asyncio.wait_for(queue.get(), timeout=wait_timeout) + + # Filter by category if specified + if event_categories and event.get("category") not in event_categories: + continue + + yield event + + except asyncio.TimeoutError: + # Check if we should continue or timeout + if timeout and start_time: + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed >= timeout: + break + continue + except Exception as e: + logger.error(f"Error in stream_events for context {context_id}: {e}") + break + + async def cleanup(self, context_id: str, delay: float = 60.0): """ Schedule cleanup of event queue after delay. This allows time for any remaining events to be consumed. Args: - workflow_id: Workflow ID + context_id: Context ID delay: Delay in seconds before cleanup (default: 60 seconds) """ - if workflow_id in self._cleanup_tasks: + if context_id in self._cleanup_tasks: # Cancel existing cleanup task - self._cleanup_tasks[workflow_id].cancel() + self._cleanup_tasks[context_id].cancel() async def _cleanup(): try: await asyncio.sleep(delay) - if workflow_id in self._queues: + if context_id in self._queues: # Drain remaining events - queue = self._queues[workflow_id] + queue = self._queues[context_id] while not queue.empty(): try: queue.get_nowait() except asyncio.QueueEmpty: break - del self._queues[workflow_id] - del self._locks[workflow_id] - logger.info(f"Cleaned up event queue for workflow {workflow_id}") + del self._queues[context_id] + if context_id in self._locks: + del self._locks[context_id] + if context_id in self._subscribers: + del self._subscribers[context_id] + logger.info(f"Cleaned up event queue for context {context_id}") except asyncio.CancelledError: pass except Exception as e: - logger.error(f"Error during cleanup for workflow {workflow_id}: {e}") + logger.error(f"Error during cleanup for context {context_id}: {e}") finally: - if workflow_id in self._cleanup_tasks: - del self._cleanup_tasks[workflow_id] + if context_id in self._cleanup_tasks: + del self._cleanup_tasks[context_id] - self._cleanup_tasks[workflow_id] = asyncio.create_task(_cleanup()) + self._cleanup_tasks[context_id] = asyncio.create_task(_cleanup()) - def has_queue(self, workflow_id: str) -> bool: + def has_queue(self, context_id: str) -> bool: """ - Check if a queue exists for a workflow. + Check if a queue exists for a context. 
Args: - workflow_id: Workflow ID + context_id: Context ID Returns: True if queue exists, False otherwise """ - return workflow_id in self._queues + return context_id in self._queues +# Backward compatibility: ChatbotEventManager is an alias +ChatbotEventManager = StreamingEventManager + # Global singleton instance -_event_manager = ChatbotEventManager() +_event_manager = StreamingEventManager() -def get_event_manager() -> ChatbotEventManager: +def get_event_manager() -> StreamingEventManager: """Get the global event manager instance.""" return _event_manager diff --git a/modules/features/chatbot/mainChatbot.py b/modules/features/chatbot/mainChatbot.py index 6f0cf770..ee74313d 100644 --- a/modules/features/chatbot/mainChatbot.py +++ b/modules/features/chatbot/mainChatbot.py @@ -158,15 +158,14 @@ async def chatProcess( # Emit message event for streaming (exact chatData format) message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp()) await event_manager.emit_event( - workflow.id, - "chatdata", - "New message", - "message", - { + context_id=workflow.id, + event_type="chatdata", + data={ "type": "message", "createdAt": message_timestamp, "item": userMessage.dict() - } + }, + event_category="chat" ) # Update workflow status @@ -538,7 +537,14 @@ async def _processChatbotMessage( workflow = interfaceDbChat.getWorkflow(workflowId) if not workflow: logger.error(f"Workflow {workflowId} not found during processing") - await event_manager.emit_event(workflowId, "error", f"Workflow {workflowId} nicht gefunden", "error") + await event_manager.emit_event( + context_id=workflowId, + event_type="error", + data={"error": f"Workflow {workflowId} nicht gefunden"}, + event_category="workflow", + message=f"Workflow {workflowId} nicht gefunden", + step="error" + ) return # Check if workflow was stopped before starting @@ -896,15 +902,14 @@ Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder " # Emit message event for streaming (exact chatData format) message_timestamp = parseTimestamp(assistantMessage.publishedAt, default=getUtcTimestamp()) await event_manager.emit_event( - workflowId, - "chatdata", - "New message", - "message", - { + context_id=workflowId, + event_type="chatdata", + data={ "type": "message", "createdAt": message_timestamp, "item": assistantMessage.dict() - } + }, + event_category="chat" ) # Update workflow status to completed (only if not stopped) @@ -921,11 +926,12 @@ Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder " # Emit completion event only if workflow wasn't stopped if not await _check_workflow_stopped(interfaceDbChat, workflowId): await event_manager.emit_event( - workflowId, - "complete", - "Chatbot-Verarbeitung abgeschlossen", - "complete", - {"workflowId": workflowId} + context_id=workflowId, + event_type="complete", + data={"workflowId": workflowId}, + event_category="workflow", + message="Chatbot-Verarbeitung abgeschlossen", + step="complete" ) # Schedule cleanup @@ -968,15 +974,14 @@ Beginne DIREKT mit "Aus der Datenbank habe ich..." 
(wenn Daten vorhanden) oder " # Emit message event for streaming (exact chatData format) message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp()) await event_manager.emit_event( - workflowId, - "chatdata", - "New message", - "message", - { + context_id=workflowId, + event_type="chatdata", + data={ "type": "message", "createdAt": message_timestamp, "item": errorMessage.dict() - } + }, + event_category="chat" ) # Update workflow status to error (only if not stopped) diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 40e30eaa..ac8bb586 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -1058,6 +1058,26 @@ class ChatObjects: actionName=createdMessage.get("actionName") ) + # Emit message event for streaming (if event manager is available) + try: + from modules.features.chatbot.eventManager import get_event_manager + event_manager = get_event_manager() + message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp()) + # Emit message event in exact chatData format: {type, createdAt, item} + asyncio.create_task(event_manager.emit_event( + context_id=workflowId, + event_type="chatdata", + data={ + "type": "message", + "createdAt": message_timestamp, + "item": chat_message.dict() + }, + event_category="chat" + )) + except Exception as e: + # Event manager not available or error - continue without emitting + logger.debug(f"Could not emit message event: {e}") + # Debug: Store message and documents for debugging - only if debug enabled storeDebugMessageAndDocuments(chat_message, self.currentUser) @@ -1419,15 +1439,14 @@ class ChatObjects: log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp()) # Emit log event in exact chatData format: {type, createdAt, item} asyncio.create_task(event_manager.emit_event( - workflowId, - "chatdata", - "New log", - "log", - { + context_id=workflowId, + event_type="chatdata", + data={ "type": "log", "createdAt": log_timestamp, "item": ChatLog(**createdLog).dict() - } + }, + event_category="chat" )) except Exception as e: # Event manager not available or error - continue without emitting diff --git a/modules/routes/routeChatbot.py b/modules/routes/routeChatbot.py index 25e24164..4975bf45 100644 --- a/modules/routes/routeChatbot.py +++ b/modules/routes/routeChatbot.py @@ -80,9 +80,9 @@ async def stream_chatbot_start( queue = event_manager.create_queue(workflow.id) async def event_stream(): - """Async generator for SSE events.""" + """Async generator for SSE events - pure event-driven streaming (no polling).""" try: - # Get interface for status checks and chat data + # Get interface for initial data and status checks interfaceDbChat = getServiceChat(currentUser) # Get current workflow to check if resuming and get current round @@ -90,7 +90,7 @@ async def stream_chatbot_start( current_round = current_workflow.currentRound if current_workflow else None is_resuming = final_workflow_id is not None and current_round and current_round > 1 - # Send initial chat data (exact format as chatData endpoint) + # Send initial chat data (exact format as chatData endpoint) - only once at start try: chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None) if chatData.get("items"): @@ -126,32 +126,18 @@ async def stream_chatbot_start( } # Emit item directly in exact chatData format: {type, createdAt, item} yield f"data: {json.dumps(serializable_item)}\n\n" - - # Set initial 
timestamp for incremental fetching - if filtered_items: - timestamps = [parseTimestamp(item.get("createdAt"), default=0) for item in filtered_items] - last_chatdata_timestamp = max(timestamps) if timestamps else None - else: - last_chatdata_timestamp = None - else: - last_chatdata_timestamp = None except Exception as e: logger.warning(f"Error fetching initial chat data: {e}") - last_chatdata_timestamp = None # Keepalive interval (30 seconds) keepalive_interval = 30.0 last_keepalive = asyncio.get_event_loop().time() - # Status check interval (check workflow status every 3 seconds) - status_check_interval = 3.0 + # Status check interval (check workflow status every 5 seconds - less frequent since we're event-driven) + status_check_interval = 5.0 last_status_check = asyncio.get_event_loop().time() - # Chat data fetch interval (fetch chat data every 0.5 seconds for real-time updates) - chatdata_fetch_interval = 0.5 - last_chatdata_fetch = asyncio.get_event_loop().time() - - # Stream events until completion or timeout + # Stream events until completion or timeout - pure event-driven (no polling) timeout = 300.0 # 5 minutes max start_time = asyncio.get_event_loop().time() @@ -159,7 +145,6 @@ async def stream_chatbot_start( # Check timeout elapsed = asyncio.get_event_loop().time() - start_time if elapsed > timeout: - # Timeout - just close stream, don't emit non-chatData format events logger.info(f"Stream timeout for workflow {workflow.id}") break @@ -170,76 +155,29 @@ async def stream_chatbot_start( current_time = asyncio.get_event_loop().time() - # Periodically check workflow status and fetch chat data + # Periodically check workflow status (less frequent since we're event-driven) if current_time - last_status_check >= status_check_interval: try: current_workflow = interfaceDbChat.getWorkflow(workflow.id) if current_workflow and current_workflow.status == "stopped": logger.info(f"Workflow {workflow.id} was stopped, closing stream") - # Don't emit stopped event - just close stream break except Exception as e: logger.warning(f"Error checking workflow status: {e}") last_status_check = current_time - # Periodically fetch and emit chat data - if current_time - last_chatdata_fetch >= chatdata_fetch_interval: - try: - chatData = interfaceDbChat.getUnifiedChatData(workflow.id, last_chatdata_timestamp) - if chatData.get("items"): - # Filter items by round number if resuming - filtered_items = [] - for item in chatData["items"]: - if is_resuming and current_round: - # Get round number from item - item_round = None - item_data = item.get("item") - if item_data: - # Handle both dict and object access - if isinstance(item_data, dict): - item_round = item_data.get("roundNumber") - elif hasattr(item_data, "roundNumber"): - item_round = item_data.roundNumber - - # When resuming, only include items from current round onwards - # Exclude items without roundNumber (they're from old rounds before roundNumber was added) - # Exclude items with roundNumber < current_round (from previous rounds) - if item_round is None or item_round < current_round: - continue # Skip items from previous rounds or without round info - - filtered_items.append(item) - - # Emit filtered items directly in exact chatData format: {type, createdAt, item} - for item in filtered_items: - # Convert Pydantic models to dicts for JSON serialization - serializable_item = { - "type": item.get("type"), - "createdAt": item.get("createdAt"), - "item": item.get("item").dict() if hasattr(item.get("item"), "dict") else item.get("item") - } - yield f"data: 
{json.dumps(serializable_item)}\n\n" - # Update timestamp to only get new items next time - if chatData["items"]: - # Parse timestamps and get the maximum - timestamps = [] - for item in chatData["items"]: - ts = parseTimestamp(item.get("createdAt"), default=0) - timestamps.append(ts) - if timestamps: - last_chatdata_timestamp = max(timestamps) - except Exception as e: - logger.warning(f"Error fetching chat data: {e}") - last_chatdata_fetch = current_time - - # Try to get event with timeout + # Get event from queue (pure event-driven - no polling database) try: event = await asyncio.wait_for(queue.get(), timeout=1.0) - # Only emit chatdata events (messages, logs, stats) in exact chatData format - # Ignore status/progress/complete/stopped/error events that don't match the format - if event.get("type") == "chatdata" and event.get("data"): + # Handle different event types + event_type = event.get("type") + event_data = event.get("data", {}) + + # Emit chatdata events (messages, logs, stats) in exact chatData format + if event_type == "chatdata" and event_data: # Emit item directly in exact chatData format: {type, createdAt, item} - chatdata_item = event.get("data") + chatdata_item = event_data # Ensure item field is serializable (convert Pydantic models to dicts) if isinstance(chatdata_item, dict) and "item" in chatdata_item: item_obj = chatdata_item.get("item") @@ -247,26 +185,21 @@ async def stream_chatbot_start( chatdata_item = chatdata_item.copy() chatdata_item["item"] = item_obj.dict() yield f"data: {json.dumps(chatdata_item)}\n\n" - # Update timestamp for incremental fetching - if chatdata_item.get("createdAt"): - last_chatdata_timestamp = parseTimestamp(chatdata_item["createdAt"], default=None) - # Check if this is a completion/stopped event to close stream - if event.get("type") == "complete": + # Handle completion/stopped events to close stream + elif event_type == "complete": logger.info(f"Workflow {workflow.id} completed, closing stream") break - elif event.get("type") == "stopped": - # Workflow was stopped, close stream + elif event_type == "stopped": logger.info(f"Workflow {workflow.id} stopped, closing stream") break - elif event.get("type") == "error" and event.get("step") == "error": - # Final error, close stream + elif event_type == "error" and event.get("step") == "error": logger.warning(f"Workflow {workflow.id} error, closing stream") break - last_keepalive = asyncio.get_event_loop().time() + last_keepalive = current_time except asyncio.TimeoutError: - # Send keepalive if needed + # Send keepalive if needed (no events received, but keep connection alive) current_time = asyncio.get_event_loop().time() if current_time - last_keepalive >= keepalive_interval: yield f": keepalive\n\n" @@ -274,14 +207,12 @@ async def stream_chatbot_start( continue except Exception as e: logger.error(f"Error in event stream: {e}") - yield f"data: {json.dumps({'type': 'error', 'message': f'Stream error: {str(e)}'})}\n\n" break except Exception as e: logger.error(f"Error in event stream generator: {e}", exc_info=True) - # Don't emit error events that don't match chatData format finally: - # Stream ends - no final event needed as it doesn't match chatData format + # Stream ends - cleanup handled by event manager pass return StreamingResponse( @@ -317,10 +248,12 @@ async def stop_chatbot( # Emit stopped event to active streams event_manager = get_event_manager() await event_manager.emit_event( - workflowId, - "stopped", - "Workflow stopped by user", - "stopped" + context_id=workflowId, + 
event_type="stopped", + data={"workflowId": workflowId}, + event_category="workflow", + message="Workflow stopped by user", + step="stopped" ) logger.info(f"Emitted stopped event for workflow {workflowId}")