From f1231c4b867c850f369334207f4852eee950b1cc Mon Sep 17 00:00:00 2001 From: Ida Dittrich Date: Wed, 4 Mar 2026 08:10:50 +0100 Subject: [PATCH] feat: refactor message streaming + add streaming service --- modules/features/chatbot/chatbot.py | 14 ++-- .../chatbot/interfaceFeatureChatbot.py | 61 +++++++-------- .../features/chatbot/routeFeatureChatbot.py | 15 ++-- modules/features/chatbot/service.py | 76 +++++-------------- .../features/chatbot/streaming/__init__.py | 3 - .../codeeditor/routeFeatureCodeeditor.py | 2 +- modules/interfaces/interfaceDbChat.py | 76 +++++++++---------- modules/services/__init__.py | 3 + modules/services/serviceStreaming/__init__.py | 8 ++ .../serviceStreaming/eventManager.py} | 7 +- .../serviceStreaming}/helpers.py | 0 .../serviceStreaming/mainServiceStreaming.py | 36 +++++++++ 12 files changed, 152 insertions(+), 149 deletions(-) delete mode 100644 modules/features/chatbot/streaming/__init__.py create mode 100644 modules/services/serviceStreaming/__init__.py rename modules/{features/chatbot/streaming/events.py => services/serviceStreaming/eventManager.py} (96%) rename modules/{features/chatbot/streaming => services/serviceStreaming}/helpers.py (100%) create mode 100644 modules/services/serviceStreaming/mainServiceStreaming.py diff --git a/modules/features/chatbot/chatbot.py b/modules/features/chatbot/chatbot.py index 7bdbb183..2b983372 100644 --- a/modules/features/chatbot/chatbot.py +++ b/modules/features/chatbot/chatbot.py @@ -27,8 +27,7 @@ from modules.features.chatbot.bridges.tools import ( create_tavily_search_tool, create_send_streaming_message_tool, ) -from modules.features.chatbot.streaming.helpers import ChatStreamingHelper -from modules.features.chatbot.streaming.events import get_event_manager +from modules.services.serviceStreaming import ChatStreamingHelper from modules.datamodels.datamodelUam import User if TYPE_CHECKING: @@ -179,6 +178,7 @@ class Chatbot: system_prompt: str = "You are a helpful assistant." workflow_id: str = "default" config: Optional["ChatbotConfig"] = None + _event_manager: Any = None @classmethod async def create( @@ -188,6 +188,7 @@ class Chatbot: system_prompt: str, workflow_id: str = "default", config: Optional["ChatbotConfig"] = None, + event_manager=None, ) -> "Chatbot": """Factory method to create and configure a Chatbot instance. @@ -197,6 +198,7 @@ class Chatbot: system_prompt: The system prompt to initialize the chatbot. workflow_id: The workflow ID (maps to thread_id). config: Optional chatbot configuration for dynamic tool enablement. + event_manager: Optional event manager for streaming (passed from route). Returns: A configured Chatbot instance. @@ -207,6 +209,7 @@ class Chatbot: system_prompt=system_prompt, workflow_id=workflow_id, config=config, + _event_manager=event_manager, ) configured_tools = await instance._configure_tools() instance.app = instance._build_app(memory, configured_tools) @@ -247,10 +250,9 @@ class Chatbot: tools.append(tavily_tool) logger.debug("Added Tavily search tool") - # Streaming status tool (if enabled) - if streaming_enabled: - event_manager = get_event_manager() - send_streaming_message = create_send_streaming_message_tool(event_manager) + # Streaming status tool (if enabled and event_manager available) + if streaming_enabled and self._event_manager: + send_streaming_message = create_send_streaming_message_tool(self._event_manager) tools.append(send_streaming_message) logger.debug("Added streaming status tool") diff --git a/modules/features/chatbot/interfaceFeatureChatbot.py b/modules/features/chatbot/interfaceFeatureChatbot.py index d7c1e8fa..15711e7e 100644 --- a/modules/features/chatbot/interfaceFeatureChatbot.py +++ b/modules/features/chatbot/interfaceFeatureChatbot.py @@ -1027,7 +1027,7 @@ class ChatObjects: totalPages=totalPages ) - def createMessage(self, messageData: Dict[str, Any]) -> ChatbotMessage: + def createMessage(self, messageData: Dict[str, Any], event_manager=None) -> ChatbotMessage: """Creates a message for a conversation if user has access. Accepts workflowId (from bridge) or conversationId.""" try: if "id" not in messageData or not messageData["id"]: @@ -1131,23 +1131,21 @@ class ChatObjects: actionName=createdMessage.get("actionName") ) - try: - from modules.features.chatbot.streaming.events import get_event_manager - event_manager = get_event_manager() - message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp()) - asyncio.create_task(event_manager.emit_event( - context_id=conversationId, - event_type="chatdata", - data={ - "type": "message", - "createdAt": message_timestamp, - "item": chat_message.model_dump() - }, - event_category="chat" - )) - except Exception as e: - # Event manager not available or error - continue without emitting - logger.debug(f"Could not emit message event: {e}") + if event_manager: + try: + message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp()) + asyncio.create_task(event_manager.emit_event( + context_id=conversationId, + event_type="chatdata", + data={ + "type": "message", + "createdAt": message_timestamp, + "item": chat_message.model_dump() + }, + event_category="chat" + )) + except Exception as e: + logger.debug(f"Could not emit message event: {e}") # Debug: Store message and documents for debugging - only if debug enabled storeDebugMessageAndDocuments(chat_message, self.currentUser, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId) @@ -1387,7 +1385,7 @@ class ChatObjects: totalPages=totalPages ) - def createLog(self, logData: Dict[str, Any]) -> Optional[ChatbotLog]: + def createLog(self, logData: Dict[str, Any], event_manager=None) -> Optional[ChatbotLog]: """Creates a log entry for a conversation if user has access. Accepts workflowId for backward compat.""" conversationId = logData.get("conversationId") or logData.get("workflowId") if not conversationId: @@ -1420,19 +1418,18 @@ class ChatObjects: if not createdLog: return None - try: - from modules.features.chatbot.streaming.events import get_event_manager - event_manager = get_event_manager() - log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp()) - asyncio.create_task(event_manager.emit_event( - context_id=conversationId, - event_type="chatdata", - data={"type": "log", "createdAt": log_timestamp, "item": ChatbotLog(**createdLog).model_dump()}, - event_category="log", - message="New log" - )) - except Exception as e: - logger.debug(f"Could not emit log event: {e}") + if event_manager: + try: + log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp()) + asyncio.create_task(event_manager.emit_event( + context_id=conversationId, + event_type="chatdata", + data={"type": "log", "createdAt": log_timestamp, "item": ChatbotLog(**createdLog).model_dump()}, + event_category="log", + message="New log" + )) + except Exception as e: + logger.debug(f"Could not emit log event: {e}") return ChatbotLog(**createdLog) diff --git a/modules/features/chatbot/routeFeatureChatbot.py b/modules/features/chatbot/routeFeatureChatbot.py index a88c5242..d8231a07 100644 --- a/modules/features/chatbot/routeFeatureChatbot.py +++ b/modules/features/chatbot/routeFeatureChatbot.py @@ -31,7 +31,7 @@ from modules.features.chatbot.interfaceFeatureChatbot import ChatbotConversation # Import chatbot feature from modules.features.chatbot import chatProcess -from modules.features.chatbot.streaming.events import get_event_manager +from modules.services.serviceStreaming import get_event_manager # Configure logger logger = logging.getLogger(__name__) @@ -244,8 +244,11 @@ async def stream_chatbot_start( final_workflow_id = workflowId or userInput.workflowId # Start background processing (this will create the workflow and event queue) - # Pass featureInstanceId to chatProcess - workflow = await chatProcess(context.user, mandateId, userInput, final_workflow_id, featureInstanceId=instanceId) + # Pass featureInstanceId and event_manager to chatProcess + workflow = await chatProcess( + context.user, mandateId, userInput, final_workflow_id, + featureInstanceId=instanceId, event_manager=event_manager + ) # Check if workflow was created successfully if not workflow: @@ -464,7 +467,8 @@ async def stop_chatbot( "lastActivity": getUtcTimestamp() }) - # Store log entry + event_manager = get_event_manager() + # Store log entry (createLog emits when event_manager is provided) interfaceDbChat.createLog({ "id": f"log_{uuid.uuid4()}", "workflowId": workflowId, @@ -473,13 +477,12 @@ async def stop_chatbot( "status": "stopped", "timestamp": getUtcTimestamp(), "roundNumber": workflow.currentRound if workflow else 1 - }) + }, event_manager=event_manager) # Reload workflow to return updated version workflow = interfaceDbChat.getWorkflow(workflowId) # Emit stopped event to active streams - event_manager = get_event_manager() await event_manager.emit_event( context_id=workflowId, event_type="stopped", diff --git a/modules/features/chatbot/service.py b/modules/features/chatbot/service.py index f3ab4e0b..c419b623 100644 --- a/modules/features/chatbot/service.py +++ b/modules/features/chatbot/service.py @@ -20,7 +20,6 @@ from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, Operati from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp from modules.services import getInterface as getServices -from modules.features.chatbot.streaming.events import get_event_manager from modules.features.chatbot.chatbot import Chatbot from modules.features.chatbot.bridges.ai import AICenterChatModel, clear_workflow_allowed_providers from modules.features.chatbot.bridges.memory import DatabaseCheckpointer @@ -69,7 +68,8 @@ async def chatProcess( mandateId: Optional[str], userInput: UserInputRequest, workflowId: Optional[str] = None, - featureInstanceId: Optional[str] = None + featureInstanceId: Optional[str] = None, + event_manager=None # Required when called from streaming route ) -> ChatbotConversation: """ Simple chatbot processing - analyze user input and generate queries. @@ -104,10 +104,7 @@ async def chatProcess( from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface interfaceDbChat = getChatbotInterface(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId) - # Get event manager and create queue if needed - event_manager = get_event_manager() - - # Create or load workflow + # Create or load workflow (event_manager passed from route) if workflowId: workflow = interfaceDbChat.getWorkflow(workflowId) if not workflow: @@ -221,22 +218,11 @@ async def chatProcess( } if user_documents: userMessageData["documents"] = [d.model_dump() for d in user_documents] - userMessage = interfaceDbChat.createMessage(userMessageData) + # Don't pass event_manager: event_stream sends initial chatData from DB (includes user msg). + # Emitting here would duplicate it (initial chatData + queue event). + userMessage = interfaceDbChat.createMessage(userMessageData, event_manager=None) logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)") - - # Emit message event for streaming (exact chatData format) - message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp()) - await event_manager.emit_event( - context_id=workflow.id, - event_type="chatdata", - data={ - "type": "message", - "createdAt": message_timestamp, - "item": userMessage.model_dump() - }, - event_category="chat" - ) - + # Update workflow status interfaceDbChat.updateWorkflow(workflow.id, { "status": "running", @@ -255,7 +241,8 @@ async def chatProcess( userInput, userMessage.id, featureInstanceId=featureInstanceId, - config=chatbot_config + config=chatbot_config, + event_manager=event_manager )) # Reload workflow to include new message @@ -407,34 +394,8 @@ async def _emit_log_and_event( "status": status, "roundNumber": round_number } - # Store log in database - created_log = interfaceDbChat.createLog(log_data) - - # Emit event directly for streaming (using correct signature) - if created_log and event_manager: - try: - # Convert to dict if it's a Pydantic model (ChatbotLog from createLog) - if hasattr(created_log, "model_dump"): - log_dict = created_log.model_dump() - elif hasattr(created_log, "dict"): - log_dict = created_log.dict() - else: - log_dict = log_data - - await event_manager.emit_event( - context_id=workflowId, - event_type="chatdata", - data={ - "type": "log", - "createdAt": log_timestamp, - "item": log_dict - }, - event_category="chat", - message="New log", - step="log" - ) - except Exception as emit_error: - logger.warning(f"Error emitting log event: {emit_error}") + # Store log in database (createLog emits when event_manager is provided) + created_log = interfaceDbChat.createLog(log_data, event_manager=event_manager) except Exception as e: logger.error(f"Error storing log: {e}", exc_info=True) @@ -1019,7 +980,7 @@ async def _bridge_chatbot_events( } try: - assistant_msg = interface_db_chat.createMessage(message_data) + assistant_msg = interface_db_chat.createMessage(message_data, event_manager=event_manager) final_message_stored = True # Emit message event @@ -1104,7 +1065,7 @@ async def _bridge_chatbot_events( "success": False } - error_msg = interface_db_chat.createMessage(error_message_data) + error_msg = interface_db_chat.createMessage(error_message_data, event_manager=event_manager) # Emit message event message_timestamp = parseTimestamp(error_msg.publishedAt, default=getUtcTimestamp()) @@ -1272,7 +1233,8 @@ async def _processChatbotMessageLangGraph( userInput: UserInputRequest, userMessageId: str, featureInstanceId: Optional[str] = None, - config: Optional[ChatbotConfig] = None + config: Optional[ChatbotConfig] = None, + event_manager=None ): """ Process chatbot message using LangGraph. @@ -1285,9 +1247,8 @@ async def _processChatbotMessageLangGraph( userInput: User input request userMessageId: User message ID featureInstanceId: Optional feature instance ID for loading instance-specific config + event_manager: Event manager for streaming (passed from chatProcess) """ - event_manager = get_event_manager() - try: from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface interfaceDbChat = getChatbotInterface(currentUser, mandateId=services.mandateId, featureInstanceId=featureInstanceId) @@ -1376,7 +1337,8 @@ async def _processChatbotMessageLangGraph( memory=memory, system_prompt=system_prompt, workflow_id=workflowId, - config=config + config=config, + event_manager=event_manager ) # Emit synthetic status for real-time UI feedback @@ -1436,7 +1398,7 @@ async def _processChatbotMessageLangGraph( "taskNumber": 0, "actionNumber": 0 } - errorMessage = interfaceDbChat.createMessage(errorMessageData) + errorMessage = interfaceDbChat.createMessage(errorMessageData, event_manager=event_manager) # Emit message event message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp()) diff --git a/modules/features/chatbot/streaming/__init__.py b/modules/features/chatbot/streaming/__init__.py deleted file mode 100644 index a5b2eedb..00000000 --- a/modules/features/chatbot/streaming/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# Copyright (c) 2025 Patrick Motsch -# All rights reserved. -"""Streaming infrastructure for chatbot events.""" diff --git a/modules/features/codeeditor/routeFeatureCodeeditor.py b/modules/features/codeeditor/routeFeatureCodeeditor.py index 0d76389c..1feacd53 100644 --- a/modules/features/codeeditor/routeFeatureCodeeditor.py +++ b/modules/features/codeeditor/routeFeatureCodeeditor.py @@ -17,7 +17,7 @@ from modules.auth import limiter, getRequestContext, RequestContext from modules.interfaces import interfaceDbChat, interfaceDbManagement from modules.interfaces.interfaceAiObjects import AiObjects from modules.datamodels.datamodelChat import UserInputRequest -from modules.features.chatbot.streaming.events import get_event_manager +from modules.services.serviceStreaming import get_event_manager from modules.features.codeeditor import codeEditorProcessor, fileContextManager from modules.features.codeeditor.datamodelCodeeditor import FileEditProposal, EditStatusEnum diff --git a/modules/interfaces/interfaceDbChat.py b/modules/interfaces/interfaceDbChat.py index dfdf9886..5ed7fa6c 100644 --- a/modules/interfaces/interfaceDbChat.py +++ b/modules/interfaces/interfaceDbChat.py @@ -1003,7 +1003,7 @@ class ChatObjects: totalPages=totalPages ) - def createMessage(self, messageData: Dict[str, Any]) -> ChatMessage: + def createMessage(self, messageData: Dict[str, Any], event_manager=None) -> ChatMessage: """Creates a message for a workflow if user has access.""" try: # Ensure ID is present @@ -1121,25 +1121,23 @@ class ChatObjects: actionName=createdMessage.get("actionName") ) - # Emit message event for streaming (if event manager is available) - try: - from modules.features.chatbot.streaming.events import get_event_manager - event_manager = get_event_manager() - message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp()) - # Emit message event in exact chatData format: {type, createdAt, item} - asyncio.create_task(event_manager.emit_event( - context_id=workflowId, - event_type="chatdata", - data={ - "type": "message", - "createdAt": message_timestamp, - "item": chat_message.dict() - }, - event_category="chat" - )) - except Exception as e: - # Event manager not available or error - continue without emitting - logger.debug(f"Could not emit message event: {e}") + # Emit message event for streaming (if event manager is provided) + if event_manager: + try: + message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp()) + # Emit message event in exact chatData format: {type, createdAt, item} + asyncio.create_task(event_manager.emit_event( + context_id=workflowId, + event_type="chatdata", + data={ + "type": "message", + "createdAt": message_timestamp, + "item": chat_message.dict() + }, + event_category="chat" + )) + except Exception as e: + logger.debug(f"Could not emit message event: {e}") # Debug: Store message and documents for debugging - only if debug enabled storeDebugMessageAndDocuments(chat_message, self.currentUser, mandateId=self.mandateId, featureInstanceId=self.featureInstanceId) @@ -1447,7 +1445,7 @@ class ChatObjects: totalPages=totalPages ) - def createLog(self, logData: Dict[str, Any]) -> ChatLog: + def createLog(self, logData: Dict[str, Any], event_manager=None) -> ChatLog: """Creates a log entry for a workflow if user has access.""" # Check workflow access workflowId = logData.get("workflowId") @@ -1498,25 +1496,23 @@ class ChatObjects: # Create log in normalized table createdLog = self.db.recordCreate(ChatLog, log_model) - # Emit log event for streaming (if event manager is available) - try: - from modules.features.chatbot.streaming.events import get_event_manager - event_manager = get_event_manager() - log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp()) - # Emit log event in exact chatData format: {type, createdAt, item} - asyncio.create_task(event_manager.emit_event( - context_id=workflowId, - event_type="chatdata", - data={ - "type": "log", - "createdAt": log_timestamp, - "item": ChatLog(**createdLog).dict() - }, - event_category="chat" - )) - except Exception as e: - # Event manager not available or error - continue without emitting - logger.debug(f"Could not emit log event: {e}") + # Emit log event for streaming (if event manager is provided) + if event_manager: + try: + log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp()) + # Emit log event in exact chatData format: {type, createdAt, item} + asyncio.create_task(event_manager.emit_event( + context_id=workflowId, + event_type="chatdata", + data={ + "type": "log", + "createdAt": log_timestamp, + "item": ChatLog(**createdLog).dict() + }, + event_category="chat" + )) + except Exception as e: + logger.debug(f"Could not emit log event: {e}") # Return validated ChatLog instance return ChatLog(**createdLog) diff --git a/modules/services/__init__.py b/modules/services/__init__.py index 6712372a..f6ac292a 100644 --- a/modules/services/__init__.py +++ b/modules/services/__init__.py @@ -107,6 +107,9 @@ class Services: from .serviceMessaging.mainServiceMessaging import MessagingService self.messaging = PublicService(MessagingService(self)) + from .serviceStreaming.mainServiceStreaming import StreamingService + self.streaming = PublicService(StreamingService(self)) + # ============================================================ # AI SERVICES (from modules/services/) # ============================================================ diff --git a/modules/services/serviceStreaming/__init__.py b/modules/services/serviceStreaming/__init__.py new file mode 100644 index 00000000..e26c24ff --- /dev/null +++ b/modules/services/serviceStreaming/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +"""Streaming service for SSE event management across features.""" + +from .eventManager import EventManager, get_event_manager +from .helpers import ChatStreamingHelper + +__all__ = ["EventManager", "get_event_manager", "ChatStreamingHelper"] diff --git a/modules/features/chatbot/streaming/events.py b/modules/services/serviceStreaming/eventManager.py similarity index 96% rename from modules/features/chatbot/streaming/events.py rename to modules/services/serviceStreaming/eventManager.py index 7a65205e..ddd98946 100644 --- a/modules/features/chatbot/streaming/events.py +++ b/modules/services/serviceStreaming/eventManager.py @@ -1,21 +1,20 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. """ -Event manager for chatbot streaming. -Manages event queues for Server-Sent Events (SSE) streaming. +Event manager for SSE streaming. +Manages event queues for Server-Sent Events (SSE) streaming across features. """ import logging import asyncio from typing import Dict, Optional, Any -from collections import defaultdict logger = logging.getLogger(__name__) class EventManager: """ - Manages event queues for chatbot streaming. + Manages event queues for SSE streaming. Each workflow has its own async queue for events. """ diff --git a/modules/features/chatbot/streaming/helpers.py b/modules/services/serviceStreaming/helpers.py similarity index 100% rename from modules/features/chatbot/streaming/helpers.py rename to modules/services/serviceStreaming/helpers.py diff --git a/modules/services/serviceStreaming/mainServiceStreaming.py b/modules/services/serviceStreaming/mainServiceStreaming.py new file mode 100644 index 00000000..352688dc --- /dev/null +++ b/modules/services/serviceStreaming/mainServiceStreaming.py @@ -0,0 +1,36 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Streaming service for SSE event management. +Provides access to the global event manager for workflow streaming. +""" + +import logging +from typing import Any + +from modules.services.serviceStreaming.eventManager import EventManager, get_event_manager + +logger = logging.getLogger(__name__) + + +class StreamingService: + """ + Streaming service providing access to SSE event infrastructure. + """ + + def __init__(self, services: Any): + """Initialize streaming service with service center access. + + Args: + services: Service center instance providing access to interfaces + """ + self.services = services + + def getEventManager(self) -> EventManager: + """ + Get the global event manager instance for SSE streaming. + + Returns: + EventManager instance + """ + return get_event_manager()