feat: refactor message streaming + add streaming service
This commit is contained in:
parent
1f529568f5
commit
f1231c4b86
12 changed files with 152 additions and 149 deletions
|
|
@ -27,8 +27,7 @@ from modules.features.chatbot.bridges.tools import (
|
|||
create_tavily_search_tool,
|
||||
create_send_streaming_message_tool,
|
||||
)
|
||||
from modules.features.chatbot.streaming.helpers import ChatStreamingHelper
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
from modules.services.serviceStreaming import ChatStreamingHelper
|
||||
from modules.datamodels.datamodelUam import User
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -179,6 +178,7 @@ class Chatbot:
|
|||
system_prompt: str = "You are a helpful assistant."
|
||||
workflow_id: str = "default"
|
||||
config: Optional["ChatbotConfig"] = None
|
||||
_event_manager: Any = None
|
||||
|
||||
@classmethod
|
||||
async def create(
|
||||
|
|
@ -188,6 +188,7 @@ class Chatbot:
|
|||
system_prompt: str,
|
||||
workflow_id: str = "default",
|
||||
config: Optional["ChatbotConfig"] = None,
|
||||
event_manager=None,
|
||||
) -> "Chatbot":
|
||||
"""Factory method to create and configure a Chatbot instance.
|
||||
|
||||
|
|
@ -197,6 +198,7 @@ class Chatbot:
|
|||
system_prompt: The system prompt to initialize the chatbot.
|
||||
workflow_id: The workflow ID (maps to thread_id).
|
||||
config: Optional chatbot configuration for dynamic tool enablement.
|
||||
event_manager: Optional event manager for streaming (passed from route).
|
||||
|
||||
Returns:
|
||||
A configured Chatbot instance.
|
||||
|
|
@ -207,6 +209,7 @@ class Chatbot:
|
|||
system_prompt=system_prompt,
|
||||
workflow_id=workflow_id,
|
||||
config=config,
|
||||
_event_manager=event_manager,
|
||||
)
|
||||
configured_tools = await instance._configure_tools()
|
||||
instance.app = instance._build_app(memory, configured_tools)
|
||||
|
|
@ -247,10 +250,9 @@ class Chatbot:
|
|||
tools.append(tavily_tool)
|
||||
logger.debug("Added Tavily search tool")
|
||||
|
||||
# Streaming status tool (if enabled)
|
||||
if streaming_enabled:
|
||||
event_manager = get_event_manager()
|
||||
send_streaming_message = create_send_streaming_message_tool(event_manager)
|
||||
# Streaming status tool (if enabled and event_manager available)
|
||||
if streaming_enabled and self._event_manager:
|
||||
send_streaming_message = create_send_streaming_message_tool(self._event_manager)
|
||||
tools.append(send_streaming_message)
|
||||
logger.debug("Added streaming status tool")
|
||||
|
||||
|
|
|
|||
|
|
@ -1027,7 +1027,7 @@ class ChatObjects:
|
|||
totalPages=totalPages
|
||||
)
|
||||
|
||||
def createMessage(self, messageData: Dict[str, Any]) -> ChatbotMessage:
|
||||
def createMessage(self, messageData: Dict[str, Any], event_manager=None) -> ChatbotMessage:
|
||||
"""Creates a message for a conversation if user has access. Accepts workflowId (from bridge) or conversationId."""
|
||||
try:
|
||||
if "id" not in messageData or not messageData["id"]:
|
||||
|
|
@ -1131,9 +1131,8 @@ class ChatObjects:
|
|||
actionName=createdMessage.get("actionName")
|
||||
)
|
||||
|
||||
if event_manager:
|
||||
try:
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
event_manager = get_event_manager()
|
||||
message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp())
|
||||
asyncio.create_task(event_manager.emit_event(
|
||||
context_id=conversationId,
|
||||
|
|
@ -1146,7 +1145,6 @@ class ChatObjects:
|
|||
event_category="chat"
|
||||
))
|
||||
except Exception as e:
|
||||
# Event manager not available or error - continue without emitting
|
||||
logger.debug(f"Could not emit message event: {e}")
|
||||
|
||||
# Debug: Store message and documents for debugging - only if debug enabled
|
||||
|
|
@ -1387,7 +1385,7 @@ class ChatObjects:
|
|||
totalPages=totalPages
|
||||
)
|
||||
|
||||
def createLog(self, logData: Dict[str, Any]) -> Optional[ChatbotLog]:
|
||||
def createLog(self, logData: Dict[str, Any], event_manager=None) -> Optional[ChatbotLog]:
|
||||
"""Creates a log entry for a conversation if user has access. Accepts workflowId for backward compat."""
|
||||
conversationId = logData.get("conversationId") or logData.get("workflowId")
|
||||
if not conversationId:
|
||||
|
|
@ -1420,9 +1418,8 @@ class ChatObjects:
|
|||
if not createdLog:
|
||||
return None
|
||||
|
||||
if event_manager:
|
||||
try:
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
event_manager = get_event_manager()
|
||||
log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp())
|
||||
asyncio.create_task(event_manager.emit_event(
|
||||
context_id=conversationId,
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ from modules.features.chatbot.interfaceFeatureChatbot import ChatbotConversation
|
|||
|
||||
# Import chatbot feature
|
||||
from modules.features.chatbot import chatProcess
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
from modules.services.serviceStreaming import get_event_manager
|
||||
|
||||
# Configure logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -244,8 +244,11 @@ async def stream_chatbot_start(
|
|||
final_workflow_id = workflowId or userInput.workflowId
|
||||
|
||||
# Start background processing (this will create the workflow and event queue)
|
||||
# Pass featureInstanceId to chatProcess
|
||||
workflow = await chatProcess(context.user, mandateId, userInput, final_workflow_id, featureInstanceId=instanceId)
|
||||
# Pass featureInstanceId and event_manager to chatProcess
|
||||
workflow = await chatProcess(
|
||||
context.user, mandateId, userInput, final_workflow_id,
|
||||
featureInstanceId=instanceId, event_manager=event_manager
|
||||
)
|
||||
|
||||
# Check if workflow was created successfully
|
||||
if not workflow:
|
||||
|
|
@ -464,7 +467,8 @@ async def stop_chatbot(
|
|||
"lastActivity": getUtcTimestamp()
|
||||
})
|
||||
|
||||
# Store log entry
|
||||
event_manager = get_event_manager()
|
||||
# Store log entry (createLog emits when event_manager is provided)
|
||||
interfaceDbChat.createLog({
|
||||
"id": f"log_{uuid.uuid4()}",
|
||||
"workflowId": workflowId,
|
||||
|
|
@ -473,13 +477,12 @@ async def stop_chatbot(
|
|||
"status": "stopped",
|
||||
"timestamp": getUtcTimestamp(),
|
||||
"roundNumber": workflow.currentRound if workflow else 1
|
||||
})
|
||||
}, event_manager=event_manager)
|
||||
|
||||
# Reload workflow to return updated version
|
||||
workflow = interfaceDbChat.getWorkflow(workflowId)
|
||||
|
||||
# Emit stopped event to active streams
|
||||
event_manager = get_event_manager()
|
||||
await event_manager.emit_event(
|
||||
context_id=workflowId,
|
||||
event_type="stopped",
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, Operati
|
|||
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
|
||||
from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp
|
||||
from modules.services import getInterface as getServices
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
from modules.features.chatbot.chatbot import Chatbot
|
||||
from modules.features.chatbot.bridges.ai import AICenterChatModel, clear_workflow_allowed_providers
|
||||
from modules.features.chatbot.bridges.memory import DatabaseCheckpointer
|
||||
|
|
@ -69,7 +68,8 @@ async def chatProcess(
|
|||
mandateId: Optional[str],
|
||||
userInput: UserInputRequest,
|
||||
workflowId: Optional[str] = None,
|
||||
featureInstanceId: Optional[str] = None
|
||||
featureInstanceId: Optional[str] = None,
|
||||
event_manager=None # Required when called from streaming route
|
||||
) -> ChatbotConversation:
|
||||
"""
|
||||
Simple chatbot processing - analyze user input and generate queries.
|
||||
|
|
@ -104,10 +104,7 @@ async def chatProcess(
|
|||
from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
|
||||
interfaceDbChat = getChatbotInterface(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
|
||||
|
||||
# Get event manager and create queue if needed
|
||||
event_manager = get_event_manager()
|
||||
|
||||
# Create or load workflow
|
||||
# Create or load workflow (event_manager passed from route)
|
||||
if workflowId:
|
||||
workflow = interfaceDbChat.getWorkflow(workflowId)
|
||||
if not workflow:
|
||||
|
|
@ -221,22 +218,11 @@ async def chatProcess(
|
|||
}
|
||||
if user_documents:
|
||||
userMessageData["documents"] = [d.model_dump() for d in user_documents]
|
||||
userMessage = interfaceDbChat.createMessage(userMessageData)
|
||||
# Don't pass event_manager: event_stream sends initial chatData from DB (includes user msg).
|
||||
# Emitting here would duplicate it (initial chatData + queue event).
|
||||
userMessage = interfaceDbChat.createMessage(userMessageData, event_manager=None)
|
||||
logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)")
|
||||
|
||||
# Emit message event for streaming (exact chatData format)
|
||||
message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp())
|
||||
await event_manager.emit_event(
|
||||
context_id=workflow.id,
|
||||
event_type="chatdata",
|
||||
data={
|
||||
"type": "message",
|
||||
"createdAt": message_timestamp,
|
||||
"item": userMessage.model_dump()
|
||||
},
|
||||
event_category="chat"
|
||||
)
|
||||
|
||||
# Update workflow status
|
||||
interfaceDbChat.updateWorkflow(workflow.id, {
|
||||
"status": "running",
|
||||
|
|
@ -255,7 +241,8 @@ async def chatProcess(
|
|||
userInput,
|
||||
userMessage.id,
|
||||
featureInstanceId=featureInstanceId,
|
||||
config=chatbot_config
|
||||
config=chatbot_config,
|
||||
event_manager=event_manager
|
||||
))
|
||||
|
||||
# Reload workflow to include new message
|
||||
|
|
@ -407,34 +394,8 @@ async def _emit_log_and_event(
|
|||
"status": status,
|
||||
"roundNumber": round_number
|
||||
}
|
||||
# Store log in database
|
||||
created_log = interfaceDbChat.createLog(log_data)
|
||||
|
||||
# Emit event directly for streaming (using correct signature)
|
||||
if created_log and event_manager:
|
||||
try:
|
||||
# Convert to dict if it's a Pydantic model (ChatbotLog from createLog)
|
||||
if hasattr(created_log, "model_dump"):
|
||||
log_dict = created_log.model_dump()
|
||||
elif hasattr(created_log, "dict"):
|
||||
log_dict = created_log.dict()
|
||||
else:
|
||||
log_dict = log_data
|
||||
|
||||
await event_manager.emit_event(
|
||||
context_id=workflowId,
|
||||
event_type="chatdata",
|
||||
data={
|
||||
"type": "log",
|
||||
"createdAt": log_timestamp,
|
||||
"item": log_dict
|
||||
},
|
||||
event_category="chat",
|
||||
message="New log",
|
||||
step="log"
|
||||
)
|
||||
except Exception as emit_error:
|
||||
logger.warning(f"Error emitting log event: {emit_error}")
|
||||
# Store log in database (createLog emits when event_manager is provided)
|
||||
created_log = interfaceDbChat.createLog(log_data, event_manager=event_manager)
|
||||
except Exception as e:
|
||||
logger.error(f"Error storing log: {e}", exc_info=True)
|
||||
|
||||
|
|
@ -1019,7 +980,7 @@ async def _bridge_chatbot_events(
|
|||
}
|
||||
|
||||
try:
|
||||
assistant_msg = interface_db_chat.createMessage(message_data)
|
||||
assistant_msg = interface_db_chat.createMessage(message_data, event_manager=event_manager)
|
||||
final_message_stored = True
|
||||
|
||||
# Emit message event
|
||||
|
|
@ -1104,7 +1065,7 @@ async def _bridge_chatbot_events(
|
|||
"success": False
|
||||
}
|
||||
|
||||
error_msg = interface_db_chat.createMessage(error_message_data)
|
||||
error_msg = interface_db_chat.createMessage(error_message_data, event_manager=event_manager)
|
||||
|
||||
# Emit message event
|
||||
message_timestamp = parseTimestamp(error_msg.publishedAt, default=getUtcTimestamp())
|
||||
|
|
@ -1272,7 +1233,8 @@ async def _processChatbotMessageLangGraph(
|
|||
userInput: UserInputRequest,
|
||||
userMessageId: str,
|
||||
featureInstanceId: Optional[str] = None,
|
||||
config: Optional[ChatbotConfig] = None
|
||||
config: Optional[ChatbotConfig] = None,
|
||||
event_manager=None
|
||||
):
|
||||
"""
|
||||
Process chatbot message using LangGraph.
|
||||
|
|
@ -1285,9 +1247,8 @@ async def _processChatbotMessageLangGraph(
|
|||
userInput: User input request
|
||||
userMessageId: User message ID
|
||||
featureInstanceId: Optional feature instance ID for loading instance-specific config
|
||||
event_manager: Event manager for streaming (passed from chatProcess)
|
||||
"""
|
||||
event_manager = get_event_manager()
|
||||
|
||||
try:
|
||||
from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
|
||||
interfaceDbChat = getChatbotInterface(currentUser, mandateId=services.mandateId, featureInstanceId=featureInstanceId)
|
||||
|
|
@ -1376,7 +1337,8 @@ async def _processChatbotMessageLangGraph(
|
|||
memory=memory,
|
||||
system_prompt=system_prompt,
|
||||
workflow_id=workflowId,
|
||||
config=config
|
||||
config=config,
|
||||
event_manager=event_manager
|
||||
)
|
||||
|
||||
# Emit synthetic status for real-time UI feedback
|
||||
|
|
@ -1436,7 +1398,7 @@ async def _processChatbotMessageLangGraph(
|
|||
"taskNumber": 0,
|
||||
"actionNumber": 0
|
||||
}
|
||||
errorMessage = interfaceDbChat.createMessage(errorMessageData)
|
||||
errorMessage = interfaceDbChat.createMessage(errorMessageData, event_manager=event_manager)
|
||||
|
||||
# Emit message event
|
||||
message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp())
|
||||
|
|
|
|||
|
|
@ -1,3 +0,0 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""Streaming infrastructure for chatbot events."""
|
||||
|
|
@ -17,7 +17,7 @@ from modules.auth import limiter, getRequestContext, RequestContext
|
|||
from modules.interfaces import interfaceDbChat, interfaceDbManagement
|
||||
from modules.interfaces.interfaceAiObjects import AiObjects
|
||||
from modules.datamodels.datamodelChat import UserInputRequest
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
from modules.services.serviceStreaming import get_event_manager
|
||||
from modules.features.codeeditor import codeEditorProcessor, fileContextManager
|
||||
from modules.features.codeeditor.datamodelCodeeditor import FileEditProposal, EditStatusEnum
|
||||
|
||||
|
|
|
|||
|
|
@ -1003,7 +1003,7 @@ class ChatObjects:
|
|||
totalPages=totalPages
|
||||
)
|
||||
|
||||
def createMessage(self, messageData: Dict[str, Any]) -> ChatMessage:
|
||||
def createMessage(self, messageData: Dict[str, Any], event_manager=None) -> ChatMessage:
|
||||
"""Creates a message for a workflow if user has access."""
|
||||
try:
|
||||
# Ensure ID is present
|
||||
|
|
@ -1121,10 +1121,9 @@ class ChatObjects:
|
|||
actionName=createdMessage.get("actionName")
|
||||
)
|
||||
|
||||
# Emit message event for streaming (if event manager is available)
|
||||
# Emit message event for streaming (if event manager is provided)
|
||||
if event_manager:
|
||||
try:
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
event_manager = get_event_manager()
|
||||
message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp())
|
||||
# Emit message event in exact chatData format: {type, createdAt, item}
|
||||
asyncio.create_task(event_manager.emit_event(
|
||||
|
|
@ -1138,7 +1137,6 @@ class ChatObjects:
|
|||
event_category="chat"
|
||||
))
|
||||
except Exception as e:
|
||||
# Event manager not available or error - continue without emitting
|
||||
logger.debug(f"Could not emit message event: {e}")
|
||||
|
||||
# Debug: Store message and documents for debugging - only if debug enabled
|
||||
|
|
@ -1447,7 +1445,7 @@ class ChatObjects:
|
|||
totalPages=totalPages
|
||||
)
|
||||
|
||||
def createLog(self, logData: Dict[str, Any]) -> ChatLog:
|
||||
def createLog(self, logData: Dict[str, Any], event_manager=None) -> ChatLog:
|
||||
"""Creates a log entry for a workflow if user has access."""
|
||||
# Check workflow access
|
||||
workflowId = logData.get("workflowId")
|
||||
|
|
@ -1498,10 +1496,9 @@ class ChatObjects:
|
|||
# Create log in normalized table
|
||||
createdLog = self.db.recordCreate(ChatLog, log_model)
|
||||
|
||||
# Emit log event for streaming (if event manager is available)
|
||||
# Emit log event for streaming (if event manager is provided)
|
||||
if event_manager:
|
||||
try:
|
||||
from modules.features.chatbot.streaming.events import get_event_manager
|
||||
event_manager = get_event_manager()
|
||||
log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp())
|
||||
# Emit log event in exact chatData format: {type, createdAt, item}
|
||||
asyncio.create_task(event_manager.emit_event(
|
||||
|
|
@ -1515,7 +1512,6 @@ class ChatObjects:
|
|||
event_category="chat"
|
||||
))
|
||||
except Exception as e:
|
||||
# Event manager not available or error - continue without emitting
|
||||
logger.debug(f"Could not emit log event: {e}")
|
||||
|
||||
# Return validated ChatLog instance
|
||||
|
|
|
|||
|
|
@ -107,6 +107,9 @@ class Services:
|
|||
from .serviceMessaging.mainServiceMessaging import MessagingService
|
||||
self.messaging = PublicService(MessagingService(self))
|
||||
|
||||
from .serviceStreaming.mainServiceStreaming import StreamingService
|
||||
self.streaming = PublicService(StreamingService(self))
|
||||
|
||||
# ============================================================
|
||||
# AI SERVICES (from modules/services/)
|
||||
# ============================================================
|
||||
|
|
|
|||
8
modules/services/serviceStreaming/__init__.py
Normal file
8
modules/services/serviceStreaming/__init__.py
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""Streaming service for SSE event management across features."""
|
||||
|
||||
from .eventManager import EventManager, get_event_manager
|
||||
from .helpers import ChatStreamingHelper
|
||||
|
||||
__all__ = ["EventManager", "get_event_manager", "ChatStreamingHelper"]
|
||||
|
|
@ -1,21 +1,20 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""
|
||||
Event manager for chatbot streaming.
|
||||
Manages event queues for Server-Sent Events (SSE) streaming.
|
||||
Event manager for SSE streaming.
|
||||
Manages event queues for Server-Sent Events (SSE) streaming across features.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Dict, Optional, Any
|
||||
from collections import defaultdict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventManager:
|
||||
"""
|
||||
Manages event queues for chatbot streaming.
|
||||
Manages event queues for SSE streaming.
|
||||
Each workflow has its own async queue for events.
|
||||
"""
|
||||
|
||||
36
modules/services/serviceStreaming/mainServiceStreaming.py
Normal file
36
modules/services/serviceStreaming/mainServiceStreaming.py
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
# Copyright (c) 2025 Patrick Motsch
|
||||
# All rights reserved.
|
||||
"""
|
||||
Streaming service for SSE event management.
|
||||
Provides access to the global event manager for workflow streaming.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from modules.services.serviceStreaming.eventManager import EventManager, get_event_manager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StreamingService:
|
||||
"""
|
||||
Streaming service providing access to SSE event infrastructure.
|
||||
"""
|
||||
|
||||
def __init__(self, services: Any):
|
||||
"""Initialize streaming service with service center access.
|
||||
|
||||
Args:
|
||||
services: Service center instance providing access to interfaces
|
||||
"""
|
||||
self.services = services
|
||||
|
||||
def getEventManager(self) -> EventManager:
|
||||
"""
|
||||
Get the global event manager instance for SSE streaming.
|
||||
|
||||
Returns:
|
||||
EventManager instance
|
||||
"""
|
||||
return get_event_manager()
|
||||
Loading…
Reference in a new issue