diff --git a/modules/features/chatbot/eventManager.py b/modules/features/chatbot/eventManager.py index 8bc4ff94..8780ef4f 100644 --- a/modules/features/chatbot/eventManager.py +++ b/modules/features/chatbot/eventManager.py @@ -109,7 +109,7 @@ class StreamingEventManager: Args: context_id: Context ID to stream events from event_categories: Optional list of event categories to filter by - timeout: Optional timeout in seconds (None = no timeout) + timeout: Optional timeout in seconds (None = no timeout, default: 300s for long-running streams) Yields: Event dictionaries @@ -119,26 +119,30 @@ class StreamingEventManager: logger.warning(f"No queue found for context {context_id}") return - start_time = asyncio.get_event_loop().time() if timeout else None + # Default timeout of 5 minutes for long-running streams if not specified + effective_timeout = timeout if timeout is not None else 300.0 + start_time = asyncio.get_event_loop().time() + last_event_time = start_time + heartbeat_interval = 30.0 # Send heartbeat every 30 seconds to keep connection alive while True: # Check timeout - if timeout and start_time: - elapsed = asyncio.get_event_loop().time() - start_time - if elapsed > timeout: - logger.debug(f"Stream timeout for context {context_id}") - break + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed > effective_timeout: + logger.debug(f"Stream timeout for context {context_id} after {effective_timeout}s") + break try: - # Wait for event with timeout - wait_timeout = 1.0 # Check timeout every second - if timeout and start_time: - remaining = timeout - (asyncio.get_event_loop().time() - start_time) + # Wait for event with longer timeout to avoid premature closure + wait_timeout = heartbeat_interval # Check every 30 seconds + if effective_timeout: + remaining = effective_timeout - elapsed if remaining <= 0: break wait_timeout = min(wait_timeout, remaining) event = await asyncio.wait_for(queue.get(), timeout=wait_timeout) + last_event_time = asyncio.get_event_loop().time() # Filter by category if specified if event_categories and event.get("category") not in event_categories: @@ -147,11 +151,25 @@ class StreamingEventManager: yield event except asyncio.TimeoutError: + # Send heartbeat to keep connection alive if no events + time_since_last_event = asyncio.get_event_loop().time() - last_event_time + if time_since_last_event >= heartbeat_interval: + # Send heartbeat event to keep stream alive + heartbeat_event = { + "type": "heartbeat", + "category": "system", + "timestamp": datetime.now().timestamp(), + "data": {"status": "alive"}, + "message": None, + "step": None + } + yield heartbeat_event + last_event_time = asyncio.get_event_loop().time() + # Check if we should continue or timeout - if timeout and start_time: - elapsed = asyncio.get_event_loop().time() - start_time - if elapsed >= timeout: - break + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed >= effective_timeout: + break continue except Exception as e: logger.error(f"Error in stream_events for context {context_id}: {e}") diff --git a/modules/features/chatbot/mainChatbot.py b/modules/features/chatbot/mainChatbot.py index 4c93e6a9..3fbc14fa 100644 --- a/modules/features/chatbot/mainChatbot.py +++ b/modules/features/chatbot/mainChatbot.py @@ -1521,8 +1521,8 @@ async def _processChatbotMessage( step="complete" ) - # Schedule cleanup - await event_manager.cleanup(workflowId) + # Schedule cleanup with longer delay to allow stream to stay open + await event_manager.cleanup(workflowId, delay=300.0) # 5 minutes delay except Exception as e: logger.error(f"Error processing chatbot message: {str(e)}", exc_info=True) diff --git a/modules/routes/routeChatbot.py b/modules/routes/routeChatbot.py index 86cbb940..11814313 100644 --- a/modules/routes/routeChatbot.py +++ b/modules/routes/routeChatbot.py @@ -357,11 +357,26 @@ async def get_chatbot_threads( detail=f"Workflow with ID {workflowId} not found" ) + # Normalize workflow data to match ChatWorkflow model requirements + # Convert workflow object to dict if needed, and normalize None values + if hasattr(workflow, 'model_dump'): + workflow_dict = workflow.model_dump() + elif hasattr(workflow, 'dict'): + workflow_dict = workflow.dict() + elif isinstance(workflow, dict): + workflow_dict = dict(workflow) + else: + workflow_dict = workflow + + # Set maxSteps to default value of 10 if None (as per ChatWorkflow model) + if workflow_dict.get("maxSteps") is None: + workflow_dict["maxSteps"] = 10 + # Get unified chat data for this workflow chatData = interfaceDbChat.getUnifiedChatData(workflowId, None) return { - "workflow": workflow, + "workflow": workflow_dict, "chatData": chatData } @@ -407,6 +422,16 @@ async def get_chatbot_threads( totalItems = len(chatbot_workflows_data) totalPages = 1 + # Normalize workflow data to match ChatWorkflow model requirements + # Convert None values to defaults before response validation + normalized_workflows = [] + for wf in workflows: + normalized_wf = dict(wf) # Create a copy + # Set maxSteps to default value of 10 if None (as per ChatWorkflow model) + if normalized_wf.get("maxSteps") is None: + normalized_wf["maxSteps"] = 10 + normalized_workflows.append(normalized_wf) + # Create paginated response from modules.datamodels.datamodelPagination import PaginationMetadata metadata = PaginationMetadata( @@ -419,7 +444,7 @@ async def get_chatbot_threads( ) return PaginatedResponse( - items=workflows, + items=normalized_workflows, pagination=metadata )