# gateway/modules/features/chatBot/service.py
# Last modified: 2025-10-06 15:05:22 +02:00
# 216 lines | 7.5 KiB | Python
"""Service layer for chatbot functionality."""
import json
import logging
from typing import AsyncIterator, List
from modules.features.chatBot.domain.chatbot import Chatbot, get_langchain_model
from modules.features.chatBot.utils.checkpointer import get_checkpointer
from modules.features.chatBot.utils.toolRegistry import get_registry
from modules.features.chatBot.utils import permissions
from modules.datamodels.datamodelChatbot import MessageItem, ChatMessageResponse
from modules.datamodels.datamodelUam import User
from langchain_core.messages import HumanMessage, AIMessage
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
async def post_message(
    *,
    thread_id: str,
    message: str,
    user: User,
) -> ChatMessageResponse:
    """Post a chat message to the chatbot and return the response.

    Args:
        thread_id: The unique identifier for the chat thread.
        message: The content of the chat message.
        user: The current user.

    Returns:
        The response containing the full chat message history and thread ID.

    Raises:
        ValueError: If the user has no permitted chatbot tools.
    """
    # Lazy %-args avoid string formatting when this level is filtered out.
    logger.info("User %s posted message to thread %s", user.id, thread_id)

    # Resolve per-user permissions: allowed tools, model, and system prompt.
    tool_ids = permissions.get_chatbot_tools(user_id=user.id)
    if not tool_ids:
        raise ValueError("User does not have permission to use any chatbot tools")
    model_name = permissions.get_chatbot_model(user_id=user.id)
    system_prompt = permissions.get_system_prompt(user_id=user.id)

    # Materialize tool instances for the permitted tool ids.
    registry = get_registry()
    tools = registry.get_tool_instances(tool_ids=tool_ids)

    # Model backend and conversation checkpointer.
    model = get_langchain_model(model_name=model_name)
    checkpointer = get_checkpointer()

    # Context window size (tokens) from config, with a default of 100k.
    context_window_size = int(
        APP_CONFIG.get("CHATBOT_CONTEXT_WINDOW_TOKEN_SIZE", 100000)
    )

    chatbot = await Chatbot.create(
        model=model,
        memory=checkpointer,
        system_prompt=system_prompt,
        tools=tools,
        context_window_size=context_window_size,
    )

    response = await chatbot.chat(message=message, chat_id=thread_id)

    # Normalize the LangChain message history into the API response shape.
    messages: List[MessageItem] = []
    for msg in response:
        # Determine the role of the message; skip tool/system/other types.
        if isinstance(msg, HumanMessage):
            role = "user"
        elif isinstance(msg, AIMessage):
            role = "assistant"
        else:
            continue
        # Skip structured (non-string) content, such as tool-call payloads.
        if not isinstance(msg.content, str):
            continue
        content = msg.content.strip()
        # Skip empty messages (e.g. tool-call-only AI turns). This matches
        # the streaming path, which also drops empty content.
        if not content:
            continue
        messages.append(
            MessageItem(
                role=role,
                content=content,
                timestamp=0.0,  # TODO: Add proper timestamp handling
            )
        )
    return ChatMessageResponse(thread_id=thread_id, messages=messages)
def _sse(payload: dict) -> str:
    """Format a JSON payload as a Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


async def post_message_stream(
    *,
    thread_id: str,
    message: str,
    user: User,
) -> AsyncIterator[str]:
    """Post a chat message to the chatbot and stream progress updates (SSE).

    Args:
        thread_id: The unique identifier for the chat thread.
        message: The content of the chat message.
        user: The current user.

    Yields:
        Server-Sent Events formatted strings containing status updates and
        the final response (or an error frame).
    """
    logger.info("User %s streaming message to thread %s", user.id, thread_id)
    try:
        # Resolve per-user permissions; report missing tool access as an
        # SSE error frame rather than raising, since we are mid-stream.
        tool_ids = permissions.get_chatbot_tools(user_id=user.id)
        if not tool_ids:
            yield _sse(
                {
                    "type": "error",
                    "message": "User does not have permission to use any chatbot tools",
                }
            )
            return
        model_name = permissions.get_chatbot_model(user_id=user.id)
        system_prompt = permissions.get_system_prompt(user_id=user.id)

        # Materialize tool instances for the permitted tool ids.
        registry = get_registry()
        tools = registry.get_tool_instances(tool_ids=tool_ids)

        # Model backend and conversation checkpointer.
        model = get_langchain_model(model_name=model_name)
        checkpointer = get_checkpointer()

        # Context window size (tokens) from config, with a default of 100k.
        context_window_size = int(
            APP_CONFIG.get("CHATBOT_CONTEXT_WINDOW_TOKEN_SIZE", 100000)
        )

        chatbot = await Chatbot.create(
            model=model,
            memory=checkpointer,
            system_prompt=system_prompt,
            tools=tools,
            context_window_size=context_window_size,
        )

        # Forward chatbot events to the client as SSE frames.
        async for event in chatbot.stream_events(message=message, chat_id=thread_id):
            etype = event.get("type")

            # Intermediate progress updates.
            if etype == "status":
                yield _sse({"type": "status", "label": event.get("label")})
                continue

            # Final response: normalize the chat history and finish the stream.
            if etype == "final":
                response_from_event = event.get("response") or {}
                # Chat history is already normalized by stream_events.
                chat_history_payload = response_from_event.get("chat_history", [])
                items: List[MessageItem] = []
                if isinstance(chat_history_payload, list):
                    for entry in chat_history_payload:
                        role = entry.get("role")
                        content = entry.get("content", "")
                        # Keep only non-empty user/assistant turns.
                        if role in ("user", "assistant") and content:
                            items.append(
                                MessageItem(
                                    role=role,
                                    content=content,
                                    timestamp=0.0,  # TODO: Add proper timestamp handling
                                )
                            )
                else:
                    # Unexpected payload shape: log and fall through with an
                    # empty history so the client still gets a final frame.
                    logger.warning(
                        "Unexpected chat_history format in final event: %s",
                        type(chat_history_payload),
                    )
                response = ChatMessageResponse(thread_id=thread_id, messages=items)
                yield _sse({"type": "final", "response": response.model_dump()})
                return

            # Error events are forwarded verbatim and end the stream.
            if etype == "error":
                yield _sse(event)
                return
    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e) or 'No error message provided'}"
        # logger.exception records the traceback (equivalent to exc_info=True).
        logger.exception("Error in streaming chat: %s", error_msg)
        yield _sse(
            {
                "type": "error",
                "message": f"An error occurred while processing your request: {error_msg}",
            }
        )