feat: langgraph as state manager

This commit is contained in:
Ida Dittrich 2026-01-19 17:44:58 +01:00
parent 8147f3f7c8
commit c4d195464c
22 changed files with 4024 additions and 1934 deletions

View file

@ -34,4 +34,19 @@ Web_Crawl_RETRY_DELAY = 2
# Web Research configuration
Web_Research_MAX_DEPTH = 2
Web_Research_MAX_LINKS_PER_DOMAIN = 4
Web_Research_CRAWL_TIMEOUT_MINUTES = 10
# STAC API Connector configuration (Swiss Topo)
Connector_StacSwisstopo_BASE_URL = https://data.geo.admin.ch/api/stac/v1
Connector_StacSwisstopo_TIMEOUT = 30
Connector_StacSwisstopo_MAX_RETRIES = 3
Connector_StacSwisstopo_RETRY_DELAY = 1.0
Connector_StacSwisstopo_ENABLE_CACHE = True
# Tavily AI Connector configuration (Web Search & Research)
# Get your API key from https://tavily.com
# Connector_AiTavily_API_SECRET = your_tavily_api_key_here

View file

@ -1,7 +1,9 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Chatbot feature - LangGraph-based chatbot implementation.
"""
from .mainChatbot import chatProcess
from .service import chatProcess
__all__ = ['chatProcess']

View file

@ -0,0 +1,3 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Bridges to external systems (AI models, database, tools)."""

View file

@ -0,0 +1,547 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
AI Center to LangChain bridge.
Implements LangChain BaseChatModel interface using AI center models.
"""
import logging
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional
from datetime import datetime
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
BaseMessage,
HumanMessage,
SystemMessage,
AIMessage,
ToolMessage,
convert_to_openai_messages,
)
from langchain_core.outputs import ChatGeneration, ChatResult
from langchain_core.runnables import RunnableConfig
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
from modules.datamodels.datamodelAi import (
AiModel,
AiModelCall,
AiModelResponse,
AiCallOptions,
OperationTypeEnum,
ProcessingModeEnum,
)
from modules.datamodels.datamodelUam import User
logger = logging.getLogger(__name__)
class AICenterChatModel(BaseChatModel):
"""
LangChain-compatible chat model that uses AI center models.
Bridges AI center model selection and calling to LangChain's BaseChatModel interface.
"""
def __init__(
    self,
    user: User,
    operation_type: OperationTypeEnum = OperationTypeEnum.DATA_ANALYSE,
    processing_mode: ProcessingModeEnum = ProcessingModeEnum.DETAILED,
    **kwargs
):
    """
    Initialize the AI center chat model bridge.

    Args:
        user: Current user for RBAC and model selection
        operation_type: Operation type for model selection
        processing_mode: Processing mode for model selection
        **kwargs: Additional arguments passed to BaseChatModel
    """
    super().__init__(**kwargs)
    # BaseChatModel is a Pydantic model; object.__setattr__ bypasses its field
    # validation so we can attach attributes that are not declared model fields.
    object.__setattr__(self, "user", user)
    object.__setattr__(self, "operation_type", operation_type)
    object.__setattr__(self, "processing_mode", processing_mode)
    # Lazily populated by _select_model() on the first generation call.
    object.__setattr__(self, "_selected_model", None)
@property
def _llm_type(self) -> str:
    """Return type of LLM (identifier LangChain uses for serialization/tracing)."""
    return "aicenter"
def _select_model(self, messages: List[BaseMessage]) -> AiModel:
    """
    Select the best AI center model for the given messages.

    Splits the conversation into prompt (system + human) and context
    (assistant + tool output), applies RBAC filtering to the model
    registry, and delegates the choice to the model selector. The result
    is cached on self._selected_model.

    Args:
        messages: List of LangChain messages

    Returns:
        Selected AI model

    Raises:
        ValueError: If no suitable model is found for the operation type.
    """
    # Convert messages to prompt/context format for model selector.
    prompt_parts = []
    context_parts = []
    for msg in messages:
        if isinstance(msg, SystemMessage):
            prompt_parts.append(msg.content)
        elif isinstance(msg, HumanMessage):
            prompt_parts.append(msg.content)
        elif isinstance(msg, AIMessage):
            context_parts.append(msg.content)
        elif isinstance(msg, ToolMessage):
            context_parts.append(f"Tool {msg.name}: {msg.content}")
    prompt = "\n".join(prompt_parts)
    context = "\n".join(context_parts) if context_parts else ""
    # Imports are function-local, presumably to avoid import cycles with the
    # security/connector modules — TODO confirm.
    from modules.security.rbac import RbacClass
    from modules.security.rootAccess import getRootDbAppConnector
    from modules.connectors.connectorDbPostgre import DatabaseConnector
    from modules.shared.configuration import APP_CONFIG
    # Create a database connector instance for RBAC with proper configuration.
    dbHost = APP_CONFIG.get("DB_MANAGEMENT_HOST")
    dbDatabase = APP_CONFIG.get("DB_MANAGEMENT_DATABASE", "management")
    dbUser = APP_CONFIG.get("DB_MANAGEMENT_USER")
    dbPassword = APP_CONFIG.get("DB_MANAGEMENT_PASSWORD_SECRET")
    # NOTE(review): int(...) raises TypeError if DB_MANAGEMENT_PORT is unset.
    dbPort = int(APP_CONFIG.get("DB_MANAGEMENT_PORT"))
    db = DatabaseConnector(
        dbHost=dbHost,
        dbDatabase=dbDatabase,
        dbUser=dbUser,
        dbPassword=dbPassword,
        dbPort=dbPort,
        userId=self.user.id if hasattr(self.user, 'id') else None
    )
    dbApp = getRootDbAppConnector()
    rbac_instance = RbacClass(db, dbApp=dbApp)
    # Get available models with RBAC filtering for the current user.
    available_models = modelRegistry.getAvailableModels(
        currentUser=self.user,
        rbacInstance=rbac_instance
    )
    # Create options for model selector.
    options = AiCallOptions(
        operationType=self.operation_type,
        processingMode=self.processing_mode
    )
    # Select model.
    selected_model = modelSelector.selectModel(
        prompt=prompt,
        context=context,
        options=options,
        availableModels=available_models
    )
    if not selected_model:
        raise ValueError(f"No suitable model found for operation type {self.operation_type.value}")
    logger.info(f"Selected AI center model: {selected_model.displayName} ({selected_model.name})")
    # Cache via object.__setattr__ to bypass Pydantic field validation.
    object.__setattr__(self, "_selected_model", selected_model)
    return selected_model
def _convert_messages_to_ai_format(self, messages: List[BaseMessage]) -> List[Dict[str, Any]]:
    """Translate LangChain messages into OpenAI-style dicts for the AI center.

    Args:
        messages: LangChain messages to translate.

    Returns:
        The same conversation as a list of OpenAI-format message dicts.
    """
    # Delegate to LangChain's canonical converter instead of mapping by hand.
    return convert_to_openai_messages(messages)
def _convert_ai_response_to_langchain(
    self,
    response: AiModelResponse,
    tool_calls: Optional[List[Dict[str, Any]]] = None
) -> AIMessage:
    """
    Convert AI center response to LangChain AIMessage.

    Args:
        response: AI center response
        tool_calls: Optional tool calls from the response
            (format: [{"id": "...", "name": "...", "args": {...}}])

    Returns:
        LangChain AIMessage with tool_calls if present
    """
    # LangChain expects tool_calls in format: [{"id": "...", "name": "...", "args": {...}}].
    # The tool_calls parameter is assumed to already be in this shape.
    kwargs = {}
    if tool_calls:
        kwargs["tool_calls"] = tool_calls
    # Fall back to "" so the AIMessage always carries string content.
    return AIMessage(content=response.content or "", **kwargs)
def _generate(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[Any] = None,
    **kwargs: Any,
) -> ChatResult:
    """
    Synchronous generate method required by BaseChatModel.
    Wraps the async _agenerate method.

    Args:
        messages: List of LangChain messages
        stop: Optional stop sequences
        run_manager: Optional callback manager
        **kwargs: Additional arguments

    Returns:
        ChatResult with generations

    Raises:
        RuntimeError: If called while an event loop is already running
            in this thread (use _agenerate instead).
    """
    # BUGFIX: the previous guard raised RuntimeError inside the same try
    # block whose `except RuntimeError: pass` swallowed it, so calls from
    # async contexts slipped through to asyncio.run() and failed with a
    # confusing error. Probe for a running loop explicitly instead.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False  # no loop in this thread: safe to run below
    else:
        loop_running = True
    if loop_running:
        raise RuntimeError(
            "AICenterChatModel._generate() called from async context. "
            "Use _agenerate() instead."
        )
    # asyncio.run creates, drives, and closes a fresh event loop.
    return asyncio.run(self._agenerate(messages, stop=stop, run_manager=run_manager, **kwargs))
async def _agenerate(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[Any] = None,
    **kwargs: Any,
) -> ChatResult:
    """
    Async generate method required by BaseChatModel.

    Args:
        messages: List of LangChain messages
        stop: Optional stop sequences (currently not forwarded to the model)
        run_manager: Optional callback manager
        **kwargs: Additional arguments (may include tools for tool calling)

    Returns:
        ChatResult with generations

    Raises:
        ValueError: If the API key is missing, the HTTP call fails, the model
            has no functionCall, or the model call reports failure.
    """
    import json  # hoisted here (was re-imported inside the tool-call loop)

    # Select model if not already selected (cached on the instance).
    if not self._selected_model:
        self._select_model(messages)
    # Tools are attached by bind_tools(); absent unless LangGraph bound them.
    tools = getattr(self, "_bound_tools", None)
    # Convert messages to AI center format.
    ai_messages = self._convert_messages_to_ai_format(messages)
    # If tools are bound, add tool definitions to the system message so the
    # model knows about available tools; some models need explicit tool
    # definitions to enable tool calling.
    if tools:
        # Find the existing system message, if any.
        system_message_idx = None
        for i, msg in enumerate(ai_messages):
            if msg.get("role") == "system":
                system_message_idx = i
                break
        # Build tool descriptions for the system message.
        tool_descriptions = []
        for tool in tools:
            if hasattr(tool, "name") and hasattr(tool, "description"):
                # Include parameter names when the tool exposes an args schema.
                args_schema = getattr(tool, "args_schema", None)
                params_info = ""
                if args_schema:
                    try:
                        if hasattr(args_schema, "model_json_schema"):
                            schema = args_schema.model_json_schema()
                            if "properties" in schema:
                                params = list(schema["properties"].keys())
                                params_info = f" (Parameter: {', '.join(params)})"
                    except Exception:
                        # Best effort only - a broken schema must not block the call.
                        # (Was a bare except, which also swallowed KeyboardInterrupt.)
                        pass
                tool_descriptions.append(f"- {tool.name}: {tool.description}{params_info}")
        if tool_descriptions:
            tools_text = "\n".join(tool_descriptions)
            tools_note = f"\n\n⚠️⚠️⚠️ KRITISCH - TOOL-NUTZUNG ⚠️⚠️⚠️\n\nVERFÜGBARE TOOLS:\n{tools_text}\n\nABSOLUT VERBINDLICH:\n- Du MUSST diese Tools verwenden, um Anfragen zu bearbeiten!\n- Für Status-Updates MUSST du IMMER das Tool 'send_streaming_message' verwenden!\n- VERBOTEN: Normale Text-Nachrichten für Status-Updates!\n- Du MUSST Tools aufrufen, nicht nur darüber sprechen!\n\nBeispiel FALSCH: \"Ich werde die Datenbank durchsuchen...\"\nBeispiel RICHTIG: Rufe das Tool 'send_streaming_message' mit \"Durchsuche Datenbank...\" auf!"
            if system_message_idx is not None:
                # Append to existing system message.
                ai_messages[system_message_idx]["content"] += tools_note
            else:
                # Add new system message at the beginning.
                ai_messages.insert(0, {
                    "role": "system",
                    "content": tools_note.strip()
                })
    # Convert LangChain tools to OpenAI tool format for potential use.
    # Note: The actual tool calling is handled by the connector if it supports it.
    openai_tools = None
    if tools and self._selected_model.connectorType == "openai":
        openai_tools = []
        for tool in tools:
            if hasattr(tool, "name") and hasattr(tool, "description"):
                # Get tool parameters schema.
                args_schema = getattr(tool, "args_schema", None)
                parameters = {}
                if args_schema:
                    # Check if it's a Pydantic model class or instance.
                    from pydantic import BaseModel
                    if isinstance(args_schema, type) and issubclass(args_schema, BaseModel):
                        # Pydantic model class - get JSON schema.
                        if hasattr(args_schema, "model_json_schema"):
                            # Pydantic v2
                            parameters = args_schema.model_json_schema()
                        elif hasattr(args_schema, "schema"):
                            # Pydantic v1
                            parameters = args_schema.schema()
                    elif isinstance(args_schema, BaseModel):
                        # Pydantic model instance.
                        if hasattr(args_schema, "model_dump"):
                            # Pydantic v2
                            parameters = args_schema.model_dump()
                        elif hasattr(args_schema, "dict"):
                            # Pydantic v1
                            parameters = args_schema.dict()
                    elif hasattr(args_schema, "schema"):
                        # Has schema method (might be a class).
                        try:
                            parameters = args_schema.schema()
                        except TypeError:
                            # If schema() requires an instance, try model_json_schema.
                            if hasattr(args_schema, "model_json_schema"):
                                parameters = args_schema.model_json_schema()
                            else:
                                parameters = {}
                    elif isinstance(args_schema, dict):
                        # Already a dict.
                        parameters = args_schema
                tool_schema = {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description or "",
                        "parameters": parameters
                    }
                }
                openai_tools.append(tool_schema)
    # Create model call (consumed by the connector path below; AiModelCall
    # has no tools field, which is why tools went into the system message).
    model_call = AiModelCall(
        messages=ai_messages,
        model=self._selected_model,
        options=AiCallOptions(
            operationType=self.operation_type,
            processingMode=self.processing_mode,
            temperature=self._selected_model.temperature
        )
    )
    # If tools are bound and this is an OpenAI model, call the API directly
    # with tools included, since the connector interface doesn't support tools.
    if openai_tools and self._selected_model.connectorType == "openai":
        import httpx
        from modules.shared.configuration import APP_CONFIG
        api_key = APP_CONFIG.get('Connector_AiOpenai_API_SECRET')
        if not api_key:
            raise ValueError("OpenAI API key not configured")
        payload = {
            "model": self._selected_model.name,
            "messages": ai_messages,
            "tools": openai_tools,
            "tool_choice": "auto",  # Let model decide when to use tools
            "temperature": self._selected_model.temperature,
            "max_tokens": self._selected_model.maxTokens
        }
        async with httpx.AsyncClient(timeout=600.0) as client:
            response_obj = await client.post(
                self._selected_model.apiUrl,
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json=payload
            )
            if response_obj.status_code != 200:
                error_msg = f"OpenAI API error: {response_obj.status_code} - {response_obj.text}"
                logger.error(error_msg)
                raise ValueError(error_msg)
            response_json = response_obj.json()
        choice = response_json["choices"][0]
        message = choice["message"]
        # Extract content and tool calls.
        content = message.get("content", "")
        tool_calls_raw = message.get("tool_calls")
        # Convert OpenAI tool_calls format to LangChain format:
        # [{"id": "...", "name": "...", "args": {...}}]
        tool_calls = None
        if tool_calls_raw:
            tool_calls = []
            for tc in tool_calls_raw:
                func_data = tc.get("function", {})
                func_name = func_data.get("name")
                func_args_str = func_data.get("arguments", "{}")
                # Parse JSON arguments; malformed arguments degrade to {}
                # rather than failing the whole call. (Was a bare except.)
                try:
                    func_args = json.loads(func_args_str) if isinstance(func_args_str, str) else func_args_str
                except (ValueError, TypeError):
                    func_args = {}
                tool_calls.append({
                    "id": tc.get("id"),
                    "name": func_name,
                    "args": func_args
                })
        # Create response object; tool calls travel in metadata so the shared
        # extraction below finds them.
        response = AiModelResponse(
            content=content or "",
            success=True,
            modelId=self._selected_model.name,
            metadata={
                "response_id": response_json.get("id", ""),
                "tool_calls": tool_calls
            }
        )
    else:
        # No tools or not OpenAI - use connector normally.
        if not self._selected_model.functionCall:
            raise ValueError(f"Model {self._selected_model.displayName} has no functionCall defined")
        response: AiModelResponse = await self._selected_model.functionCall(model_call)
    if not response.success:
        raise ValueError(f"AI model call failed: {response.error or 'Unknown error'}")
    # Extract tool calls from response metadata if present
    tool_calls = None
    if response.metadata:
        # Check for tool calls in metadata (format may vary by connector).
        tool_calls = response.metadata.get("tool_calls") or response.metadata.get("function_calls")
    # Convert response to LangChain format with tool calls.
    ai_message = self._convert_ai_response_to_langchain(response, tool_calls=tool_calls)
    # Create generation and result.
    generation = ChatGeneration(message=ai_message)
    return ChatResult(generations=[generation])
def bind_tools(self, tools: List[Any], **kwargs: Any) -> "AICenterChatModel":
    """
    Bind tools to the model (required for LangGraph tool calling).

    Args:
        tools: List of LangChain tools
        **kwargs: Additional arguments (currently ignored)

    Returns:
        New instance with tools bound
    """
    # Create a new instance with tools bound.
    # Note: The actual tool execution happens in LangGraph's ToolNode;
    # this method is called by LangGraph to prepare the model.
    bound_model = AICenterChatModel(
        user=self.user,
        operation_type=self.operation_type,
        processing_mode=self.processing_mode
    )
    # Carry over the cached model selection so binding does not reselect.
    object.__setattr__(bound_model, "_selected_model", self._selected_model)
    # Store tools; _agenerate reads _bound_tools to advertise them to the model.
    object.__setattr__(bound_model, "_bound_tools", tools)
    return bound_model
def invoke(
    self,
    input: List[BaseMessage],
    config: Optional[RunnableConfig] = None,
    **kwargs: Any,
) -> BaseMessage:
    """
    Synchronous invoke method (required by BaseChatModel).
    Note: This is a wrapper around async _agenerate.

    Args:
        input: List of LangChain messages
        config: Optional runnable config (currently unused)
        **kwargs: Additional arguments

    Returns:
        AIMessage response

    Raises:
        RuntimeError: If called while an event loop is already running
            in this thread (use ainvoke instead).
    """
    # BUGFIX: the previous guard raised RuntimeError inside the try block
    # whose `except RuntimeError:` handler then created a brand-new event
    # loop, so async-context calls were never rejected and the new loop was
    # never closed (leak). Probe for a running loop explicitly instead.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False  # no loop in this thread
    else:
        loop_running = True
    if loop_running:
        raise RuntimeError("Cannot use synchronous invoke in async context. Use ainvoke instead.")
    # asyncio.run creates, drives, and CLOSES a fresh event loop.
    result = asyncio.run(self._agenerate(input, **kwargs))
    return result.generations[0].message
async def ainvoke(
    self,
    input: List[BaseMessage],
    config: Optional[RunnableConfig] = None,
    **kwargs: Any,
) -> BaseMessage:
    """Async invoke entry point (BaseChatModel contract).

    Args:
        input: Conversation messages to send to the model.
        config: Optional runnable config (unused here).
        **kwargs: Forwarded to _agenerate.

    Returns:
        The AIMessage produced for this conversation.
    """
    chat_result = await self._agenerate(input, **kwargs)
    first_generation = chat_result.generations[0]
    return first_generation.message

View file

@ -0,0 +1,432 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Custom LangGraph checkpointer using existing database interface.
Maps LangGraph state to existing message storage format.
"""
import logging
import uuid
from typing import Any, Dict, List, Optional, Tuple, NamedTuple
from datetime import datetime
from langgraph.checkpoint.base import BaseCheckpointSaver, Checkpoint, CheckpointMetadata
# CheckpointTuple might not be directly importable, so we define it as a NamedTuple
# Based on LangGraph's usage, it needs config, checkpoint, metadata, parent_config, and pending_writes
class CheckpointTuple(NamedTuple):
    """Tuple containing config, checkpoint, metadata, parent_config, and pending_writes."""
    # Runnable config dict identifying the thread/checkpoint.
    config: Dict[str, Any]
    # The checkpoint payload itself.
    checkpoint: Checkpoint
    # Metadata associated with the checkpoint (e.g. "step").
    metadata: CheckpointMetadata
    # Config of the parent checkpoint, if any (always None here).
    parent_config: Optional[Dict[str, Any]] = None
    # Pending channel writes not yet folded into the checkpoint (always None here).
    pending_writes: Optional[List[Tuple[str, Any]]] = None
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage, ToolMessage
from modules.interfaces.interfaceDbChatObjects import getInterface
from modules.datamodels.datamodelChat import ChatMessage, ChatWorkflow
from modules.datamodels.datamodelUam import User
from modules.shared.timeUtils import getUtcTimestamp
logger = logging.getLogger(__name__)
class DatabaseCheckpointer(BaseCheckpointSaver):
"""
Custom LangGraph checkpointer that uses the existing database interface.
Maps LangGraph thread_id to workflow.id and stores messages in the existing format.
"""
def __init__(self, user: User, workflow_id: str):
    """
    Initialize the database checkpointer.

    Args:
        user: Current user for database access
        workflow_id: Workflow ID (maps to LangGraph thread_id)
    """
    # NOTE(review): BaseCheckpointSaver.__init__ is not called here — confirm
    # the base class needs no initialisation (e.g. serde setup).
    self.user = user
    self.workflow_id = workflow_id
    # Database interface scoped to this user; all reads/writes go through it.
    self.interface = getInterface(user)
def _convert_langchain_to_db_message(
    self,
    msg: BaseMessage,
    sequence_nr: int,
    round_number: int
) -> Dict[str, Any]:
    """
    Convert LangChain message to database message format.

    Args:
        msg: LangChain message
        sequence_nr: Sequence number for ordering (1-based)
        round_number: Round number in workflow

    Returns:
        Dictionary in database message format
    """
    # NOTE: the redundant local `import uuid` was removed — uuid is already
    # imported at module level.
    role = "user"
    content = ""
    if isinstance(msg, HumanMessage):
        role = "user"
        content = msg.content if isinstance(msg.content, str) else str(msg.content)
    elif isinstance(msg, AIMessage):
        role = "assistant"
        content = msg.content if isinstance(msg.content, str) else str(msg.content)
    elif isinstance(msg, SystemMessage):
        # System messages are stored but marked as system.
        role = "system"
        content = msg.content if isinstance(msg.content, str) else str(msg.content)
    elif isinstance(msg, ToolMessage):
        # Tool messages are stored as assistant messages with tool info.
        role = "assistant"
        content = f"Tool {msg.name}: {msg.content}"
    return {
        "id": f"msg_{uuid.uuid4()}",
        "workflowId": self.workflow_id,
        "message": content,
        "role": role,
        # The very first message of a workflow is flagged "first".
        "status": "step" if sequence_nr > 1 else "first",
        "sequenceNr": sequence_nr,
        "publishedAt": getUtcTimestamp(),
        "roundNumber": round_number,
        "taskNumber": 0,
        "actionNumber": 0
    }
def _convert_db_to_langchain_messages(
    self,
    messages: List[ChatMessage]
) -> List[BaseMessage]:
    """Map stored chat messages back onto LangChain message objects.

    Args:
        messages: Database ChatMessage records, in order.

    Returns:
        LangChain messages for the recognised roles; records with any other
        role are silently dropped.
    """
    # Role -> LangChain message class; unknown roles are skipped.
    role_to_cls = {
        "user": HumanMessage,
        "assistant": AIMessage,
        "system": SystemMessage,
    }
    converted: List[BaseMessage] = []
    for record in messages:
        message_cls = role_to_cls.get(record.role)
        if message_cls is not None:
            converted.append(message_cls(content=record.message))
    return converted
def put(
    self,
    config: Dict[str, Any],
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: Dict[str, int],
) -> None:
    """
    Store a checkpoint in the database.

    Extracts user/assistant messages from the checkpoint state, appends only
    the ones not already persisted, and touches the workflow's lastActivity.

    Args:
        config: LangGraph config (contains thread_id)
        checkpoint: Checkpoint to store
        metadata: Checkpoint metadata (unused here)
        new_versions: New version numbers (unused here)

    Raises:
        Exception: Re-raises any storage-level error after logging it.
    """
    try:
        # Extract thread_id from config (maps to workflow_id).
        thread_id = config.get("configurable", {}).get("thread_id", self.workflow_id)
        # Get current workflow to determine round number.
        workflow = self.interface.getWorkflow(thread_id)
        if not workflow:
            logger.warning(f"Workflow {thread_id} not found, cannot store checkpoint")
            return
        round_number = workflow.currentRound if workflow else 1
        # Extract messages from checkpoint.
        state = checkpoint.get("channel_values", {})
        messages = state.get("messages", [])
        if not messages:
            logger.debug(f"No messages in checkpoint for workflow {thread_id}")
            return
        # Get existing messages to determine what's already stored.
        existing_messages = self.interface.getMessages(thread_id)
        existing_count = len(existing_messages) if existing_messages else 0
        # Create a set of existing (role, content) pairs for O(1) duplicate checks.
        existing_content_set = set()
        if existing_messages:
            for existing_msg in existing_messages:
                content_key = (existing_msg.role, existing_msg.message)
                existing_content_set.add(content_key)
        # Filter checkpoint messages to only user/assistant (skip system).
        # Skip intermediate AIMessages with tool_calls (these are tool call
        # requests, not final answers).
        checkpoint_user_assistant_messages = []
        for msg in messages:
            if isinstance(msg, HumanMessage):
                # Always store user messages.
                checkpoint_user_assistant_messages.append(msg)
            elif isinstance(msg, AIMessage):
                tool_calls = getattr(msg, "tool_calls", None)
                if tool_calls and len(tool_calls) > 0:
                    logger.debug(f"Skipping intermediate AIMessage with tool_calls for workflow {thread_id}")
                    continue
                # Store all other AIMessages (final answers).
                checkpoint_user_assistant_messages.append(msg)
        # Only store new messages that aren't already in the database.
        new_messages_to_store = []
        for msg in checkpoint_user_assistant_messages:
            role = "user" if isinstance(msg, HumanMessage) else "assistant"
            content = msg.content if isinstance(msg.content, str) else str(msg.content)
            # Skip empty messages (they might be status updates).
            if not content or not content.strip():
                continue
            content_key = (role, content)
            if content_key not in existing_content_set:
                new_messages_to_store.append(msg)
                existing_content_set.add(content_key)  # avoid duplicates within this batch
        # Store only the new messages.
        if new_messages_to_store:
            # BUGFIX: sequence_nr is existing_count + i; the old code ALSO
            # incremented existing_count after each successful store, which
            # double-counted and produced gapped sequence numbers (3, 5, 7, ...).
            for i, msg in enumerate(new_messages_to_store, 1):
                sequence_nr = existing_count + i
                # Convert to database format.
                db_message_data = self._convert_langchain_to_db_message(
                    msg,
                    sequence_nr,
                    round_number
                )
                # Store the message; a single failed insert is logged but does
                # not abort the remaining inserts.
                try:
                    self.interface.createMessage(db_message_data)
                    logger.debug(f"Stored message {db_message_data['id']} for workflow {thread_id}")
                except Exception as e:
                    logger.error(f"Error storing message: {e}", exc_info=True)
        else:
            logger.debug(f"No new messages to store for workflow {thread_id} (existing: {existing_count}, checkpoint: {len(checkpoint_user_assistant_messages)})")
        # Update workflow last activity.
        self.interface.updateWorkflow(thread_id, {
            "lastActivity": getUtcTimestamp()
        })
    except Exception as e:
        logger.error(f"Error storing checkpoint: {e}", exc_info=True)
        raise
def get(
    self,
    config: Dict[str, Any],
) -> Optional[Checkpoint]:
    """
    Retrieve a checkpoint from the database.

    Checkpoints are not stored verbatim; the checkpoint is rebuilt from the
    workflow's persisted messages, so its id is freshly generated per call.

    Args:
        config: LangGraph config (contains thread_id)

    Returns:
        Checkpoint if found, None otherwise (also None on any error)
    """
    try:
        # Extract thread_id from config (maps to workflow_id).
        thread_id = config.get("configurable", {}).get("thread_id", self.workflow_id)
        # Get workflow.
        workflow = self.interface.getWorkflow(thread_id)
        if not workflow:
            logger.debug(f"Workflow {thread_id} not found")
            return None
        # Get messages.
        messages = self.interface.getMessages(thread_id)
        checkpoint_id = str(uuid.uuid4())
        if not messages:
            # Return empty checkpoint for new workflow.
            return {
                "id": checkpoint_id,
                "v": 1,
                "ts": getUtcTimestamp(),
                "channel_values": {
                    "messages": []
                },
                "channel_versions": {},
                "versions_seen": {}
            }
        # Convert to LangChain messages.
        langchain_messages = self._convert_db_to_langchain_messages(messages)
        # Build checkpoint.
        checkpoint = {
            "id": checkpoint_id,
            "v": 1,
            "ts": getUtcTimestamp(),
            "channel_values": {
                "messages": langchain_messages
            },
            "channel_versions": {},
            "versions_seen": {}
        }
        return checkpoint
    except Exception as e:
        logger.error(f"Error retrieving checkpoint: {e}", exc_info=True)
        return None
def list(
    self,
    config: Dict[str, Any],
    filter: Optional[Dict[str, Any]] = None,
    before: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[Checkpoint]:
    """List checkpoints; this implementation only ever knows the latest one.

    Args:
        config: LangGraph config identifying the thread.
        filter: Ignored in this implementation.
        before: Ignored in this implementation.
        limit: Ignored in this implementation.

    Returns:
        A single-element list with the current checkpoint, or [] if none exists.
    """
    current = self.get(config)
    return [current] if current else []
def put_writes(
    self,
    config: Dict[str, Any],
    writes: List[Tuple[str, Any]],
    task_id: str,
) -> None:
    """
    Store checkpoint writes (not used in current implementation).

    Args:
        config: LangGraph config
        writes: List of write operations
        task_id: Task ID
    """
    # Intentionally a no-op - all persistence happens through put().
    pass
async def aget_tuple(
    self,
    config: Dict[str, Any],
) -> Optional[CheckpointTuple]:
    """
    Async version of get that returns tuple of (config, checkpoint, metadata).

    Args:
        config: LangGraph config (contains thread_id)

    Returns:
        CheckpointTuple with config, checkpoint and metadata if found, None otherwise
    """
    # Delegates to the synchronous get(); the underlying DB interface is synchronous.
    checkpoint = self.get(config)
    if checkpoint:
        # LangGraph expects 'step' in metadata; CheckpointMetadata is
        # typically a TypedDict.
        metadata: CheckpointMetadata = {
            "step": 0  # Start at step 0, LangGraph will increment
        }
        return CheckpointTuple(
            config=config,
            checkpoint=checkpoint,
            metadata=metadata,
            parent_config=None,  # No parent checkpoint for our implementation
            pending_writes=None  # No pending writes in our implementation
        )
    return None
async def aput(
    self,
    config: Dict[str, Any],
    checkpoint: Checkpoint,
    metadata: CheckpointMetadata,
    new_versions: Dict[str, int],
) -> None:
    """Async checkpoint store; delegates to the synchronous put().

    Args:
        config: LangGraph config (contains thread_id).
        checkpoint: Checkpoint to persist.
        metadata: Checkpoint metadata.
        new_versions: New channel version numbers.
    """
    # The underlying interface is synchronous, so nothing here is awaited.
    self.put(config=config, checkpoint=checkpoint, metadata=metadata, new_versions=new_versions)
async def alist(
    self,
    config: Dict[str, Any],
    filter: Optional[Dict[str, Any]] = None,
    before: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[Checkpoint]:
    """Async checkpoint listing; delegates to the synchronous list().

    Args:
        config: LangGraph config identifying the thread.
        filter: Optional filter (forwarded, ignored downstream).
        before: Optional cursor (forwarded, ignored downstream).
        limit: Optional result limit (forwarded, ignored downstream).

    Returns:
        Whatever the synchronous list() produces for this config.
    """
    # The underlying interface is synchronous, so nothing here is awaited.
    return self.list(config, filter, before, limit)
async def aput_writes(
    self,
    config: Dict[str, Any],
    writes: List[Tuple[str, Any]],
    task_id: str,
) -> None:
    """
    Async version of put_writes.
    Store checkpoint writes (not used in current implementation).

    Args:
        config: LangGraph config
        writes: List of write operations
        task_id: Task ID
    """
    # Intentionally a no-op - this method is called by LangGraph but all
    # persistence is handled through aput().
    pass

View file

@ -0,0 +1,313 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Chatbot tools for LangGraph integration.
Includes SQL query tool, Tavily search tool, and streaming status tool.
"""
import logging
from typing import Optional
from langchain_core.tools import tool
from modules.connectors.connectorPreprocessor import PreprocessorConnector
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
@tool
async def sqlite_query(query: str) -> str:
    """
    Execute a SQL SELECT query on the Althaus AG database.
    This tool allows you to query the SQLite database to find articles, prices,
    inventory levels, and other product information.
    Args:
        query: A valid SQL SELECT query. Must use double quotes for column names
            with spaces or special characters (e.g., "Artikelnummer", "S_IST_BESTAND").
            Only SELECT queries are allowed.
    Returns:
        Query results as a formatted string, or an error message if the query fails.
    Examples:
        - Find articles by name:
            SELECT a."Artikelnummer", a."Artikelbezeichnung", a."Lieferant"
            FROM Artikel a
            WHERE a."Artikelbezeichnung" LIKE '%Motor%'
            LIMIT 20
        - Find articles with price and inventory:
            SELECT a."Artikelnummer", a."Artikelbezeichnung", e."EP_CHF",
                lp."Lagerplatz" as "Lagerplatzname", l."S_IST_BESTAND",
                l."S_RESERVIERTER__BESTAND",
                CASE WHEN l."S_IST_BESTAND" != 'Unbekannt'
                THEN CAST(l."S_IST_BESTAND" AS INTEGER) - COALESCE(l."S_RESERVIERTER__BESTAND", 0)
                ELSE NULL END as "Verfügbarer Bestand"
            FROM Artikel a
            LEFT JOIN Einkaufspreis e ON a."I_ID" = e."m_Artikel"
            LEFT JOIN Lagerplatz_Artikel l ON a."I_ID" = l."R_ARTIKEL"
            LEFT JOIN Lagerplatz lp ON l."R_LAGERPLATZ" = lp."I_ID"
            WHERE a."Artikelbezeichnung" LIKE '%Netzgerät%'
            LIMIT 20
    """
    # NOTE: this docstring doubles as the tool description sent to the LLM
    # via @tool, so its wording is runtime-relevant - do not rephrase casually.
    try:
        connector = PreprocessorConnector()
        try:
            # return_json=True - the connector returns a dict with "text",
            # "data" and "row_count" fields (assumed shape; confirm against
            # PreprocessorConnector.executeQuery).
            result = await connector.executeQuery(query, return_json=True)
            # The connector reports failures in-band via the "text" field.
            if result.get("text", "").startswith(("Error:", "Query failed:")):
                error_msg = result.get("text", "Query failed")
                logger.error(f"SQL query failed: {error_msg}")
                return error_msg
            # Format results
            data = result.get("data", [])
            row_count = result.get("row_count", len(data))
            if not data:
                return f"Query executed successfully. Returned {row_count} rows (no data)."
            # Format as readable string
            lines = [f"Query executed successfully. Returned {row_count} rows:"]
            # Show column headers from first row
            if data and isinstance(data[0], dict):
                headers = list(data[0].keys())
                lines.append("\nColumns: " + ", ".join(headers))
                lines.append("\nResults:")
                # Show first 50 rows only, to keep the tool output bounded
                for i, row in enumerate(data[:50], 1):
                    row_str = ", ".join([f"{k}: {v}" for k, v in row.items()])
                    lines.append(f"{i}. {row_str}")
                if row_count > 50:
                    lines.append(f"\n(Showing first 50 of {row_count} rows)")
            else:
                # Fallback for non-dict rows
                for i, row in enumerate(data[:50], 1):
                    lines.append(f"{i}. {row}")
            return "\n".join(lines)
        finally:
            # Always release the connector, even on error paths.
            await connector.close()
    except Exception as e:
        error_msg = f"Error executing SQL query: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return error_msg
@tool
async def tavily_search(query: str) -> str:
    """
    Search the internet for comprehensive information using Tavily search via AI Center.
    Use this tool when you need to find detailed product information, datasheets,
    certifications, technical specifications, market trends, or other comprehensive
    information that is not in the database.
    IMPORTANT: This tool returns FULL content from search results (not truncated).
    Use all available information to provide comprehensive, detailed answers with
    specific facts, numbers, dates, and technical details.
    Args:
        query: Search query string. Be specific and include product names,
            model numbers, or other relevant keywords. For comprehensive
            research, use broad queries like "latest developments in LED technology 2026"
    Returns:
        Comprehensive search results with full content, titles, URLs, and sources.
        Results include up to 15 sources with complete content for detailed analysis.
    Examples:
        - Search for comprehensive product information:
          tavily_search("latest LED technology developments 2026")
        - Search for product datasheet:
          tavily_search("Siemens 6AV2 181-8XP00-0AX0 datasheet")
        - Search for market trends:
          tavily_search("LED market trends efficiency breakthroughs 2025")
    """
    try:
        # Use AI Center Tavily plugin instead of direct langchain-tavily.
        # Imports are local so the tool module stays importable even when the
        # AI Center stack is not configured.
        from modules.aicore.aicoreModelRegistry import modelRegistry
        from modules.aicore.aicoreModelSelector import modelSelector
        from modules.datamodels.datamodelAi import (
            AiModelCall,
            AiModelResponse,
            AiCallOptions,
            OperationTypeEnum,
            ProcessingModeEnum,
            AiCallPromptWebSearch
        )
        import json
        # Discover and register connectors if not already registered.
        # NOTE(review): this reads the registry's private `_connectors`
        # attribute as an "already initialized" probe — a public accessor
        # would be preferable; confirm with the aicore module owners.
        if not modelRegistry._connectors:
            discovered_connectors = modelRegistry.discoverConnectors()
            for connector in discovered_connectors:
                modelRegistry.registerConnector(connector)
            # Refresh models to ensure Tavily is available
            modelRegistry.refreshModels()
        # Get available Tavily models (without RBAC filtering since tools don't have user context)
        available_models = modelRegistry.getAvailableModels()
        tavily_models = [m for m in available_models if m.connectorType == "tavily"]
        if not tavily_models:
            return "Error: Tavily model not available in AI Center. Please check configuration."
        # Select the best Tavily model for web search
        options = AiCallOptions(
            operationType=OperationTypeEnum.WEB_SEARCH_DATA,
            processingMode=ProcessingModeEnum.BASIC
        )
        # Use model selector to choose the best Tavily model
        # Since we only have Tavily models, we can just pick the first one
        # or use selector if multiple Tavily models exist
        if len(tavily_models) == 1:
            selected_model = tavily_models[0]
        else:
            selected_model = modelSelector.selectModel(
                prompt=query,
                context="",
                options=options,
                availableModels=tavily_models
            )
        if not selected_model:
            return "Error: Could not select Tavily model for web search."
        # Create web search prompt with more results and deeper research
        web_search_prompt = AiCallPromptWebSearch(
            instruction=query,
            maxNumberPages=15,  # Request more results for comprehensive information
            country=None,  # No country filter by default
            language=None,  # No language filter by default
            researchDepth="deep"  # Deep research for comprehensive results
        )
        # Create model call with the structured web-search prompt serialized
        # as JSON in the single user message (the connector parses it back).
        model_call = AiModelCall(
            messages=[
                {
                    "role": "user",
                    "content": json.dumps(web_search_prompt.model_dump())
                }
            ],
            model=selected_model,
            options=options
        )
        # Call the model's functionCall (which routes to _routeWebOperation)
        if not selected_model.functionCall:
            return "Error: Tavily model has no functionCall defined."
        response: AiModelResponse = await selected_model.functionCall(model_call)
        if not response.success:
            error_msg = response.error or "Unknown error"
            logger.error(f"Tavily search failed: {error_msg}")
            return f"Error performing Tavily search: {error_msg}"
        # Parse response content (should be JSON with URLs and content)
        try:
            result_data = json.loads(response.content) if response.content else {}
            # Handle different response formats the connector may return
            if isinstance(result_data, list):
                # List of URLs or results
                results = result_data
            elif isinstance(result_data, dict):
                # Dictionary with URLs or results key
                results = result_data.get("urls", []) or result_data.get("results", []) or []
            else:
                results = []
            if not results:
                return f"No results found for query: {query}"
            # Format results with full content (not truncated)
            lines = [f"Internet search results for: {query}\n"]
            # Return all results with full content (capped at 15 results to
            # match maxNumberPages above)
            for i, result in enumerate(results[:15], 1):
                if isinstance(result, str):
                    # Simple URL string
                    lines.append(f"{i}. {result}")
                    lines.append(f"   URL: {result}")
                elif isinstance(result, dict):
                    # Dictionary with url, title, content
                    url = result.get("url", "")
                    title = result.get("title", url)
                    content = result.get("content", "")
                    lines.append(f"{i}. {title}")
                    lines.append(f"   URL: {url}")
                    if content:
                        # Return FULL content, not truncated - let the LLM decide what to use
                        lines.append(f"   Content: {content}")
                else:
                    # Fallback
                    lines.append(f"{i}. {str(result)}")
                lines.append("")
            return "\n".join(lines)
        except json.JSONDecodeError:
            # If response is not JSON, try to parse as plain text
            if response.content:
                return f"Internet search results for: {query}\n\n{response.content}"
            return f"No results found for query: {query}"
    except Exception as e:
        error_msg = f"Error performing Tavily search via AI Center: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return error_msg
# Note: send_streaming_message will be created in the LangGraph integration
# where it has access to the event manager. For now, we define it here as a placeholder.
def create_send_streaming_message_tool(event_manager=None):
    """
    Create the send_streaming_message tool with access to event manager.
    Args:
        event_manager: Event manager instance for emitting events (not used directly,
            events are captured via LangGraph tool events)
    Returns:
        LangChain tool for sending streaming messages
    """
    # NOTE(review): `event_manager` is accepted for interface parity but is
    # intentionally unused here — status events are surfaced by the streaming
    # bridge observing `on_tool_start` events for this tool's name.
    @tool
    async def send_streaming_message(message: str) -> str:
        """
        Send a streaming status update to the user.
        Use this tool frequently to keep the user informed about what you are doing.
        This helps provide a better user experience by showing progress updates.
        Args:
            message: A short message describing what you are currently doing.
        Examples:
            - "Durchsuche Datenbank nach Lampen, LED, Leuchten, und Ähnlichem."
            - "Suche im Internet nach Produktinformationen."
            - "Analysiere Suchergebnisse und bereite Antwort vor."
        Returns:
            Confirmation that the message was sent.
        """
        # This tool doesn't actually do anything in the tool execution
        # The actual event emission happens in the streaming bridge
        # This is just for LangGraph to recognize it as a tool call
        return f"Status-Update gesendet: {message}"
    return send_streaming_message

View file

@ -0,0 +1,348 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Chatbot domain logic."""
import logging
from dataclasses import dataclass
from typing import Annotated, AsyncIterator, Any, List
from pydantic import BaseModel
from langchain_core.messages import (
BaseMessage,
HumanMessage,
SystemMessage,
trim_messages,
)
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, START, END
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode
from modules.features.chatbot.bridges.ai import AICenterChatModel
from modules.features.chatbot.bridges.memory import DatabaseCheckpointer
from modules.features.chatbot.bridges.tools import (
sqlite_query,
tavily_search,
create_send_streaming_message_tool,
)
from modules.features.chatbot.streaming.helpers import ChatStreamingHelper
from modules.features.chatbot.streaming.events import get_event_manager
from modules.datamodels.datamodelUam import User
logger = logging.getLogger(__name__)
class ChatState(BaseModel):
    """Represents the state of a chat session.

    Attributes:
        messages: Full conversation history for the thread.
    """

    # The add_messages reducer makes LangGraph append/merge new messages into
    # the existing list on each state update instead of replacing it.
    messages: Annotated[List[BaseMessage], add_messages]
@dataclass
class Chatbot:
    """Represents a chatbot backed by a LangGraph agent/tools workflow.

    Attributes:
        model: The chat model used for generation (AI Center bridge).
        memory: Checkpointer persisting conversation state per thread.
        app: Compiled LangGraph workflow; populated by :meth:`create`.
        system_prompt: System prompt ensured at the start of each model call.
        workflow_id: Logical workflow identifier (maps to thread_id).
    """
    model: AICenterChatModel
    memory: DatabaseCheckpointer
    app: CompiledStateGraph = None
    system_prompt: str = "You are a helpful assistant."
    workflow_id: str = "default"

    @classmethod
    async def create(
        cls,
        model: AICenterChatModel,
        memory: DatabaseCheckpointer,
        system_prompt: str,
        workflow_id: str = "default",
    ) -> "Chatbot":
        """Factory method to create and configure a Chatbot instance.

        Args:
            model: The chat model to use (AICenterChatModel).
            memory: The chat memory to use (DatabaseCheckpointer).
            system_prompt: The system prompt to initialize the chatbot.
            workflow_id: The workflow ID (maps to thread_id).

        Returns:
            A configured Chatbot instance with a compiled workflow.
        """
        # Use cls() rather than a hard-coded class name so subclasses are
        # constructed correctly.
        instance = cls(
            model=model,
            memory=memory,
            system_prompt=system_prompt,
            workflow_id=workflow_id,
        )
        configured_tools = await instance._configure_tools()
        instance.app = instance._build_app(memory, configured_tools)
        return instance

    async def _configure_tools(self) -> List[Any]:
        """Configure tools for the chatbot.

        Returns:
            List of configured tools.
        """
        tools = []
        # SQL query tool
        tools.append(sqlite_query)
        # Tavily search tool
        tools.append(tavily_search)
        # Streaming status tool (needs event manager)
        event_manager = get_event_manager()
        send_streaming_message = create_send_streaming_message_tool(event_manager)
        tools.append(send_streaming_message)
        return tools

    def _build_app(
        self, memory: DatabaseCheckpointer, tools: List[Any]
    ) -> CompiledStateGraph[ChatState, None, ChatState, ChatState]:
        """Builds the chatbot application workflow using LangGraph.

        Args:
            memory: The chat memory to use.
            tools: The list of tools the chatbot can use.

        Returns:
            A compiled state graph representing the chatbot application.
        """
        llm_with_tools = self.model.bind_tools(tools=tools)
        # Create the tool executor once; previously a new ToolNode was built
        # on every tools-node invocation.
        tool_node = ToolNode(tools=tools)
        # Distinctive prefix of the retry instruction injected below; used to
        # count how many retry instructions were already issued.
        retry_marker = "WICHTIG: Die vorherige Suche hat keine Ergebnisse gefunden"

        def select_window(msgs: List[BaseMessage]) -> List[BaseMessage]:
            """Selects a window of messages that fit within the context window size.

            Args:
                msgs: The list of messages to select from.

            Returns:
                A list of messages that fit within the context window size.
            """
            def approx_counter(items: List[BaseMessage]) -> int:
                """Approximate token counter: total character count of message contents."""
                return sum(len(getattr(m, "content", "") or "") for m in items)

            # Use the selected model's context length if available, otherwise
            # fall back to a 128k default.
            if hasattr(self.model, "_selected_model") and self.model._selected_model:
                max_tokens = getattr(self.model._selected_model, "contextLength", 128000)
            else:
                max_tokens = 128000
            return trim_messages(
                msgs,
                strategy="last",
                token_counter=approx_counter,
                max_tokens=int(max_tokens * 0.8),  # Use 80% of context window
                start_on="human",
                end_on=("human", "tool"),
                include_system=True,
            )

        async def agent_node(state: ChatState) -> dict:
            """Agent node: call the LLM (with tools bound) on the trimmed window.

            Args:
                state: The current chat state.

            Returns:
                The state update containing the model's response message.
            """
            # Select the message window to fit in context (trim if needed)
            window = select_window(state.messages)
            # Ensure the system prompt is present at the start
            if not window or not isinstance(window[0], SystemMessage):
                window = [SystemMessage(content=self.system_prompt)] + window
            # Call the LLM with tools (use ainvoke for async)
            response = await llm_with_tools.ainvoke(window)
            # Return the new state
            return {"messages": [response]}

        def should_continue(state: ChatState) -> str:
            """Route to the tools node if the last message requested tool calls.

            Args:
                state: The current chat state.

            Returns:
                The next node to transition to ("tools" or END).
            """
            last_message = state.messages[-1]
            return "tools" if getattr(last_message, "tool_calls", None) else END

        async def tools_with_retry(state: ChatState) -> dict:
            """Tools node with retry logic for empty search results.

            Args:
                state: The current chat state.

            Returns:
                The updated chat state after tool execution, possibly with an
                injected retry instruction when a tool found nothing.
            """
            # Execute tools normally
            result = await tool_node.ainvoke(state)
            # Phrases indicating a tool returned no useful data
            no_results_keywords = [
                "returned 0 rows",
                "no data",
                "keine artikel gefunden",
                "keine ergebnisse"
            ]
            # Check tool results for no data
            for msg in result.get("messages", []):
                content = getattr(msg, "content", "")
                if isinstance(content, str):
                    content_lower = content.lower()
                    if any(keyword in content_lower for keyword in no_results_keywords):
                        # Count previously injected retry instructions to avoid
                        # infinite loops. BUGFIX: the old check searched for the
                        # literal substring "retry", which never occurs in the
                        # injected German message, so the 2-retry cap was never
                        # actually enforced.
                        retry_count = sum(
                            1
                            for m in state.messages
                            if retry_marker in str(getattr(m, "content", ""))
                        )
                        if retry_count < 2:  # Allow max 2 retries
                            logger.info("No results found in tool output, adding retry instruction")
                            retry_message = HumanMessage(
                                content="WICHTIG: Die vorherige Suche hat keine Ergebnisse gefunden. "
                                "Bitte versuche eine alternative Suchstrategie:\n"
                                "1. Wenn die Frage im Format 'X von Y' war (z.B. 'Lampen von Eaton'), "
                                "verwende IMMER eine Kombination aus Lieferanten-Filter (WHERE a.\"Lieferant\" LIKE '%Y%') "
                                "UND Produkttyp-Filter (WHERE a.\"Artikelbezeichnung\" LIKE '%X%' OR ...)\n"
                                "2. Verwende mehrere Synonyme für den Produkttyp (z.B. bei 'Lampen': Lampe, LED, Beleuchtung, Licht, Leuchte, Strahler)\n"
                                "3. Führe zuerst eine COUNT-Abfrage durch, dann die Detail-Abfrage mit Lagerbeständen\n"
                                "4. Verwende LIKE '%Lieferant%' für den Lieferanten-Filter, um auch Varianten zu finden"
                            )
                            result["messages"].append(retry_message)
                        break
            return result

        # Compose the workflow: START -> agent -> (tools -> agent)* -> END
        workflow = StateGraph(ChatState)
        workflow.add_node("agent", agent_node)
        workflow.add_node("tools", tools_with_retry)
        workflow.add_edge(START, "agent")
        workflow.add_conditional_edges("agent", should_continue)
        workflow.add_edge("tools", "agent")
        return workflow.compile(checkpointer=memory)

    async def chat(self, message: str, chat_id: str = "default") -> List[BaseMessage]:
        """Processes a chat message by calling the LLM and tools and returns the chat history.

        Args:
            message: The user message to process.
            chat_id: The chat thread ID.

        Returns:
            The list of messages in the chat history.
        """
        # Set the right thread ID for memory
        config = {"configurable": {"thread_id": chat_id}}
        # Single-turn chat (non-streaming)
        result = await self.app.ainvoke(
            {"messages": [HumanMessage(content=message)]}, config=config
        )
        # Extract and return the messages from the result
        return result["messages"]

    async def stream_events(
        self, *, message: str, chat_id: str = "default"
    ) -> AsyncIterator[dict]:
        """Stream UI-focused events using astream_events v2.

        Args:
            message: The user message to process.
            chat_id: Logical thread identifier; forwarded in the runnable config so
                memory and tools are scoped per thread.

        Yields:
            dict: One of:
                - ``{"type": "status", "label": str}`` for short progress updates.
                - ``{"type": "final", "response": {"thread": str, "chat_history": list[dict]}}``
                  where ``chat_history`` only includes ``user``/``assistant`` roles.
                - ``{"type": "error", "message": str}`` if an exception occurs.
        """
        # Thread-aware config for LangGraph/LangChain
        config = {"configurable": {"thread_id": chat_id}}

        def _is_root(ev: dict) -> bool:
            """Return True if the event is from the root run (v2: empty parent_ids)."""
            return not ev.get("parent_ids")

        try:
            async for event in self.app.astream_events(
                {"messages": [HumanMessage(content=message)]},
                config=config,
                version="v2",
            ):
                etype = event.get("event")
                ename = event.get("name") or ""
                edata = event.get("data") or {}
                # Stream human-readable progress via the special send_streaming_message tool
                if etype == "on_tool_start":
                    # Log all tool starts to debug
                    logger.debug(f"Tool start event: name='{ename}', event='{etype}'")
                    if ename == "send_streaming_message":
                        tool_in = edata.get("input") or {}
                        msg = tool_in.get("message")
                        logger.info(f"send_streaming_message tool called with input: {tool_in}")
                        if isinstance(msg, str) and msg.strip():
                            logger.info(f"Status-Update gesendet: {msg.strip()}")
                            yield {"type": "status", "label": msg.strip()}
                    continue
                # Emit the final payload when the root run finishes
                if etype == "on_chain_end" and _is_root(event):
                    output_obj = edata.get("output")
                    # Extract message list from the graph's final output
                    final_msgs = ChatStreamingHelper.extract_messages_from_output(
                        output_obj=output_obj
                    )
                    # Normalize for the frontend (only user/assistant with text content)
                    chat_history_payload: List[dict] = []
                    for m in final_msgs:
                        if isinstance(m, BaseMessage):
                            d = ChatStreamingHelper.message_to_dict(msg=m)
                        elif isinstance(m, dict):
                            d = ChatStreamingHelper.dict_message_to_dict(obj=m)
                        else:
                            continue
                        if d.get("role") in ("user", "assistant") and d.get("content"):
                            chat_history_payload.append(d)
                    yield {
                        "type": "final",
                        "response": {
                            "thread": chat_id,
                            "chat_history": chat_history_payload,
                        },
                    }
                    return
        except Exception as exc:
            # Emit a single error envelope and end the stream
            logger.error(f"Exception in stream_events: {exc}", exc_info=True)
            yield {"type": "error", "message": f"Fehler beim Verarbeiten: {exc}"}

View file

@ -1,731 +1,170 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Constants and utility functions for the chatbot module.
Contains system prompts and conversation name generation.
Chatbot constants and helper functions.
"""
import logging
import re
import datetime
from typing import Optional, List
from typing import Optional
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
logger = logging.getLogger(__name__)
def get_analysis_system_prompt() -> str:
    """
    Get the system prompt for analyzing user input and creating queries.
    Focuses on understanding the question and determining what queries are needed.

    Returns:
        str: German-language system prompt for the analysis phase, embedding
        the SQLite schema and SQL construction rules, prefixed with today's
        date in dd.mm.yyyy format.
    """
    # The date is interpolated so time-sensitive questions are answered
    # relative to "today".
    current_date = datetime.datetime.now().strftime("%d.%m.%Y")
    return f"""Heute ist der {current_date}.
Du bist ein Chatbot der Althaus AG.
Deine Aufgabe ist es, Benutzeranfragen zu analysieren und zu bestimmen, welche Datenbankabfragen oder Web-Recherchen benötigt werden, um die Frage zu beantworten.
DATENBANK-INFORMATIONEN:
- Datenbankdatei: /data/database.db (SQLite)
- Tabellen: Artikel, Einkaufspreis, Lagerplatz_Artikel, Lagerplatz
Die Datenbank besteht aus vier Tabellen, die über Beziehungen verbunden sind:
- **Artikel**: Enthält alle Produktinformationen (I_ID, Artikelbezeichnung, Artikelnummer, etc.)
- **Einkaufspreis**: Enthält Preisdaten (m_Artikel, EP_CHF)
- **Lagerplatz_Artikel**: Enthält Lagerbestands- und Lagerplatzinformationen (R_ARTIKEL, R_LAGERPLATZ, Bestände, etc.)
- **Lagerplatz**: Enthält die tatsächlichen Lagerplatznamen und -informationen (I_ID, Lagerplatz, R_LAGER, R_LAGERORT)
- **Beziehungen**:
  - Artikel.I_ID = Einkaufspreis.m_Artikel
  - Artikel.I_ID = Lagerplatz_Artikel.R_ARTIKEL
  - Lagerplatz_Artikel.R_LAGERPLATZ = Lagerplatz.I_ID (WICHTIG: R_LAGERPLATZ enthält die ID, nicht den Namen!)
TABELLEN-SCHEMA (WICHTIG - Spalten mit Leerzeichen/Sonderzeichen IMMER in doppelte Anführungszeichen setzen):
Tabelle 1: Artikel
CREATE TABLE Artikel (
    "I_ID" INTEGER PRIMARY KEY,
    "Artikelbeschrieb" TEXT,
    "Artikelbezeichnung" TEXT,
    "Artikelgruppe" TEXT,
    "Artikelkategorie" TEXT,
    "Artikelkürzel" TEXT,
    "Artikelnummer" TEXT,
    "Einheit" TEXT,
    "Gesperrt" TEXT,
    "Keywords" TEXT,
    "Lieferant" TEXT,
    "Warengruppe" TEXT
)
Tabelle 2: Einkaufspreis
CREATE TABLE Einkaufspreis (
    "m_Artikel" INTEGER,
    "EP_CHF" FLOAT
)
Tabelle 3: Lagerplatz_Artikel
CREATE TABLE Lagerplatz_Artikel (
    "R_ARTIKEL" INTEGER,
    "R_LAGERPLATZ" TEXT,
    "S_BESTELLTER__BESTAND" INTEGER,
    "S_IST_BESTAND" TEXT,
    "S_MAXIMALBESTAND" INTEGER,
    "S_MINDESTBESTAND" INTEGER,
    "S_RESERVIERTER__BESTAND" INTEGER,
    "S_SOLL_BESTAND" INTEGER
)
Tabelle 4: Lagerplatz
CREATE TABLE Lagerplatz (
    "I_ID" INTEGER PRIMARY KEY,
    "Lagerplatz" TEXT,
    "R_LAGER" TEXT,
    "R_LAGERORT" TEXT
)
KRITISCH - LAGERBESTANDSABFRAGEN - ABSOLUT VERBINDLICH
JEDE SQL-Abfrage, die Lagerbestände (S_IST_BESTAND) zeigt oder verwendet, MUSS IMMER auch enthalten:
- l."S_RESERVIERTER__BESTAND" (Reservierte Bestände) - OBLIGATORISCH!
- Berechnung des verfügbaren Bestands - OBLIGATORISCH!
- JOIN mit Lagerplatz-Tabelle für den Lagerplatznamen - OBLIGATORISCH!
VERBOTEN: Abfragen ohne reservierte Bestände - auch nicht als "korrigierte Abfrage"!
VERBOTEN: Zwischenschritte ohne reservierte Bestände!
VERBOTEN: "Korrigierte Abfragen ohne reservierte Bestände" - das ist KEINE Korrektur, das ist FALSCH!
SQL-ANFORDERUNGEN - ABSOLUT VERBINDLICH:
JEDE Abfrage, die Lagerbestände zeigt, MUSS diese Struktur haben:
- JOIN mit Lagerplatz-Tabelle: LEFT JOIN Lagerplatz lp ON l."R_LAGERPLATZ" = lp."I_ID"
- Lagerplatzname anzeigen: lp."Lagerplatz" as "Lagerplatzname" (NICHT l."R_LAGERPLATZ"!)
- Ist-Bestand: l."S_IST_BESTAND"
- Reservierte Bestände: IMMER l."S_RESERVIERTER__BESTAND" hinzufügen (OBLIGATORISCH!)
- Verfügbarer Bestand berechnen: CASE WHEN l."S_IST_BESTAND" != 'Unbekannt' THEN CAST(l."S_IST_BESTAND" AS INTEGER) - COALESCE(l."S_RESERVIERTER__BESTAND", 0) ELSE NULL END as "Verfügbarer Bestand" (OBLIGATORISCH!)
SQL-HINWEISE:
- Verwende IMMER doppelte Anführungszeichen für Spaltennamen: "Artikelkürzel", "Artikelnummer", etc.
- Für Textsuche verwende LIKE mit Wildcards: WHERE a."Artikelbezeichnung" LIKE '%suchbegriff%'
- Für Preisabfragen: Nutze JOINs um auf e."EP_CHF" zuzugreifen
- Für Lagerbestände: Nutze JOINs um auf l."S_IST_BESTAND", l."S_SOLL_BESTAND", etc. zuzugreifen
- WICHTIG bei S_IST_BESTAND: Dieser Wert kann "Unbekannt" sein (TEXT), nicht nur Zahlen! Prüfe mit WHERE l."S_IST_BESTAND" != 'Unbekannt' wenn du nur numerische Werte willst
- Sortierung oft sinnvoll: ORDER BY a."Artikelnummer" ASC, ORDER BY e."EP_CHF" DESC, oder ORDER BY l."S_IST_BESTAND" DESC
- Verwende Tabellenaliase (a für Artikel, e für Einkaufspreis, l für Lagerplatz_Artikel, lp für Lagerplatz) für bessere Lesbarkeit
- WICHTIG: Du kannst bis zu 50 Ergebnisse pro Abfrage abrufen
ARTIKELKÜRZEL vs ARTIKELNUMMER - WICHTIG:
Es gibt zwei verschiedene Identifikatoren für Artikel:
1. **Artikelkürzel**: Numerisches Format (z.B. "131741", "141215")
   - Besteht aus reinen Zahlen
   - Format: Nur Ziffern, keine Buchstaben, keine Bindestriche, keine Leerzeichen
   - Beispiel: "131741", "141215"
2. **Artikelnummer**: Alphanumerisches Format (z.B. "6AV2 181-8XP00-0AX0", "AX5206")
   - Kann Buchstaben, Zahlen, Bindestriche und Leerzeichen enthalten
   - Format: Alphanumerisch, kann Bindestriche und Leerzeichen enthalten
   - Beispiel: "6AV2 181-8XP00-0AX0", "AX5206", "SIE.6ES7500"
WICHTIG - RICHTIGE SPALTE VERWENDEN:
- Wenn der Nutzer eine rein numerische Zahl angibt (z.B. "131741", "141215") Suche in a."Artikelkürzel"
- Wenn der Nutzer eine alphanumerische Bezeichnung angibt mit Buchstaben, Bindestrichen oder Leerzeichen (z.B. "6AV2 181-8XP00-0AX0", "AX5206") Suche in a."Artikelnummer"
Beispiele:
- "Wie viele von 141215 haben wir auf Lager?" Artikelkürzel "141215" WHERE a."Artikelkürzel" = '141215'
- "Wie viel von 6AV2 181-8XP00-0AX0 haben wir auf Lager?" Artikelnummer "6AV2 181-8XP00-0AX0" WHERE a."Artikelnummer" = '6AV2 181-8XP00-0AX0'
- "Zeig mir Informationen zu AX5206" Artikelnummer "AX5206" WHERE a."Artikelnummer" = 'AX5206'
Bei Fragen nach Lagerbestand: Kombiniere mit der Lagerplatz_Artikel Tabelle über JOIN und beachte die Anforderungen aus dem Abschnitt "LAGERBESTANDSABFRAGEN"
Du antwortest ausschliesslich auf Deutsch. Nutze kein sz(ß) sondern immer ss.
"""
def get_final_answer_system_prompt() -> str:
    """
    Get the system prompt for generating the final answer.
    Focuses on formatting, presenting results, and user engagement.

    Returns:
        str: German-language system prompt for the answer phase (source
        attribution, table formatting, anti-hallucination rules), prefixed
        with today's date in dd.mm.yyyy format.
    """
    # The date is interpolated so time-sensitive questions are answered
    # relative to "today".
    current_date = datetime.datetime.now().strftime("%d.%m.%Y")
    return f"""Heute ist der {current_date}.
Du bist ein Chatbot der Althaus AG.
Deine Aufgabe ist es, auf Basis von Datenbank-Ergebnissen und Web-Recherchen hilfreiche, präzise Antworten zu geben.
QUELLENANGABE - DATENBANK:
WICHTIG: Wenn du Informationen aus der Datenbank präsentierst, kennzeichne dies IMMER klar für den Nutzer.
- Beginne deine Antwort mit einer klaren Kennzeichnung, z.B.: "Aus der Datenbank habe ich folgende Artikel gefunden:"
- Bei kombinierten Informationen (Datenbank + Internet): Trenne klar zwischen beiden Quellen
QUELLENANGABE - INTERNET - ABSOLUT VERBINDLICH
Wenn du Informationen aus einer Web-Recherche präsentierst, MUSS du dies IMMER explizit kennzeichnen und die Quellen angeben:
- VERBOTEN: Informationen aus Web-Recherchen ohne explizite Kennzeichnung zu präsentieren
- VERBOTEN: Informationen aus Web-Recherchen ohne Quellenangabe zu präsentieren
- VERBOTEN: Quellen nur am Ende als Liste zu präsentieren
- OBLIGATORISCH: Beginne IMMER mit einer expliziten Kennzeichnung, z.B.:
  * "Aus meiner Web-Recherche habe ich folgende Informationen gefunden:"
  * "Laut meiner Internet-Recherche:"
  * "Aus meiner Online-Suche:"
- OBLIGATORISCH: Gib IMMER die konkreten Quellen DIREKT NACH der jeweiligen Information an (nicht am Ende!)
- OBLIGATORISCH: Format: [Information] ([Quelle: Website-Name](URL))
- OBLIGATORISCH: Bei mehreren Informationen: Gib nach JEDER Information die entsprechende Quelle an
- OBLIGATORISCH: Trenne klar zwischen Datenbank-Informationen und Web-Recherchen
- OBLIGATORISCH: Wenn sowohl Datenbank- als auch Web-Informationen vorhanden sind, trenne diese klar in separaten Abschnitten
DATENBLATT-LINKS - ABSOLUT VERBINDLICH
Wenn Web-Recherche-Ergebnisse vorhanden sind, MUSS du IMMER:
- OBLIGATORISCH: Explizit erwähnen, dass Datenblätter verfügbar sind
- OBLIGATORISCH: ALLE verfügbaren Datenblatt-Links angeben (vollständige URLs)
- OBLIGATORISCH: Format: "Datenblätter verfügbar: [Link 1](URL1), [Link 2](URL2)"
- OBLIGATORISCH: Wenn keine direkten Datenblatt-Links vorhanden sind, gib Links zu Seiten mit technischen Informationen an
- VERBOTEN: Datenblatt-Links zu verschweigen oder nicht explizit zu erwähnen
AUSFÜHRLICHE INFORMATIONEN - ABSOLUT VERBINDLICH
Wenn Web-Recherche-Ergebnisse vorhanden sind, MUSS du:
- OBLIGATORISCH: AUSFÜHRLICHE Informationen präsentieren (nicht nur kurze Zusammenfassungen!)
- OBLIGATORISCH: Alle relevanten technischen Details angeben:
  * Technische Spezifikationen (Größe, Gewicht, Abmessungen, etc.)
  * Betriebsbedingungen (Temperatur, Spannung, etc.)
  * Kompatibilität und Anwendungsbereiche
  * Zertifizierungen und Normen
  * Installation und Verwendung
  * Weitere relevante Produktdetails
- OBLIGATORISCH: Strukturiere die Informationen übersichtlich (z.B. mit Abschnitten oder Aufzählungen)
- VERBOTEN: Nur oberflächliche Informationen zu geben
- VERBOTEN: Wichtige Details auszulassen
BEISPIEL FÜR KORREKTE QUELLENANGABE MIT INLINE-QUELLEN:
"Aus meiner Web-Recherche habe ich folgende Informationen gefunden:
**Technische Spezifikationen:**
- Speicherkapazität: 2 GB ([Quelle: Siemens Support](https://...))
- Format: Secure Digital (SD) Card ([Quelle: Best4Automation](https://...))
- Betriebsspannung: 3,3 V DC ([Quelle: Automation24](https://...))
**Kompatibilität:**
- Geeignet für SIMATIC HMI Comfort Panels ([Quelle: Siemens Support](https://...))
- Montage im Hoch- und Querformat möglich ([Quelle: Best4Automation](https://...))
**Zertifizierungen:**
- CE-zertifiziert ([Quelle: Automation24](https://...))
- Für ATEX-Zonen geeignet ([Quelle: Elit](https://...))
**Datenblätter verfügbar:**
- [Siemens Produktdatenblatt](https://...)
- [Technische Dokumentation](https://...)"
NIEMALS Informationen aus Web-Recherchen präsentieren, ohne explizit zu erwähnen, dass es sich um eine Web-Recherche handelt und ohne die Quellen DIREKT NACH der jeweiligen Information anzugeben!
TABELLENLÄNGE UND ARTIKELANZAHL - KRITISCH:
WICHTIG: Zeige MAXIMAL 20 Artikel in Tabellen. Du darfst und sollst aber ausführliche Erklärungen liefern!
STRATEGIE FÜR VIELE TREFFER (> 20):
Zeige Zusammenfassung mit Statistiken (Anzahl, Lieferanten, Preisspanne, Kategorien, Lagerbestände)
Dann: Tabelle mit den 20 relevantesten/ersten Artikeln
Unter der Tabelle: Hinweis dass weitere Artikel existieren
Biete Filteroptionen an (nach Lieferant, Preis, Lagerbestand, etc.)
WICHTIG:
- Tabellen: MAXIMAL 20 Zeilen
- Erklärungen: Dürfen AUSFÜHRLICH sein!
- Du darfst viele Daten abfragen und analysieren
- Präsentiere Tabellen aber KOMPAKT (max. 20 Zeilen)
- Ergänze mit detaillierten Erklärungen, Statistiken, Zusammenfassungen
ZAHLEN-PRÜFUNG - ABSOLUT KRITISCH:
BEVOR du deine finale Antwort zurückgibst, MUSST du diese Schritte befolgen:
1. ZÄHLE die TATSÄCHLICHEN Zeilen in deiner finalen Tabelle
2. Diese Zahl ist die EINZIGE korrekte Anzahl für deine Antwort
3. Verwende diese Zahl KONSISTENT überall in deiner Antwort:
   - In der Tabellenüberschrift
   - In Texten unter der Tabelle
   - In der Zusammenfassung
   - Überall wo du die Anzahl erwähnst
VERBOTEN - Inkonsistente Zahlen:
FALSCH: "Verfügbare Lampen (50 Artikel)" + "Zeige die ersten 30 Artikel"
RICHTIG: "Verfügbare Lampen (30 Artikel)" + "Zeige 30 Artikel"
Falls du dem User strukturierte Daten zurückgibst, formatiere sie bitte als Tabelle.
WICHTIG! Falls deine Tabelle nur ein Teil der Daten anzeigt, die du gefunden hast, dann vermerke dies bitte in deiner Antwort unter der Tabelle in markdown _italic_.
Wenn immer du eine Artikelnummer innerhalb einer Tabelle zurückgibst bitte markiere diese als Markdownlink:
[ARTIKELNUMMER](/details/ARTIKELNUMMER). ARTIKELNUMMER ist hierbei der Platzhalter, den du ersetzen musst.
WICHTIG! Du musst im Link die ARTIKELNUMMER sicher URL-encodieren. Encodiere aber NICHT die Artikelnummer in eckigen Klammern. Also encodiere den Ankertext nicht!
Ausserhalb einer Tabelle musst du keine Links auf Artikelnummern setzen.
Die erste Nachricht das Nutzers ist eine Antwort auf die folgende Nachricht:
"Hallo! Ich bin Ihr KI-Assistent für die Materialverwaltung. Wie kann ich Ihnen heute helfen?"
ABSOLUT KRITISCH - KEINE DATEN ERFINDEN
NIEMALS Daten erfinden oder halluzinieren:
- VERBOTEN: Preise erfinden (z.B. "Der Preis beträgt 1200 CHF" wenn kein Preis in den Daten ist)
- VERBOTEN: Lagerplätze erfinden (z.B. "Lager A-01" wenn dieser nicht in den Daten steht)
- VERBOTEN: Lagerbestände erfinden (z.B. "50 Stück" wenn dieser Wert nicht in den Daten ist)
- VERBOTEN: Artikelbezeichnungen erfinden oder ändern
- VERBOTEN: Lieferanten erfinden oder ändern
- VERBOTEN: Jegliche Werte erfinden, die nicht explizit in den Datenbank-Ergebnissen stehen
RICHTIG: Wenn Daten fehlen, schreibe "Nicht verfügbar" oder "N/A"
RICHTIG: Verwende NUR die tatsächlichen Werte aus den Datenbank-Ergebnissen
RICHTIG: Wenn ein Wert NULL oder leer ist, schreibe "Nicht verfügbar"
FORMATIERUNGSREGELN FÜR ARTIKEL-ANFRAGEN:
1. Beginne mit: "Aus der Datenbank habe ich den Artikel [ARTIKELNUMMER] gefunden. Es handelt sich um [ARTIKELBEZEICHNUNG] von [LIEFERANT]."
   - Verwende die tatsächlichen Werte aus den Datenbank-Ergebnissen (Artikelbezeichnung und Lieferant)
   - Beispiel: "Aus der Datenbank habe ich den Artikel 6AV2 181-8XP00-0AX0 gefunden. Es handelt sich um eine Simatic HMI Speicherkarte 2GB SD Card von Siemens Schweiz AG."
   - Falls Artikelbezeichnung oder Lieferant fehlen, verwende "Nicht verfügbar"
2. Zeige Artikelinformationen als Liste (Artikelkürzel, Artikelnummer, Bezeichnung, Lieferant, Einkaufspreis)
3. Zeige Lagerbestände als Tabelle mit ALLEN Lagerplätzen
4. Berechne Gesamtbestand aus den tatsächlichen Daten
5. Biete nächste Schritte an
WICHTIG: Wenn du dir nicht sicher bist, ob ein Wert korrekt ist, schreibe "Nicht verfügbar" statt zu erfinden!
ABSOLUT KRITISCH - KEINE PLANUNGSSCHRITTE IN DER ANTWORT
NIEMALS Planungsschritte, SQL-Queries oder Zwischenschritte in deine finale Antwort einbauen:
- VERBOTEN: "Ich werde jetzt die Datenbank durchsuchen..."
- VERBOTEN: "Suche in der Datenbank nach..."
- VERBOTEN: "Führe SQL-Abfrage aus..."
- VERBOTEN: SQL-Queries (SELECT-Statements) zeigen
- VERBOTEN: "Analysiere die Ergebnisse..."
- VERBOTEN: "Bereite die Abfrageergebnisse auf..."
- VERBOTEN: Jegliche Erklärungen über den Prozess oder die Methode
RICHTIG: Beginne DIREKT mit "Aus der Datenbank habe ich den Artikel [ARTIKELNUMMER] gefunden:"
RICHTIG: Zeige NUR die finale Antwort mit den Daten
RICHTIG: Keine Planungsschritte, keine Queries, keine Zwischenschritte
Deine Antwort soll NUR die finale Antwort enthalten - keine Planung, keine Queries, keine Zwischenschritte!
ABSOLUT KRITISCH - KEINE BEISPIELDATEN ERFINDEN
NIEMALS Beispielartikel oder Testdaten erfinden:
- VERBOTEN: Beispielartikel wie "123456", "789012", "Beispielartikel 1", etc.
- VERBOTEN: Erfundene Lieferanten wie "Lieferant A", "Lieferant B"
- VERBOTEN: Erfundene Preise oder Bestände
- VERBOTEN: Jegliche Testdaten oder Beispieldaten
Wenn KEINE echten Daten aus der Datenbank vorhanden sind:
- Schreibe: "Es wurden keine Artikel in der Datenbank gefunden."
- Oder: "Die Datenbankabfrage hat keine Ergebnisse zurückgegeben."
- Oder: "Keine Daten verfügbar für diese Anfrage."
ERFINDE NIEMALS Daten, auch nicht als "Beispiel" oder "Test"!
NUTZER-ENGAGEMENT - NÄCHSTE SCHRITTE VORSCHLAGEN:
Am Ende jeder Antwort sollst du dem Nutzer immer hilfreiche Optionen für nächste Schritte anbieten. Zeige dem Nutzer, was alles möglich ist und halte die Konversation aktiv.
Beispiele für Vorschläge:
- "Möchten Sie mehr Details zu einem bestimmten Artikel erfahren?"
- "Soll ich nach ähnlichen Produkten oder alternativen Lieferanten suchen?"
- "Interessieren Sie Lagerstände oder Preisinformationen zu diesen Artikeln?"
- "Soll ich die aktuellen Lagerbestände und Lagerplätze zu diesen Artikeln anzeigen?"
- "Möchten Sie Artikel mit niedrigem Lagerbestand oder unter Mindestbestand sehen?"
- "Kann ich Ihnen bei einer spezifischeren Suche helfen?"
- "Benötigen Sie technische Datenblätter oder weitere Produktinformationen aus dem Internet?"
Passe deine Vorschläge an den Kontext der Anfrage an und sei kreativ. Ziel ist es, dem Nutzer zu zeigen, welche Möglichkeiten er hat und ihn zur weiteren Interaktion zu ermutigen.
Du antwortest ausschliesslich auf Deutsch. Nutze kein sz(ß) sondern immer ss.
"""
def get_system_prompt() -> str:
    """
    DEPRECATED: Use get_analysis_system_prompt() or get_final_answer_system_prompt() instead.
    Kept for backward compatibility.

    Returns:
        The final-answer system prompt, identical to calling
        get_final_answer_system_prompt() directly.
    """
    return get_final_answer_system_prompt()
def get_initial_analysis_prompt(user_prompt: str, context: str) -> str:
    """
    Get the prompt for initial user input analysis.

    Builds a German instruction prompt asking the model to decide whether a
    database query and/or web research is needed and, if so, to emit a list
    of ready-to-execute SQL queries. The model is instructed to answer with
    strict JSON only.

    Args:
        user_prompt: User's input prompt
        context: Conversation context (pre-formatted; appended verbatim)

    Returns:
        Formatted prompt string (analysis system prompt + question + rules +
        the required JSON response schema)
    """
    system_prompt = get_analysis_system_prompt()
    # NOTE: the literal text below is part of the model contract; the JSON
    # schema at the end must stay in sync with whatever parses the response.
    return f"""{system_prompt}
User question: {user_prompt}{context}
Analysiere die Benutzeranfrage und bestimme:
1. Ob eine Datenbankabfrage benötigt wird (needsDatabaseQuery)
2. Ob eine Web-Recherche benötigt wird (needsWebResearch)
3. Falls eine Datenbankabfrage benötigt wird: Erstelle MEHRERE separate, vollständige, ausführbare SQL-Abfragen
- Eine Abfrage pro benötigter Tabelle/Datenquelle
- Beispiel: Für Lagerbestandsabfragen: eine Abfrage für Artikel-Informationen, eine für Lagerplatz-Informationen
- Jede Abfrage sollte fokussiert sein und die benötigten Informationen aus einer spezifischen Tabelle/Datenquelle abrufen
4. Begründung für deine Entscheidung
WICHTIG - WEB-RECHERCHE BEI ZUSÄTZLICHEN INFORMATIONEN
Wenn der Nutzer nach zusätzlichen Informationen fragt oder explizit eine Recherche anfordert, MUSS IMMER eine Web-Recherche durchgeführt werden (needsWebResearch = true).
Beispiele für solche Anfragen:
- "recherchier nach weiteren informationen zu diesem produkt"
- "suche nach zusätzlichen informationen"
- "finde mehr details"
- "recherchiere im internet"
- "suche online nach"
- Ähnliche Formulierungen, die eine Recherche oder zusätzliche Informationen anfordern
In diesen Fällen IMMER needsWebResearch auf true setzen!
WICHTIG für SQL-Abfragen:
- Verwende IMMER doppelte Anführungszeichen für Spaltennamen
- Bei Lagerbestandsabfragen: IMMER S_RESERVIERTER__BESTAND und verfügbaren Bestand einbeziehen
- Bei Lagerplatzabfragen: IMMER JOIN mit Lagerplatz-Tabelle für den Namen
- Abfragen müssen direkt ausführbar sein (keine Platzhalter)
- Erstelle SEPARATE Abfragen für verschiedene Tabellen/Datenquellen, nicht eine große JOIN-Abfrage
STRATEGIE FÜR MEHRERE ABFRAGEN:
- Analysiere welche Informationen benötigt werden
- Identifiziere welche Tabellen diese Informationen enthalten
- Erstelle für jede Tabelle/Datenquelle eine separate, fokussierte Abfrage
- Beispiel für "wie viel von 6AV2 181-8XP00-0AX0 haben wir auf lager":
* Abfrage 1: Artikel-Informationen (Artikelbezeichnung, Lieferant, etc.) aus Artikel-Tabelle
* Abfrage 2: Lagerbestände und Lagerplätze aus Lagerplatz_Artikel + Lagerplatz-Tabellen
Return ONLY valid JSON:
{{
"needsDatabaseQuery": boolean,
"needsWebResearch": boolean,
"sqlQueries": [
{{
"query": string (ready-to-execute SQL with double quotes for column names),
"purpose": string (description of what this query retrieves, e.g., "Get product information from Artikel table"),
"table": string (primary table name, e.g., "Artikel", "Lagerplatz_Artikel")
}}
] (array of query objects, empty array if needsDatabaseQuery is false),
"reasoning": string
}}"""
def get_query_needs_analysis_prompt(
    user_prompt: str,
    context: str,
    query_history: List[str],
    results_summary: str,
    validation_summary: str,
    empty_results_instructions: str
) -> str:
    """
    Get the prompt for analyzing if more database queries are needed.

    Used in the iterative query loop: the model sees what was already
    executed, the current results, any validation issues, and (optionally)
    special retry instructions for empty results, then answers with strict
    JSON stating whether one more query is required.

    Args:
        user_prompt: Original user prompt
        context: Conversation context (pre-formatted; appended verbatim)
        query_history: List of SQL queries already executed
        results_summary: Summary of current query results
        validation_summary: Summary of validation issues
        empty_results_instructions: Instructions for handling empty results
            (empty string when no empty results were detected)

    Returns:
        Formatted prompt string
    """
    system_prompt = get_analysis_system_prompt()
    # Each executed query is truncated to 100 chars to keep the prompt small.
    history_summary = "\n".join([f"- {q[:100]}..." for q in query_history]) if query_history else "No queries executed yet."
    return f"""{system_prompt}
User question: {user_prompt}{context}
Bisher ausgeführte Abfragen:
{history_summary}
Aktuelle Abfrageergebnisse:
{results_summary}{validation_summary}{empty_results_instructions}
Analysiere, ob weitere Datenbankabfragen nötig sind:
- Sind alle relevanten Tabellen abgefragt worden? (Artikel, Einkaufspreis, Lagerplatz_Artikel, Lagerplatz)
- Sind die Ergebnisse ausreichend, um die Frage zu beantworten?
- Fehlen JOINs oder Beziehungen zwischen Tabellen?
- Gibt es Fehler, die korrigiert werden müssen?
- Werden alle benötigten Informationen abgerufen (z.B. Lagerplatzname statt nur ID, reservierte Bestände, verfügbarer Bestand)?
- Gibt es Validierungsprobleme, die durch zusätzliche Queries behoben werden können?
- **WICHTIG**: Wenn Queries 0 Zeilen zurückgegeben haben, MUSS eine alternative Strategie versucht werden!
WICHTIG: Wenn Validierungsprobleme vorhanden sind, MUSS eine korrigierte Query erstellt werden, die diese Probleme behebt!
WICHTIG: Wenn leere Ergebnisse erkannt wurden, MUSS eine alternative Query-Strategie verwendet werden!
Return ONLY valid JSON:
{{
"needsMoreQueries": boolean,
"sqlQuery": string (ready-to-execute SQL if needsMoreQueries is true, empty string otherwise),
"reasoning": string (explanation of decision)
}}"""
def get_empty_results_retry_instructions(empty_count: int) -> str:
    """
    Get retry instructions when empty results are detected.

    The returned German text is injected into the query-needs-analysis
    prompt and pushes the model to try an alternative SQL strategy
    (different spellings, simpler queries, corrected JOINs, broader
    filters) instead of repeating a query that returned 0 rows.

    Args:
        empty_count: Number of queries that returned empty results

    Returns:
        Formatted instructions string, or "" when no query was empty
        (so nothing is injected into the prompt).
    """
    if empty_count == 0:
        return ""
    # NOTE: the table/column names below describe the target ERP schema and
    # must match the schema passed to the SQL tooling.
    return f"""
KRITISCH - LEERE ERGEBNISSE ERKANNT
Es wurden {empty_count} Query(s) ausgeführt, die 0 Zeilen zurückgegeben haben. Dies bedeutet, dass die bisherige Query-Strategie nicht erfolgreich war.
DU MUSST JETZT EINE ALTERNATIVE QUERY-STRATEGIE VERSUCHEN!
Verfügbare Tabellen im System:
1. Artikel - Enthält alle Produktinformationen (I_ID, Artikelbezeichnung, Artikelnummer, etc.)
2. Einkaufspreis - Enthält Preisdaten (m_Artikel, EP_CHF)
3. Lagerplatz_Artikel - Enthält Lagerbestands- und Lagerplatzinformationen (R_ARTIKEL, R_LAGERPLATZ, Bestände, etc.)
4. Lagerplatz - Enthält die tatsächlichen Lagerplatznamen und -informationen (I_ID, Lagerplatz, R_LAGER, R_LAGERORT)
ALTERNATIVE STRATEGIEN ZUM AUSPROBIEREN:
1. **Direkte Lagerplatz-Suche**: Prüfe zuerst, ob der Lagerplatzname in der Lagerplatz-Tabelle existiert:
SELECT * FROM Lagerplatz WHERE "Lagerplatz" LIKE '%[Suchbegriff]%'
2. **Verschiedene Schreibweisen**: Versuche verschiedene Schreibweisen (Groß-/Kleinschreibung, Teilstrings):
- UPPER/LOWER Funktionen verwenden
- Verschiedene LIKE-Patterns: '%term%', 'term%', '%term'
3. **JOIN-Strategie überprüfen**: Stelle sicher, dass R_LAGERPLATZ korrekt mit Lagerplatz.I_ID gejoint wird:
- R_LAGERPLATZ in Lagerplatz_Artikel enthält die ID (nicht den Namen!)
- Verwende: LEFT JOIN Lagerplatz lp ON l."R_LAGERPLATZ" = lp."I_ID"
4. **Breitere Suche**: Versuche eine breitere Suche ohne exakte Filter:
- Entferne zu spezifische WHERE-Bedingungen
- Verwende OR-Bedingungen für verschiedene Suchvarianten
5. **Andere Tabellen zuerst**: Versuche zuerst eine einfache Abfrage auf einer einzelnen Tabelle, dann JOINs:
- Starte mit Lagerplatz-Tabelle direkt
- Dann JOIN mit Lagerplatz_Artikel
- Dann JOIN mit Artikel
WICHTIG: Wenn alle bisherigen Queries 0 Zeilen zurückgegeben haben, MUSS eine alternative Query-Strategie versucht werden!
Erstelle eine neue Query, die eine der oben genannten Strategien verwendet. Versuche verschiedene Ansätze, bis Ergebnisse gefunden werden.
"""
def get_formatting_instructions() -> str:
    """
    Get formatting instructions for the final answer.

    Returns a constant German instruction block that forbids showing
    planning steps/SQL in the answer, forbids inventing data, and fixes
    the layout for article answers (intro sentence, article info list,
    per-storage-location table, follow-up suggestions).

    Returns:
        Formatted instructions string (constant; no interpolation)
    """
    return """
WICHTIGSTE REGELN - ABSOLUT VERBINDLICH:
0. VERBOTEN IN DER ANTWORT - ABSOLUT NICHT ZEIGEN:
KEINE Planungsschritte ("Ich werde jetzt...", "Suche in der Datenbank...", etc.)
KEINE SQL-Queries (SELECT-Statements)
KEINE Zwischenschritte ("Führe SQL-Abfrage aus...", "Analysiere Ergebnisse...", etc.)
KEINE Erklärungen über den Prozess oder die Methode
KEINE "Ich werde..."- oder "Ich suche..."-Sätze
NUR die finale Antwort mit den Daten!
1. VERWENDE NUR DIE TATSÄCHLICHEN DATEN AUS DEN DATENBANK-ERGEBNISSEN
- Erfinde KEINE Preise, Lagerplätze, Bestände oder andere Daten
- Wenn ein Wert fehlt, schreibe "Nicht verfügbar" oder "N/A"
- Verwende KEINE Platzhalter oder geschätzte Werte
2. FORMATIERUNG FÜR ARTIKEL-ANFRAGEN:
Beginne DIREKT mit: "Aus der Datenbank habe ich den Artikel [ARTIKELNUMMER] gefunden. Es handelt sich um [ARTIKELBEZEICHNUNG] von [LIEFERANT]."
- Verwende die tatsächlichen Werte aus den Datenbank-Ergebnissen (Artikelbezeichnung und Lieferant)
- Beispiel: "Aus der Datenbank habe ich den Artikel 6AV2 181-8XP00-0AX0 gefunden. Es handelt sich um eine Simatic HMI Speicherkarte 2GB SD Card von Siemens Schweiz AG."
- Falls Artikelbezeichnung oder Lieferant fehlen, verwende "Nicht verfügbar"
Dann zeige:
Artikelinformationen
- Artikelkürzel: [Wert aus Datenbank oder "Nicht verfügbar"]
- Artikelnummer: [Wert aus Datenbank oder "Nicht verfügbar"]
- Bezeichnung: [Wert aus Datenbank oder "Nicht verfügbar"]
- Lieferant: [Wert aus Datenbank oder "Nicht verfügbar"]
- Einkaufspreis: [Wert aus Datenbank oder "Nicht verfügbar"]
Lagerbestände nach Lagerplätzen
[Tabelle mit ALLEN Lagerplätzen aus den Daten]
Lagerplatz | Ist-Bestand | Soll-Bestand | Min-Bestand | Max-Bestand | Reservierter Bestand | Verfügbarer Bestand
Gesamtbestand: [Summe aller Ist-Bestände] Stück (alle am Lagerplatz "[Lagerplatzname]")
Möchten Sie:
- Mehr technische Details zu diesem Artikel erfahren?
- Nach ähnlichen Artikeln suchen?
- Informationen zu anderen Artikeln im Lager anzeigen?
- Den aktuellen Preis oder Lieferzeiten prüfen?
3. STELLE SICHER, DASS ALLE LAGERPLÄTZE ANGEZEIGT WERDEN
- Wenn mehrere Lagerplätze vorhanden sind, zeige ALLE in der Tabelle
- Gruppiere nicht - zeige jeden Lagerplatz als separate Zeile
4. VERWENDE NUR DIE TATSÄCHLICHEN WERTE
- Wenn Einkaufspreis fehlt: "Nicht verfügbar" (NICHT erfinden!)
- Wenn Lagerplatz fehlt: "Nicht verfügbar" (NICHT erfinden!)
- Wenn Bestand fehlt: "Nicht verfügbar" (NICHT erfinden!)
"""
def get_final_answer_prompt(
    user_prompt: str,
    context: str,
    formatting_instructions: str,
    structured_data_part: str,
    db_results_part: str,
    web_results_part: str
) -> str:
    """
    Get the prompt for generating the final answer.

    Combines the final-answer system prompt, the user's question, the
    formatting rules and all gathered evidence (structured data, raw DB
    results, web research results) into one prompt, followed by hard
    constraints: cite web sources inline, never invent data, and emit only
    the final answer without planning steps or SQL.

    Args:
        user_prompt: User's original prompt
        context: Conversation context (pre-formatted; appended verbatim)
        formatting_instructions: Formatting instructions
            (typically get_formatting_instructions())
        structured_data_part: Structured data section (may be empty)
        db_results_part: Database results section (may be empty)
        web_results_part: Web research results section (may be empty)

    Returns:
        Formatted prompt string
    """
    system_prompt = get_final_answer_system_prompt()
    return f"""{system_prompt}
Antworte auf die folgende Frage des Nutzers: {user_prompt}{context}
{formatting_instructions}
{structured_data_part}
{db_results_part}{web_results_part}
KRITISCH: Verwende NUR die oben angegebenen Daten. Erfinde KEINE Werte. Wenn Daten fehlen, schreibe "Nicht verfügbar".
ABSOLUT KRITISCH - WEB-RECHERCHE QUELLENANGABE
Wenn WEB-RECHERCHE-ERGEBNISSE oben vorhanden sind, MUSS du:
- IMMER explizit erwähnen, dass die Informationen aus einer Web-Recherche stammen
- IMMER alle Quellen DIREKT NACH der jeweiligen Information angeben (INLINE, nicht am Ende!)
- Format: [Information] ([Quelle: Website-Name](URL))
- IMMER AUSFÜHRLICHE Informationen präsentieren (nicht nur kurze Zusammenfassungen!)
- IMMER alle verfügbaren Datenblatt-Links explizit erwähnen und angeben
- Format für Datenblätter: "Datenblätter verfügbar: [Link 1](URL1), [Link 2](URL2)"
- Die Web-Recherche-Informationen klar von Datenbank-Informationen trennen
- VERBOTEN: Web-Recherche-Informationen ohne explizite Kennzeichnung zu präsentieren
- VERBOTEN: Web-Recherche-Informationen ohne Quellenangabe zu präsentieren
- VERBOTEN: Quellen nur am Ende als Liste zu präsentieren
- VERBOTEN: Datenblatt-Links zu verschweigen oder nicht explizit zu erwähnen
- VERBOTEN: Nur oberflächliche Informationen zu geben
ABSOLUT VERBOTEN - KEINE DATEN ERFINDEN
Wenn KEINE Datenbank-Ergebnisse vorhanden sind (keine DATENBANK-ERGEBNISSE oder STRUKTURIERTE DATEN oben), dann:
- ERFINDE KEINE Artikelnummern, Artikelbezeichnungen, Preise oder Lagerbestände!
- ERFINDE KEINE Beispielartikel wie "123456", "789012", "Beispielartikel 1", "Lieferant A", etc.!
- ERFINDE KEINE Daten, auch nicht als "Beispiel"!
- Wenn DATENBANK-FEHLER vorhanden sind, bedeutet das: KEINE DATEN VERFÜGBAR - ERFINDE NICHTS!
- Schreibe stattdessen: "Es wurden keine Artikel in der Datenbank gefunden." oder "Die Datenbankabfrage ist fehlgeschlagen."
- Wenn Fehler vorhanden sind: "Die Datenbankabfrage konnte nicht ausgeführt werden. Bitte versuchen Sie es später erneut oder kontaktieren Sie den Administrator."
WICHTIG: Deine Antwort soll NUR die finale Antwort enthalten - KEINE Planungsschritte, KEINE SQL-Queries, KEINE Zwischenschritte!
Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "Es wurden keine Artikel gefunden" (wenn keine Daten vorhanden).
Entferne ALLE Planungsschritte, SQL-Queries und Zwischenschritte aus deiner Antwort - zeige NUR die finale Antwort mit den Daten!"""
async def generate_conversation_name(
    services,
    prompt: str,
    user_language: Optional[str] = None
) -> str:
    """
    Generate a conversation name from the user's prompt using AI.

    Creates a concise, informative summary name in German based on the user
    input. Falls back to the heuristic _generate_name_from_prompt() whenever
    the AI service is unavailable, returns an unusable answer, or answers in
    English — so this function always returns a usable name and never raises.

    NOTE(review): this span contained unresolved merge residue (two versions
    interleaved, duplicate parameter lists); reconstructed here as the newer
    German-title implementation with the heuristic fallback.

    Args:
        services: Services object with AI service (expects services.ai)
        prompt: User's input prompt (always in German)
        user_language: User's language preference (not used, always German;
            kept for backward compatibility with existing callers)

    Returns:
        A short, informative conversation name in German (max 50 characters)
    """
    if not prompt or not prompt.strip():
        return "Neue Unterhaltung"
    try:
        # Check if AI service is available
        if not hasattr(services, 'ai') or services.ai is None:
            logger.warning("AI service not available, generating name from prompt")
            return _generate_name_from_prompt(prompt)
        # Ensure AI service is initialized before use
        await services.ai.ensureAiObjectsInitialized()
        # Create AI prompt - very explicit that answer must be in German
        ai_prompt = f"""Du bist ein deutscher Assistent. Der Benutzer hat folgende Anfrage auf Deutsch gestellt:
"{prompt.strip()}"
Erstelle einen kurzen, zusammenfassenden Titel für diese Unterhaltung. Der Titel muss:
- Auf Deutsch sein (KEIN Englisch!)
- Maximal 50 Zeichen lang sein
- Das Hauptthema zusammenfassen
- Informativ sein
Beispiele für gute deutsche Titel:
- "LED-Artikel Suche"
- "Lagerbestandsabfrage"
- "Produktinformationen"
- "Artikel-Suche"
Antworte NUR mit dem deutschen Titel, ohne Anführungszeichen oder Erklärungen."""
        # Create AI request
        request = AiCallRequest(
            prompt=ai_prompt,
            context="",
            options=AiCallOptions(
                resultFormat="txt",
                operationType=OperationTypeEnum.DATA_GENERATE,
                priority=PriorityEnum.SPEED,
                processingMode=ProcessingModeEnum.BASIC,
                compressPrompt=False,
                compressContext=False,
                temperature=0.3  # Lower temperature for more consistent German output
            )
        )
        # Call AI service
        logger.info(f"Calling AI to generate conversation name for prompt: {prompt[:50]}...")
        response = await services.ai.callAi(request)
        if not response or not hasattr(response, 'content') or not response.content:
            logger.warning("AI response invalid, generating name from prompt")
            return _generate_name_from_prompt(prompt)
        logger.info(f"AI response received: {response.content[:100]}...")
        # Clean up the AI response
        name = str(response.content).strip()
        name = name.strip('"\'')
        # Remove markdown code blocks if present
        if name.startswith('```'):
            lines = name.split('\n')
            if len(lines) > 1:
                name = '\n'.join(lines[1:-1]) if lines[-1].strip() == '```' else '\n'.join(lines[1:])
        # Remove newlines and extra spaces
        name = " ".join(name.split())
        # Check if name contains English words - if so, generate from prompt instead
        name_lower = name.lower()
        english_words = ["search", "find", "show", "display", "query", "article", "product", "item", "led articles", "product search"]
        if any(word in name_lower for word in english_words):
            logger.warning(f"AI generated English name '{name}', generating from prompt instead")
            return _generate_name_from_prompt(prompt)
        # Limit to 50 characters
        if len(name) > 50:
            name = name[:47] + "..."
        # If we got a valid name, return it
        if name and len(name) >= 3:
            logger.info(f"Successfully generated conversation name via AI: '{name}'")
            return name
        else:
            logger.warning(f"Generated name is too short: '{name}', generating from prompt")
            return _generate_name_from_prompt(prompt)
    except Exception as e:
        logger.error(f"Error generating conversation name with AI: {e}", exc_info=True)
        return _generate_name_from_prompt(prompt)
def _generate_name_from_prompt(prompt: str) -> str:
"""
Generate a conversation name directly from the German prompt.
Creates a concise title by extracting key words and formatting them.
Args:
prompt: User's input prompt in German
Returns:
A short conversation name in German
"""
if not prompt or not prompt.strip():
return "Neue Unterhaltung"
# Clean up the prompt
name = prompt.strip()
# Remove newlines and extra spaces
name = " ".join(name.split())
# Remove common question words and phrases
question_words = ["wie", "was", "wo", "wann", "wer", "welche", "welcher", "welches"]
words = name.split()
filtered_words = [w for w in words if w.lower() not in question_words]
if filtered_words:
name = " ".join(filtered_words)
# Capitalize first letter
if name:
name = name[0].upper() + name[1:] if len(name) > 1 else name.upper()
# Limit to 50 characters
if len(name) > 50:
# Try to cut at word boundary
truncated = name[:47]
last_space = truncated.rfind(' ')
if last_space > 20: # Only cut at word boundary if reasonable
name = truncated[:last_space] + "..."
else:
name = truncated + "..."
# If name is empty or too short, use default
if not name or len(name) < 3:
return "Neue Unterhaltung"
return name

View file

@ -0,0 +1,130 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Configuration system for chatbot instances.
Loads JSON configuration files from configs/ directory.
"""
import logging
import json
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)

# Cache for loaded configs: maps config id -> parsed ChatbotConfig.
# Populated by load_chatbot_config(); emptied by clear_config_cache().
_config_cache: Dict[str, 'ChatbotConfig'] = {}
@dataclass
class DatabaseConfig:
    """Database configuration for a chatbot instance (the "database" section
    of a chatbot config JSON file)."""
    # Schema description of the database (tables, relationships, db path).
    schema: Dict[str, Any]
    # Name of the connector used to execute queries.
    connector: str = "preprocessor"
@dataclass
class ToolConfig:
    """Tool configuration for a chatbot instance (the "tools" section of a
    chatbot config JSON file)."""
    # SQL tool settings, e.g. {"enabled": True}.
    sql: Dict[str, Any]
    # Tavily web-search settings; None when the section is absent.
    tavily: Optional[Dict[str, Any]] = None
    # Streaming settings. Annotated Optional because the default is None
    # (the original annotation `Dict[str, Any] = None` implied Optional
    # implicitly, which PEP 484 disallows); ChatbotConfig.from_dict normally
    # supplies {"enabled": True}.
    streaming: Optional[Dict[str, Any]] = None
@dataclass
class ModelConfig:
    """Model configuration for a chatbot instance (the "model" section of a
    chatbot config JSON file)."""
    # Operation type name passed to the AI layer (default "DATA_ANALYSE").
    operationType: str = "DATA_ANALYSE"
    # Processing mode name passed to the AI layer (default "DETAILED").
    processingMode: str = "DETAILED"
@dataclass
class ChatbotConfig:
    """Configuration for a chatbot instance, as loaded from a JSON file in
    the configs/ directory."""
    id: str
    name: str
    systemPrompt: str
    database: DatabaseConfig
    tools: ToolConfig
    model: ModelConfig

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ChatbotConfig':
        """Create ChatbotConfig from dictionary, supplying defaults for any
        missing keys or sections."""
        # Hoist each section once instead of re-fetching it per field.
        database_section = data.get("database", {})
        tools_section = data.get("tools", {})
        model_section = data.get("model", {})
        return cls(
            id=data.get("id", "default"),
            name=data.get("name", "Default Chatbot"),
            systemPrompt=data.get("systemPrompt", "You are a helpful assistant."),
            database=DatabaseConfig(
                schema=database_section.get("schema", {}),
                connector=database_section.get("connector", "preprocessor")
            ),
            tools=ToolConfig(
                sql=tools_section.get("sql", {"enabled": True}),
                tavily=tools_section.get("tavily"),
                streaming=tools_section.get("streaming", {"enabled": True})
            ),
            model=ModelConfig(
                operationType=model_section.get("operationType", "DATA_ANALYSE"),
                processingMode=model_section.get("processingMode", "DETAILED")
            )
        )
def load_chatbot_config(config_id: str) -> ChatbotConfig:
    """
    Load chatbot configuration from JSON file.

    Results are memoized in the module-level _config_cache. When the
    requested config file is missing, the "default" config is loaded
    instead; only a missing default config raises.

    Args:
        config_id: Configuration ID (e.g., "althaus", "default")

    Returns:
        ChatbotConfig instance

    Raises:
        FileNotFoundError: If neither the requested nor the default config file exists
        ValueError: If the config file contains invalid JSON
    """
    # Serve from cache when available.
    cached = _config_cache.get(config_id)
    if cached is not None:
        logger.debug(f"Returning cached config for {config_id}")
        return cached

    # Configs live in a "configs" directory next to this module.
    config_file = Path(__file__).parent / "configs" / f"{config_id}.json"

    if not config_file.exists():
        if config_id != "default":
            # Fall back to the default config when the requested one is missing.
            logger.warning(f"Config {config_id} not found, trying default")
            return load_chatbot_config("default")
        raise FileNotFoundError(f"Chatbot config file not found: {config_file}")

    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        config = ChatbotConfig.from_dict(data)
        _config_cache[config_id] = config
        logger.info(f"Loaded chatbot config: {config_id} ({config.name})")
        return config
    except json.JSONDecodeError as e:
        logger.error(f"Error parsing chatbot config JSON {config_file}: {e}")
        raise ValueError(f"Invalid JSON in config file {config_file}: {e}")
    except Exception as e:
        logger.error(f"Error loading chatbot config {config_file}: {e}")
        raise
def clear_config_cache():
    """Clear the configuration cache so configs are re-read from disk on the
    next load_chatbot_config() call."""
    # The original declared `global _config_cache`, which is unnecessary:
    # the dict is mutated in place (never rebound), so no global statement
    # is needed.
    _config_cache.clear()
    logger.debug("Cleared chatbot config cache")

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,31 @@
{
"id": "default",
"name": "Default Chatbot",
"systemPrompt": "You are a helpful assistant. You have access to SQL query tools and web search tools. Use them to help answer user questions.",
"database": {
"schema": {
"database": {
"path": "/data/database.db",
"type": "SQLite"
},
"tables": {},
"relationships": []
},
"connector": "preprocessor"
},
"tools": {
"sql": {
"enabled": true
},
"tavily": {
"enabled": false
},
"streaming": {
"enabled": true
}
},
"model": {
"operationType": "DATA_ANALYSE",
"processingMode": "DETAILED"
}
}

View file

@ -1,225 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Generic streaming event manager for real-time updates.
Manages event queues for SSE streaming across all features (chatbot, workflows, documents, etc.).
Supports event-driven streaming instead of polling.
"""
import logging
import asyncio
from typing import Dict, Optional, Any, List, AsyncIterator, Set
from datetime import datetime
logger = logging.getLogger(__name__)
class StreamingEventManager:
    """
    Generic event manager for real-time streaming across all features.
    Supports multiple event types and contexts (workflows, documents, tasks, etc.).
    Thread-safe event emission and queue management.

    One asyncio.Queue is kept per context id: producers call emit_event()
    and consumers iterate stream_events(). cleanup() tears a queue down
    after a grace period so late consumers can still drain pending events.
    """
    def __init__(self):
        """Initialize the event manager."""
        # context_id -> queue of pending event dicts
        self._queues: Dict[str, asyncio.Queue] = {}
        # context_id -> per-context lock (created alongside the queue;
        # reserved for guarding per-context operations)
        self._locks: Dict[str, asyncio.Lock] = {}
        # context_id -> pending delayed-cleanup task (see cleanup())
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}
        self._subscribers: Dict[str, Set[str]] = {}  # context_id -> set of queue_ids (for future multi-subscriber support)

    def create_queue(self, context_id: str) -> asyncio.Queue:
        """
        Create a new event queue for a context.

        Idempotent: an existing queue for the context is returned unchanged.

        Args:
            context_id: Context ID (workflow_id, document_id, task_id, etc.)

        Returns:
            Event queue for the context
        """
        if context_id not in self._queues:
            self._queues[context_id] = asyncio.Queue()
            self._locks[context_id] = asyncio.Lock()
            self._subscribers[context_id] = set()
            logger.debug(f"Created event queue for context {context_id}")
        return self._queues[context_id]

    def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
        """
        Get existing event queue for a context.

        Args:
            context_id: Context ID

        Returns:
            Event queue if exists, None otherwise
        """
        return self._queues.get(context_id)

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "default",
        message: Optional[str] = None,
        step: Optional[str] = None
    ):
        """
        Emit an event to the context's event queue.

        Silently drops the event when no queue exists for the context
        (i.e. no consumer ever registered for it).

        Args:
            context_id: Context ID (workflow_id, document_id, etc.)
            event_type: Type of event ("message", "log", "status", "progress", "complete", "error", "chatdata")
            data: Event data dictionary (will be included in event)
            event_category: Category of event for filtering ("chat", "workflow", "document", etc.)
            message: Optional event message (for backward compatibility)
            step: Optional processing step (for backward compatibility)
        """
        queue = self.get_queue(context_id)
        if not queue:
            logger.debug(f"No event queue found for context {context_id}, skipping event")
            return
        event = {
            "type": event_type,
            "category": event_category,
            # NOTE(review): naive local-time timestamp; confirm consumers
            # do not expect UTC.
            "timestamp": datetime.now().timestamp(),
            "data": data,
            "message": message,  # For backward compatibility
            "step": step  # For backward compatibility
        }
        try:
            await queue.put(event)
            logger.debug(f"Emitted {event_type} event (category: {event_category}) for context {context_id}")
        except Exception as e:
            logger.error(f"Error emitting event for context {context_id}: {e}")

    async def stream_events(
        self,
        context_id: str,
        event_categories: Optional[List[str]] = None,
        timeout: Optional[float] = None
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Async generator for streaming events from a context.

        Polls the queue in 1-second slices so the overall timeout is
        honored even while no events arrive. Events whose category is not
        in event_categories (when given) are consumed and discarded.

        Args:
            context_id: Context ID to stream events from
            event_categories: Optional list of event categories to filter by
            timeout: Optional timeout in seconds (None = no timeout)

        Yields:
            Event dictionaries
        """
        queue = self.get_queue(context_id)
        if not queue:
            logger.warning(f"No queue found for context {context_id}")
            return
        start_time = asyncio.get_event_loop().time() if timeout else None
        while True:
            # Check timeout
            if timeout and start_time:
                elapsed = asyncio.get_event_loop().time() - start_time
                if elapsed > timeout:
                    logger.debug(f"Stream timeout for context {context_id}")
                    break
            try:
                # Wait for event with timeout
                wait_timeout = 1.0  # Check timeout every second
                if timeout and start_time:
                    remaining = timeout - (asyncio.get_event_loop().time() - start_time)
                    if remaining <= 0:
                        break
                    wait_timeout = min(wait_timeout, remaining)
                event = await asyncio.wait_for(queue.get(), timeout=wait_timeout)
                # Filter by category if specified
                if event_categories and event.get("category") not in event_categories:
                    continue
                yield event
            except asyncio.TimeoutError:
                # Check if we should continue or timeout
                if timeout and start_time:
                    elapsed = asyncio.get_event_loop().time() - start_time
                    if elapsed >= timeout:
                        break
                continue
            except Exception as e:
                logger.error(f"Error in stream_events for context {context_id}: {e}")
                break

    async def cleanup(self, context_id: str, delay: float = 60.0):
        """
        Schedule cleanup of event queue after delay.
        This allows time for any remaining events to be consumed.

        Re-invoking cleanup for the same context cancels and replaces any
        previously scheduled cleanup task.

        Args:
            context_id: Context ID
            delay: Delay in seconds before cleanup (default: 60 seconds)
        """
        if context_id in self._cleanup_tasks:
            # Cancel existing cleanup task
            self._cleanup_tasks[context_id].cancel()
        async def _cleanup():
            try:
                await asyncio.sleep(delay)
                if context_id in self._queues:
                    # Drain remaining events
                    queue = self._queues[context_id]
                    while not queue.empty():
                        try:
                            queue.get_nowait()
                        except asyncio.QueueEmpty:
                            break
                    del self._queues[context_id]
                if context_id in self._locks:
                    del self._locks[context_id]
                if context_id in self._subscribers:
                    del self._subscribers[context_id]
                logger.info(f"Cleaned up event queue for context {context_id}")
            except asyncio.CancelledError:
                # Expected when a newer cleanup() call supersedes this one.
                pass
            except Exception as e:
                logger.error(f"Error during cleanup for context {context_id}: {e}")
            finally:
                if context_id in self._cleanup_tasks:
                    del self._cleanup_tasks[context_id]
        self._cleanup_tasks[context_id] = asyncio.create_task(_cleanup())

    def has_queue(self, context_id: str) -> bool:
        """
        Check if a queue exists for a context.

        Args:
            context_id: Context ID

        Returns:
            True if queue exists, False otherwise
        """
        return context_id in self._queues
# Backward compatibility: ChatbotEventManager is an alias
ChatbotEventManager = StreamingEventManager

# Global singleton instance shared by all features in this process.
_event_manager = StreamingEventManager()

def get_event_manager() -> StreamingEventManager:
    """Get the global event manager instance."""
    return _event_manager

View file

@ -1,999 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Simple chatbot feature - basic implementation.
User input is processed by AI to create list of needed queries.
Those queries get streamed back.
"""
import logging
import json
import uuid
import asyncio
import re
from typing import Optional, Dict, Any, List
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum, ChatLog
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp
from modules.services import getInterface as getServices
from modules.features.chatbot.eventManager import get_event_manager
from modules.workflows.methods.methodAi.methodAi import MethodAi
from modules.connectors.connectorPreprocessor import PreprocessorConnector
from modules.features.chatbot.chatbotConstants import (
get_initial_analysis_prompt,
generate_conversation_name,
get_final_answer_system_prompt
)
logger = logging.getLogger(__name__)
def _extractJsonFromResponse(content: str) -> Optional[dict]:
"""Extract JSON from AI response, handling markdown code blocks."""
# Try direct JSON parse first
try:
return json.loads(content.strip())
except json.JSONDecodeError:
pass
# Try to extract JSON from markdown code blocks
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# Try to find JSON object in the text
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(0))
except json.JSONDecodeError:
pass
return None
async def chatProcess(
    currentUser: User,
    userInput: UserInputRequest,
    workflowId: Optional[str] = None
) -> ChatWorkflow:
    """
    Simple chatbot processing - analyze user input and generate queries.

    Flow:
    1. Create a new workflow or resume an existing one (round counter +1)
    2. Store the user message and emit it as a streaming "chatdata" event
    3. Kick off background processing (AI analysis + query execution)
    4. Return immediately with the workflow; results arrive via the event queue

    Args:
        currentUser: Current user (provides mandateId for new workflows)
        userInput: User input request (prompt and language)
        workflowId: Optional workflow ID to continue existing conversation

    Returns:
        ChatWorkflow instance (reloaded so it includes the stored user message)

    Raises:
        ValueError: If workflowId is given but no such workflow exists.
    """
    try:
        # Get services
        services = getServices(currentUser, None)
        interfaceDbChat = services.interfaceDbChat
        # Get event manager and create queue if needed
        event_manager = get_event_manager()
        # Create or load workflow
        if workflowId:
            workflow = interfaceDbChat.getWorkflow(workflowId)
            if not workflow:
                raise ValueError(f"Workflow {workflowId} not found")
            # Resume workflow: increment round number
            new_round = workflow.currentRound + 1
            interfaceDbChat.updateWorkflow(workflowId, {
                "status": "running",
                "currentRound": new_round,
                "lastActivity": getUtcTimestamp()
            })
            # Reload so the in-memory object reflects the update above.
            workflow = interfaceDbChat.getWorkflow(workflowId)
            logger.info(f"Resumed workflow {workflowId}, round incremented to {new_round}")
            # Create event queue if it doesn't exist (for streaming)
            if not event_manager.has_queue(workflowId):
                event_manager.create_queue(workflowId)
        else:
            # Generate conversation name based on user's prompt
            conversation_name = await generate_conversation_name(
                services,
                userInput.prompt,
                userInput.userLanguage
            )
            # Create new workflow
            workflowData = {
                "id": str(uuid.uuid4()),
                "mandateId": currentUser.mandateId,
                "status": "running",
                "name": conversation_name,
                "currentRound": 1,
                "currentTask": 0,
                "currentAction": 0,
                "totalTasks": 0,
                "totalActions": 0,
                "workflowMode": WorkflowModeEnum.WORKFLOW_CHATBOT.value,
                "startedAt": getUtcTimestamp(),
                "lastActivity": getUtcTimestamp()
            }
            workflow = interfaceDbChat.createWorkflow(workflowData)
            logger.info(f"Created new chatbot workflow: {workflow.id} with name: {conversation_name}")
            # Create event queue for new workflow (for streaming)
            event_manager.create_queue(workflow.id)
        # Reload workflow to get current message count (used for sequenceNr below)
        workflow = interfaceDbChat.getWorkflow(workflow.id)
        # Store user message; "first" marks the opening message of a new conversation.
        userMessageData = {
            "id": f"msg_{uuid.uuid4()}",
            "workflowId": workflow.id,
            "message": userInput.prompt,
            "role": "user",
            "status": "first" if workflowId is None else "step",
            "sequenceNr": len(workflow.messages) + 1,
            "publishedAt": getUtcTimestamp(),
            "roundNumber": workflow.currentRound,
            "taskNumber": 0,
            "actionNumber": 0
        }
        userMessage = interfaceDbChat.createMessage(userMessageData)
        logger.info(f"Stored user message: {userMessage.id}")
        # Emit message event for streaming (exact chatData format)
        message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp())
        await event_manager.emit_event(
            context_id=workflow.id,
            event_type="chatdata",
            data={
                "type": "message",
                "createdAt": message_timestamp,
                "item": userMessage.dict()
            },
            event_category="chat"
        )
        # Update workflow status
        interfaceDbChat.updateWorkflow(workflow.id, {
            "status": "running",
            "lastActivity": getUtcTimestamp()
        })
        # Process in background (async) — fire-and-forget; results are streamed
        # through the event manager rather than returned here.
        asyncio.create_task(_processChatbotMessage(
            services,
            workflow.id,
            userInput,
            userMessage.id
        ))
        # Reload workflow to include new message
        workflow = interfaceDbChat.getWorkflow(workflow.id)
        return workflow
    except Exception as e:
        logger.error(f"Error in chatProcess: {str(e)}", exc_info=True)
        raise
async def _execute_queries_parallel(queries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Execute multiple SQL queries in parallel.

    Each query runs on its own PreprocessorConnector so failures are isolated.

    Args:
        queries: List of query dictionaries, each containing:
            - "query": SQL query string
            - "purpose": Description of what the query retrieves
            - "table": Primary table name

    Returns:
        Dictionary mapping 1-based query indices to results:
            - "query_1", "query_2", ...: Success result text
            - "query_1_data", ...: Raw data arrays
            - "query_1_error", ...: Error messages if query failed
    """
    async def _run_one(position: int, info: Dict[str, Any]):
        """Run one query on a dedicated connector; returns (index, result, error)."""
        connector = PreprocessorConnector()
        try:
            outcome = await connector.executeQuery(info.get("query", ""), return_json=True)
            await connector.close()
            return position, outcome, None
        except Exception as exc:
            await connector.close()
            return position, None, str(exc)

    outcomes = await asyncio.gather(
        *(_run_one(pos, info) for pos, info in enumerate(queries)),
        return_exceptions=True
    )

    collected: Dict[str, Any] = {}
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            # gather() surfaced something _run_one could not catch.
            logger.error(f"Exception in parallel query execution: {outcome}")
            continue
        position, payload, failure = outcome
        key = f"query_{position + 1}"
        if failure:
            collected[f"{key}_error"] = failure
            logger.error(f"Query {position + 1} failed: {failure}")
            continue
        text = payload.get("text", "") if payload else ""
        if payload and not text.startswith(("Error:", "Query failed:")):
            collected[key] = text
            collected[f"{key}_data"] = payload.get("data", [])
            row_count = len(payload.get('data', []))
            logger.info(f"Query {position + 1} executed successfully, returned {row_count} rows")
        else:
            failure_text = payload.get("text", "Query failed") if payload else "Query failed: No response"
            collected[f"{key}_error"] = failure_text
            logger.error(f"Query {position + 1} failed: {failure_text}")
    return collected
async def _emit_log_and_event(
    interfaceDbChat,
    workflowId: str,
    event_manager,
    message: str,
    log_type: str = "info",
    status: str = "running",
    round_number: Optional[int] = None
) -> None:
    """
    Store log in database; the route's periodic chat-data fetch emits it.

    Deliberately does NOT push to the event manager to avoid duplicate log
    emissions. Any storage failure is swallowed after logging — logging must
    never break the chat pipeline.

    Args:
        interfaceDbChat: Database interface
        workflowId: Workflow ID
        event_manager: Event manager (unused, kept for compatibility)
        message: Log message
        log_type: Log type (info, warning, error)
        status: Status string
        round_number: Optional round number (read from the workflow if omitted)
    """
    try:
        if round_number is None:
            current = interfaceDbChat.getWorkflow(workflowId)
            if current:
                round_number = current.currentRound
        entry = {
            "id": f"log_{uuid.uuid4()}",
            "workflowId": workflowId,
            "message": message,
            "type": log_type,
            "timestamp": getUtcTimestamp(),
            "status": status,
            "roundNumber": round_number
        }
        # Persist only — the streaming route picks this up on its next poll.
        interfaceDbChat.createLog(entry)
    except Exception as e:
        logger.error(f"Error storing log: {e}")
async def _check_workflow_stopped(interfaceDbChat, workflowId: str) -> bool:
"""
Check if workflow was stopped.
Args:
interfaceDbChat: Database interface
workflowId: Workflow ID
Returns:
True if workflow is stopped, False otherwise
"""
try:
workflow = interfaceDbChat.getWorkflow(workflowId)
return workflow and workflow.status == "stopped"
except Exception as e:
logger.warning(f"Error checking workflow status: {e}")
return False
def _buildWebResearchQuery(userPrompt: str, workflowMessages: List, queryResults: Optional[Dict[str, Any]] = None) -> str:
    """
    Build enriched web research query by extracting product context from conversation history and current prompt.

    Extracts product information from:
    1. Current user prompt (article numbers, product mentions)
    2. Database query results (if available)
    3. Previous assistant messages (conversation history)

    Args:
        userPrompt: Current user prompt
        workflowMessages: List of workflow messages (conversation history);
            assumed to expose .role and .message attributes
        queryResults: Optional database query results to extract product info from
            (the "query_N"/"query_N_data" dict produced by _execute_queries_parallel)

    Returns:
        Enriched search query string, or the original prompt when no product
        context could be found.
    """
    # Normalize user prompt for detection
    prompt_lower = userPrompt.lower().strip()
    # Patterns that indicate a search request
    # NOTE(review): substring matching means e.g. "ja" also matches inside
    # longer words ("Jahr") — confirm this is acceptable before tightening.
    search_patterns = [
        "ja", "yes", "oui", "si",
        "such", "suche", "search", "recherche", "recherchier",
        "internet", "web", "online",
        "datenblatt", "datasheet", "fiche technique",
        "mehr informationen", "more information", "plus d'information",
        "weitere informationen", "further information", "additional information"
    ]
    # Check if current prompt contains search-related keywords
    has_search_intent = any(pattern in prompt_lower for pattern in search_patterns)
    # Extract product information - try multiple sources
    article_number = None
    article_description = None
    supplier = None
    # Pattern for article numbers like "6AV2 181-8XP00-0AX0" or "6AV2181-8XP00-0AX0"
    article_patterns = [
        r'\b[A-Z0-9]{2,}\s+[0-9]{3,}-[A-Z0-9-]+\b',  # With space: "6AV2 181-8XP00-0AX0"
        r'\b[A-Z0-9]{4,}[\s-][A-Z0-9-]{6,}\b',  # General pattern
        r'\b[A-Z]{2,}[0-9]+\s+[0-9]+-[A-Z0-9-]+\b',  # Specific Siemens pattern
    ]
    # 1. First, try to extract from current user prompt
    for pattern in article_patterns:
        matches = re.findall(pattern, userPrompt)
        if matches:
            article_number = matches[0]
            logger.info(f"Extracted article number from user prompt: {article_number}")
            break
    # 2. Try to extract from database query results if available
    # Always check queryResults to enrich with product description and supplier, even if article_number was already found
    if queryResults:
        # Look for article numbers in query result text (if not already found)
        if not article_number:
            for key in queryResults.keys():
                if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
                    result_text = queryResults.get(key, "")
                    if isinstance(result_text, str):
                        for pattern in article_patterns:
                            matches = re.findall(pattern, result_text)
                            if matches:
                                article_number = matches[0]
                                logger.info(f"Extracted article number from query results: {article_number}")
                                break
                if article_number:
                    break
        # Always check data arrays for product description and supplier (even if article_number already found)
        for key in queryResults.keys():
            if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
                data_key = f"{key}_data"
                if data_key in queryResults:
                    data_array = queryResults[data_key]
                    if isinstance(data_array, list) and len(data_array) > 0:
                        # Look for article number in first row (if not already found)
                        first_row = data_array[0]
                        if isinstance(first_row, dict):
                            # Check common article number fields (if not already found)
                            if not article_number:
                                for field in ["Artikelnummer", "Artikelkürzel", "article_number", "articleNumber"]:
                                    if field in first_row and first_row[field]:
                                        article_number = str(first_row[field])
                                        logger.info(f"Extracted article number from query data: {article_number}")
                                        break
                            # Always check article description (can enrich even if article_number already found)
                            if not article_description:
                                for field in ["Artikelbezeichnung", "Bezeichnung", "article_description", "description"]:
                                    if field in first_row and first_row[field]:
                                        article_description = str(first_row[field])
                                        logger.info(f"Extracted article description from query data: {article_description}")
                                        break
                            # Always check supplier (can enrich even if article_number already found)
                            if not supplier:
                                for field in ["Lieferant", "Supplier", "supplier"]:
                                    if field in first_row and first_row[field]:
                                        supplier = str(first_row[field])
                                        logger.info(f"Extracted supplier from query data: {supplier}")
                                        break
            # If we found all needed info, we can stop
            if article_number and article_description and supplier:
                break
    # 3. Extract from previous assistant messages (conversation history)
    if not article_number or not article_description:
        for msg in reversed(workflowMessages[-10:]):
            if msg.role == "assistant":
                message_text = msg.message
                # Extract article number if not found yet
                if not article_number:
                    for pattern in article_patterns:
                        matches = re.findall(pattern, message_text)
                        if matches:
                            article_number = matches[0]
                            break
                # Extract article description if not found yet
                # FIX: removed a duplicated 'Artikelbezeichnung' pattern entry
                # (was listed twice; the second occurrence could never match
                # anything the first had not already matched).
                if not article_description:
                    description_patterns = [
                        r'Es handelt sich um\s+([^\.]+)',
                        r'It is a\s+([^\.]+)',
                        r'C\'est\s+([^\.]+)',
                        r'Bezeichnung:\s*([^\n]+)',
                        r'Description:\s*([^\n]+)',
                        r'Artikelbezeichnung:\s*([^\n]+)'
                    ]
                    for pattern in description_patterns:
                        match = re.search(pattern, message_text, re.IGNORECASE)
                        if match:
                            article_description = match.group(1).strip()
                            break
                # Extract supplier if not found yet
                if not supplier:
                    supplier_patterns = [
                        r'von\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
                        r'from\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
                        r'Lieferant:\s*([^\n]+)',
                        r'Supplier:\s*([^\n]+)'
                    ]
                    for pattern in supplier_patterns:
                        match = re.search(pattern, message_text, re.IGNORECASE)
                        if match:
                            supplier = match.group(1).strip()
                            break
            # Stop if we found everything
            if article_number and article_description and supplier:
                break
    # Build enriched search query
    query_parts = []
    # If we have search intent but no product info, try to use the user prompt intelligently
    if has_search_intent and not article_number and not article_description:
        # Try to extract meaningful parts from the prompt
        # Remove common search phrases and keep the product-related parts
        cleaned_prompt = userPrompt
        for phrase in ["recherchier nach", "recherche", "suche nach", "search for", "find", "informationen zu", "information about", "weitere informationen", "further information"]:
            cleaned_prompt = re.sub(phrase, "", cleaned_prompt, flags=re.IGNORECASE)
        cleaned_prompt = cleaned_prompt.strip()
        # If cleaned prompt still has content and is different, use it
        if cleaned_prompt and cleaned_prompt != userPrompt and len(cleaned_prompt) > 10:
            query_parts.append(cleaned_prompt)
    # Add article description if found
    if article_description:
        query_parts.append(article_description)
    # Add article number if found
    if article_number:
        query_parts.append(article_number)
    # Add supplier if found
    if supplier:
        query_parts.append(supplier)
    # Add "Datenblatt" or "datasheet" if user requested it or if we have product info
    if "datenblatt" in prompt_lower or "datasheet" in prompt_lower or "fiche technique" in prompt_lower:
        query_parts.append("Datenblatt")
    elif query_parts:
        # If we have product info but no explicit request for datasheet, add it anyway
        query_parts.append("Datenblatt")
    # If we found product information or built a meaningful query, use it
    if query_parts:
        enriched_query = " ".join(query_parts)
        logger.info(f"Built enriched search query: '{enriched_query}' from context (original: '{userPrompt}')")
        return enriched_query
    else:
        # Fall back to original prompt, but try to clean it up
        logger.info(f"No product context found, using original prompt: '{userPrompt}'")
        return userPrompt
async def _processChatbotMessage(
    services,
    workflowId: str,
    userInput: UserInputRequest,
    userMessageId: str
):
    """
    Process chatbot message in background.
    Analyzes user input and generates list of queries, then streams them back.

    Pipeline:
    1. AI analysis of the prompt to plan database/web queries
    2. Parallel SQL execution plus optional web research
    3. AI-generated final answer, stored as an assistant message

    The workflow's "stopped" status is re-checked between steps so a user
    abort takes effect as early as possible. All failures are reported as an
    assistant error message rather than raised to the caller.
    """
    event_manager = get_event_manager()
    try:
        interfaceDbChat = services.interfaceDbChat
        # Reload workflow to get current messages
        workflow = interfaceDbChat.getWorkflow(workflowId)
        if not workflow:
            logger.error(f"Workflow {workflowId} not found during processing")
            await event_manager.emit_event(
                context_id=workflowId,
                event_type="error",
                data={"error": f"Workflow {workflowId} nicht gefunden"},
                event_category="workflow",
                message=f"Workflow {workflowId} nicht gefunden",
                step="error"
            )
            return
        # Check if workflow was stopped before starting
        if await _check_workflow_stopped(interfaceDbChat, workflowId):
            logger.info(f"Workflow {workflowId} was stopped, aborting processing")
            return
        # Build conversation context from history (last 5 messages)
        context = ""
        if workflow.messages:
            recent_messages = workflow.messages[-5:]
            context = "\n\nPrevious conversation:\n"
            for msg in recent_messages:
                if msg.role == "user":
                    context += f"User: {msg.message}\n"
                elif msg.role == "assistant":
                    context += f"Assistant: {msg.message}\n"
        await services.ai.ensureAiObjectsInitialized()
        # Step 1: Analyze user input to generate queries
        logger.info("Analyzing user input to generate queries...")
        await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Analysiere Benutzeranfrage...")
        analysisPrompt = get_initial_analysis_prompt(userInput.prompt, context)
        # AI call for analysis
        method_ai = MethodAi(services)
        analysis_result = await method_ai.process({
            "aiPrompt": analysisPrompt,
            "documentList": None,
            "resultType": "json",
            "simpleMode": True
        })
        # Extract content from ActionResult
        analysis_content = None
        if analysis_result.success and analysis_result.documents:
            analysis_content = analysis_result.documents[0].documentData
            if isinstance(analysis_content, bytes):
                analysis_content = analysis_content.decode('utf-8')
        if not analysis_content:
            logger.warning("Analysis failed, using fallback")
            analysis = {}
        else:
            # May return None when the AI reply contains no parseable JSON.
            analysis = _extractJsonFromResponse(analysis_content)
        # Extract analysis results.
        # FIX: every access below guards against analysis being None —
        # previously sql_queries/sqlQuery/reasoning were read unguarded and
        # raised AttributeError when JSON extraction failed.
        needsDatabaseQuery = analysis.get("needsDatabaseQuery", False) if analysis else False
        needsWebResearch = analysis.get("needsWebResearch", False) if analysis else False
        sql_queries = analysis.get("sqlQueries", []) if analysis else []
        # Support legacy single query format for backward compatibility
        if not sql_queries and analysis and analysis.get("sqlQuery"):
            sql_queries = [{
                "query": analysis.get("sqlQuery", ""),
                "purpose": "Database query",
                "table": "Unknown"
            }]
        reasoning = analysis.get("reasoning", "") if analysis else ""
        logger.info(f"Analysis: DB={needsDatabaseQuery}, Web={needsWebResearch}, SQL queries={len(sql_queries)}")
        # Build initial enriched web research query if needed (for logging, will be rebuilt after DB queries)
        enriched_web_query = None
        if needsWebResearch:
            enriched_web_query = _buildWebResearchQuery(userInput.prompt, workflow.messages)
        # Build list of queries to stream back
        queries = []
        if needsDatabaseQuery and sql_queries:
            for i, sql_query_info in enumerate(sql_queries, 1):
                queries.append({
                    "type": "database",
                    "query": sql_query_info.get("query", ""),
                    "purpose": sql_query_info.get("purpose", f"Query {i}"),
                    "table": sql_query_info.get("table", "Unknown"),
                    "reasoning": reasoning
                })
        if needsWebResearch:
            queries.append({
                "type": "web",
                "query": enriched_web_query or userInput.prompt,
                "reasoning": reasoning
            })
        # Format queries as log text
        log_lines = []
        if queries:
            db_queries = [q for q in queries if q["type"] == "database"]
            log_lines.append(f"Generiert: {len(db_queries)} Datenbankabfrage(n) und {len(queries) - len(db_queries)} Web-Recherche(n)\n\n")
            for i, q in enumerate(queries, 1):
                if q["type"] == "database":
                    log_lines.append(f"{i}. Datenbankabfrage ({q.get('table', 'Unknown')}):\n")
                    log_lines.append(f" Zweck: {q.get('purpose', 'Nicht angegeben')}\n")
                    log_lines.append(f"```sql\n{q['query']}\n```\n")
                elif q["type"] == "web":
                    log_lines.append(f"{i}. Web-Recherche:\n")
                    log_lines.append(f" Suchbegriff: {q['query']}\n")
                if q.get("reasoning"):
                    log_lines.append(f" Begründung: {q['reasoning']}\n")
                log_lines.append("\n")
        else:
            log_lines.append("Keine Abfragen erforderlich.")
        log_text = "".join(log_lines)
        # Stream queries as a log
        await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, log_text)
        # Check if workflow was stopped before executing queries
        if await _check_workflow_stopped(interfaceDbChat, workflowId):
            logger.info(f"Workflow {workflowId} was stopped, aborting query execution")
            return
        # Step 2: Execute queries
        queryResults = {}
        webResearchResults = ""
        # Execute database queries in parallel
        if needsDatabaseQuery and sql_queries:
            logger.info(f"Executing {len(sql_queries)} database queries in parallel...")
            await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, f"Führe {len(sql_queries)} Datenbankabfrage(n) parallel aus...")
            try:
                queryResults = await _execute_queries_parallel(sql_queries)
                # Log results summary
                successful_queries = [k for k in queryResults.keys() if k.startswith("query_") and not k.endswith("_error") and not k.endswith("_data")]
                failed_queries = [k for k in queryResults.keys() if k.endswith("_error")]
                if successful_queries:
                    total_rows = sum(len(queryResults.get(f"{k}_data", [])) for k in successful_queries)
                    logger.info(f"Successfully executed {len(successful_queries)} query/queries, total {total_rows} rows")
                    await _emit_log_and_event(
                        interfaceDbChat,
                        workflowId,
                        event_manager,
                        f"Abgeschlossen: {len(successful_queries)} Abfrage(n) erfolgreich, {total_rows} Ergebnis{'e' if total_rows != 1 else ''} gefunden"
                    )
                if failed_queries:
                    logger.warning(f"{len(failed_queries)} query/queries failed")
                    await _emit_log_and_event(
                        interfaceDbChat,
                        workflowId,
                        event_manager,
                        f"Warnung: {len(failed_queries)} Abfrage(n) fehlgeschlagen",
                        log_type="warning"
                    )
            except Exception as e:
                logger.error(f"Error executing parallel queries: {e}")
                queryResults["error"] = f"Error executing queries: {str(e)}"
                await _emit_log_and_event(
                    interfaceDbChat,
                    workflowId,
                    event_manager,
                    "Fehler bei parallelen Datenbankabfragen",
                    log_type="error"
                )
        # Execute web research
        if needsWebResearch:
            logger.info("Performing web research...")
            await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Suche im Internet nach Informationen...")
            try:
                # Rebuild enriched query with database results if available (better product context)
                web_research_query = _buildWebResearchQuery(
                    userInput.prompt,
                    workflow.messages,
                    queryResults if queryResults else None
                )
                logger.info(f"Using enriched web research query: '{web_research_query}'")
                researchResult = await services.web.performWebResearch(
                    prompt=web_research_query,
                    urls=[],
                    country=None,
                    language=userInput.userLanguage or "de",
                    researchDepth="general",
                    operationId=None
                )
                webResearchResults = json.dumps(researchResult, ensure_ascii=False, indent=2) if isinstance(researchResult, dict) else str(researchResult)
                await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Internet-Recherche abgeschlossen")
            except Exception as e:
                # Best-effort: web research failure must not abort the answer.
                logger.error(f"Web research failed: {e}", exc_info=True)
                webResearchResults = f"Web research error: {str(e)}"
                await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Internet-Recherche fehlgeschlagen", log_type="warning")
        # Check if workflow was stopped before generating final answer
        if await _check_workflow_stopped(interfaceDbChat, workflowId):
            logger.info(f"Workflow {workflowId} was stopped, aborting final answer generation")
            return
        # Step 3: Generate final answer using AI
        logger.info("Generating final answer with AI...")
        await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Formuliere finale Antwort...")
        # Build prompt for final answer
        system_prompt = get_final_answer_system_prompt()
        # Build answer context with query results
        answerContext = f"User question: {userInput.prompt}{context}\n\n"
        # Add database results - organize by query with metadata
        db_results_part = ""
        if queryResults:
            successful_results = []
            error_results = []
            # Extract query metadata from sql_queries if available
            query_metadata = {}
            if sql_queries:
                for i, q_info in enumerate(sql_queries, 1):
                    query_metadata[f"query_{i}"] = {
                        "purpose": q_info.get("purpose", f"Query {i}"),
                        "table": q_info.get("table", "Unknown")
                    }
            # Organize results by query number
            query_numbers = set()
            for key in queryResults.keys():
                if key.startswith("query_") and not key.endswith("_data"):
                    # Extract query number (e.g., "query_1" -> 1)
                    try:
                        num = int(key.split("_")[1])
                        query_numbers.add(num)
                    except (ValueError, IndexError):
                        pass
            # Build results with metadata
            for query_num in sorted(query_numbers):
                query_key = f"query_{query_num}"
                error_key = f"{query_key}_error"
                if error_key in queryResults:
                    error_msg = queryResults[error_key]
                    metadata = query_metadata.get(query_key, {})
                    purpose = metadata.get("purpose", f"Query {query_num}")
                    table = metadata.get("table", "Unknown")
                    error_results.append(f"Abfrage {query_num} ({table} - {purpose}): {error_msg}")
                elif query_key in queryResults:
                    result_text = queryResults[query_key]
                    metadata = query_metadata.get(query_key, {})
                    purpose = metadata.get("purpose", f"Query {query_num}")
                    table = metadata.get("table", "Unknown")
                    successful_results.append(f"=== Abfrage {query_num}: {purpose} (Tabelle: {table}) ===\n{result_text}")
            # Handle general error if present
            if "error" in queryResults:
                error_results.append(f"Allgemeiner Fehler: {queryResults['error']}")
            if successful_results:
                db_results_part = "\n\nDATENBANK-ERGEBNISSE:\n" + "\n\n".join(successful_results)
                answerContext += "DATENBANK-ERGEBNISSE:\n" + "\n\n".join(successful_results) + "\n\n"
            if error_results:
                db_results_part += "\n\nDATENBANK-FEHLER:\n" + "\n".join(error_results)
                answerContext += "DATENBANK-FEHLER:\n" + "\n".join(error_results) + "\n\n"
        # Add web research results
        web_results_part = ""
        if webResearchResults:
            web_results_part = f"\n\nINTERNET-RECHERCHE:\n{webResearchResults}"
            answerContext += f"INTERNET-RECHERCHE:\n{webResearchResults}\n\n"
        # Check if we have any actual data
        successful_query_keys = [k for k in queryResults.keys() if k.startswith("query_") and not k.endswith("_error") and not k.endswith("_data")]
        has_query_results = bool(successful_query_keys)
        error_query_keys = [k for k in queryResults.keys() if k.endswith("_error")]
        has_only_errors = bool(error_query_keys and not successful_query_keys)
        if not has_query_results and needsDatabaseQuery:
            db_results_part = "\n\nWICHTIG: Es wurden KEINE Datenbank-Ergebnisse gefunden. Die Datenbankabfrage wurde nicht ausgeführt oder hat keine Ergebnisse zurückgegeben."
            if has_only_errors:
                db_results_part += "\n\n⚠️⚠️⚠️ KRITISCH - ALLE QUERIES FEHLGESCHLAGEN ⚠️⚠️⚠️\n" + \
                    "ALLE Datenbankabfragen sind fehlgeschlagen. Es gibt KEINE gültigen Daten aus der Datenbank.\n" + \
                    "DU DARFST KEINE DATEN ERFINDEN! Schreibe stattdessen: 'Es wurden keine Artikel gefunden' oder 'Die Datenbankabfrage ist fehlgeschlagen'."
        answer_prompt = f"""{system_prompt}
Antworte auf die folgende Frage des Nutzers: {userInput.prompt}{context}
{db_results_part}{web_results_part}
KRITISCH: Verwende NUR die oben angegebenen Daten. Erfinde KEINE Werte. Wenn Daten fehlen, schreibe "Nicht verfügbar".
WICHTIG - MEHRERE ABFRAGEN:
Die oben angegebenen DATENBANK-ERGEBNISSE können aus mehreren separaten Abfragen stammen. Jede Abfrage ist mit "=== Abfrage X ===" markiert und enthält Informationen zu einem spezifischen Aspekt (z.B. Artikel-Informationen, Lagerbestände, etc.).
- Kombiniere die Informationen aus ALLEN erfolgreichen Abfragen zu einer umfassenden Antwort
- Beispiel: Wenn Abfrage 1 Artikel-Informationen liefert und Abfrage 2 Lagerbestände liefert, kombiniere beide in deiner Antwort
- Verwende ALLE verfügbaren Informationen aus den verschiedenen Abfragen
ABSOLUT VERBOTEN - KEINE DATEN ERFINDEN
Wenn KEINE Datenbank-Ergebnisse vorhanden sind, dann:
- ERFINDE KEINE Artikelnummern, Artikelbezeichnungen, Preise oder Lagerbestände!
- ERFINDE KEINE Beispielartikel!
- Schreibe stattdessen: "Es wurden keine Artikel in der Datenbank gefunden." oder "Die Datenbankabfrage ist fehlgeschlagen."
WICHTIG: Deine Antwort soll NUR die finale Antwort enthalten - KEINE Planungsschritte, KEINE SQL-Queries, KEINE Zwischenschritte!
Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "Es wurden keine Artikel gefunden" (wenn keine Daten vorhanden)."""
        answerRequest = AiCallRequest(
            prompt=answer_prompt,
            context=answerContext if (queryResults or webResearchResults) else None,
            options=AiCallOptions(
                resultFormat="txt",
                operationType=OperationTypeEnum.DATA_ANALYSE,
                processingMode=ProcessingModeEnum.DETAILED
            )
        )
        answerResponse = await services.ai.callAi(answerRequest)
        finalAnswer = answerResponse.content
        logger.info("Final answer generated")
        # Check if workflow was stopped during AI call - if so, don't store the message
        if await _check_workflow_stopped(interfaceDbChat, workflowId):
            logger.info(f"Workflow {workflowId} was stopped during final answer generation, not storing message")
            return
        # Reload workflow to get current message count
        workflow = interfaceDbChat.getWorkflow(workflowId)
        # Double-check workflow wasn't stopped while we were reloading
        if workflow and workflow.status == "stopped":
            logger.info(f"Workflow {workflowId} was stopped, not storing final message")
            return
        # Create assistant message with final answer
        assistantMessageData = {
            "id": f"msg_{uuid.uuid4()}",
            "workflowId": workflowId,
            "parentMessageId": userMessageId,
            "message": finalAnswer,
            "role": "assistant",
            "status": "last",
            "sequenceNr": len(workflow.messages) + 1,
            "publishedAt": getUtcTimestamp(),
            "success": True,
            "roundNumber": workflow.currentRound,
            "taskNumber": 0,
            "actionNumber": 0
        }
        assistantMessage = interfaceDbChat.createMessage(assistantMessageData)
        logger.info(f"Stored assistant message with final answer: {assistantMessage.id}")
        # Emit message event for streaming (exact chatData format)
        message_timestamp = parseTimestamp(assistantMessage.publishedAt, default=getUtcTimestamp())
        await event_manager.emit_event(
            context_id=workflowId,
            event_type="chatdata",
            data={
                "type": "message",
                "createdAt": message_timestamp,
                "item": assistantMessage.dict()
            },
            event_category="chat"
        )
        # Update workflow status to completed (only if not stopped)
        if not await _check_workflow_stopped(interfaceDbChat, workflowId):
            interfaceDbChat.updateWorkflow(workflowId, {
                "status": "completed",
                "lastActivity": getUtcTimestamp()
            })
        else:
            logger.info(f"Workflow {workflowId} was stopped, not updating status to completed")
        logger.info(f"Chatbot processing completed for workflow {workflowId}, generated {len(queries)} queries and final answer")
        # Emit completion event only if workflow wasn't stopped
        if not await _check_workflow_stopped(interfaceDbChat, workflowId):
            await event_manager.emit_event(
                context_id=workflowId,
                event_type="complete",
                data={"workflowId": workflowId},
                event_category="workflow",
                message="Chatbot-Verarbeitung abgeschlossen",
                step="complete"
            )
        # Schedule cleanup
        await event_manager.cleanup(workflowId)
    except Exception as e:
        logger.error(f"Error processing chatbot message: {str(e)}", exc_info=True)
        # NOTE(review): interfaceDbChat is assigned inside the try block; an
        # exception before that assignment would raise NameError here — confirm
        # services.interfaceDbChat cannot fail to resolve.
        # Check if workflow was stopped - if so, don't store error message
        if await _check_workflow_stopped(interfaceDbChat, workflowId):
            logger.info(f"Workflow {workflowId} was stopped, not storing error message")
            return
        # Store error message
        try:
            # Reload workflow to get current message count
            workflow = interfaceDbChat.getWorkflow(workflowId)
            # Double-check workflow wasn't stopped while we were reloading
            if workflow and workflow.status == "stopped":
                logger.info(f"Workflow {workflowId} was stopped, not storing error message")
                return
            errorMessageData = {
                "id": f"msg_{uuid.uuid4()}",
                "workflowId": workflowId,
                "parentMessageId": userMessageId,
                "message": f"Sorry, I encountered an error: {str(e)}",
                "role": "assistant",
                "status": "last",
                "sequenceNr": len(workflow.messages) + 1,
                "publishedAt": getUtcTimestamp(),
                "success": False,
                "roundNumber": workflow.currentRound if workflow else 1,
                "taskNumber": 0,
                "actionNumber": 0
            }
            errorMessage = interfaceDbChat.createMessage(errorMessageData)
            # Emit message event for streaming (exact chatData format)
            message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp())
            await event_manager.emit_event(
                context_id=workflowId,
                event_type="chatdata",
                data={
                    "type": "message",
                    "createdAt": message_timestamp,
                    "item": errorMessage.dict()
                },
                event_category="chat"
            )
            # Update workflow status to error (only if not stopped)
            if not await _check_workflow_stopped(interfaceDbChat, workflowId):
                interfaceDbChat.updateWorkflow(workflowId, {
                    "status": "error",
                    "lastActivity": getUtcTimestamp()
                })
            else:
                logger.info(f"Workflow {workflowId} was stopped, not updating status to error")
            # Schedule cleanup
            await event_manager.cleanup(workflowId)
        except Exception as storeError:
            logger.error(f"Error storing error message: {storeError}")

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,3 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Streaming infrastructure for chatbot events."""

View file

@ -0,0 +1,159 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Event manager for chatbot streaming.
Manages event queues for Server-Sent Events (SSE) streaming.
"""
import logging
import asyncio
from typing import Dict, Optional, Any
from collections import defaultdict
logger = logging.getLogger(__name__)
class EventManager:
    """
    Manages per-workflow event queues for chatbot streaming.

    Each workflow gets its own asyncio.Queue: producers push events via
    emit_event() and an SSE consumer drains the queue. cleanup() schedules
    delayed removal of a workflow's queue once streaming is done.
    """

    def __init__(self):
        """Initialize empty registries for queues and pending cleanup tasks."""
        self._queues: Dict[str, asyncio.Queue] = {}
        self._cleanup_tasks: Dict[str, asyncio.Task] = {}

    def create_queue(self, workflow_id: str) -> asyncio.Queue:
        """
        Create an event queue for a workflow (idempotent).

        Args:
            workflow_id: Workflow ID

        Returns:
            Async queue for events (existing one if already created)
        """
        if workflow_id not in self._queues:
            self._queues[workflow_id] = asyncio.Queue()
            logger.debug(f"Created event queue for workflow {workflow_id}")
        return self._queues[workflow_id]

    def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]:
        """
        Get the event queue for a workflow.

        Args:
            workflow_id: Workflow ID

        Returns:
            Async queue if exists, None otherwise
        """
        return self._queues.get(workflow_id)

    def has_queue(self, workflow_id: str) -> bool:
        """
        Check if a queue exists for a workflow.

        Args:
            workflow_id: Workflow ID

        Returns:
            True if queue exists, False otherwise
        """
        return workflow_id in self._queues

    async def emit_event(
        self,
        context_id: str,
        event_type: str,
        data: Dict[str, Any],
        event_category: str = "chat",
        message: Optional[str] = None,
        step: Optional[str] = None
    ) -> None:
        """
        Emit an event to the queue for a workflow.

        Events emitted for a workflow without a queue are dropped with a
        warning (the consumer is gone or was never attached).

        Args:
            context_id: Workflow ID (context)
            event_type: Type of event (e.g., "chatdata", "complete", "error")
            data: Event data dictionary
            event_category: Category of event (e.g., "chat", "workflow")
            message: Optional message string
            step: Optional step identifier
        """
        queue = self._queues.get(context_id)
        if not queue:
            logger.warning(f"No queue found for workflow {context_id}, event not emitted")
            return
        event = {
            "type": event_type,
            "data": data,
            "category": event_category,
            "message": message,
            "step": step
        }
        try:
            await queue.put(event)
            logger.debug(f"Emitted {event_type} event for workflow {context_id}")
        except Exception as e:
            logger.error(f"Error emitting event for workflow {context_id}: {e}", exc_info=True)

    async def cleanup(self, workflow_id: str, delay: float = 60.0) -> None:
        """
        Schedule cleanup of a queue after a delay.

        Re-scheduling cancels any previously pending cleanup for the same
        workflow before a new one is created.

        Args:
            workflow_id: Workflow ID
            delay: Delay in seconds before cleanup
        """
        # Cancel existing cleanup task if any
        if workflow_id in self._cleanup_tasks:
            self._cleanup_tasks[workflow_id].cancel()

        async def _cleanup():
            try:
                await asyncio.sleep(delay)
                if workflow_id in self._queues:
                    # Drain remaining events so nothing is left pending
                    queue = self._queues[workflow_id]
                    while not queue.empty():
                        try:
                            queue.get_nowait()
                        except asyncio.QueueEmpty:
                            break
                    # Remove queue
                    del self._queues[workflow_id]
                    logger.info(f"Cleaned up event queue for workflow {workflow_id}")
            except asyncio.CancelledError:
                logger.debug(f"Cleanup cancelled for workflow {workflow_id}")
            except Exception as e:
                logger.error(f"Error during cleanup for workflow {workflow_id}: {e}", exc_info=True)
            finally:
                # Bug fix: only deregister if the registered task is THIS task.
                # A cancelled stale cleanup runs this finally AFTER cleanup()
                # has already stored the replacement task under the same key;
                # unconditionally deleting the entry would deregister the new
                # task and drop its only strong reference (asyncio keeps only
                # weak references to tasks, so it could be garbage-collected).
                if self._cleanup_tasks.get(workflow_id) is asyncio.current_task():
                    del self._cleanup_tasks[workflow_id]

        # Schedule cleanup
        task = asyncio.create_task(_cleanup())
        self._cleanup_tasks[workflow_id] = task
# Global event manager instance (lazily created module-level singleton;
# all producers and SSE consumers must share one instance so they see the
# same per-workflow queues)
_event_manager: Optional[EventManager] = None
def get_event_manager() -> EventManager:
    """
    Get the global event manager instance.

    Creates the singleton on first call; subsequent calls return the same
    instance. Not guarded by a lock — assumes single-threaded (event-loop)
    initialization.

    Returns:
        EventManager instance
    """
    global _event_manager
    if _event_manager is None:
        _event_manager = EventManager()
    return _event_manager

View file

@ -0,0 +1,242 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Streaming helper utilities for chat message processing and normalization."""
from __future__ import annotations
from typing import Any, Dict, List, Literal, Mapping, Optional
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
SystemMessage,
ToolMessage,
)
Role = Literal["user", "assistant", "system", "tool"]
class ChatStreamingHelper:
    """Stateless helpers for streaming and message normalization.

    Converts between LangChain message objects, dict-shaped serialized
    messages, and the plain-dict format emitted to streaming clients.
    All methods are static and side-effect free.
    """

    @staticmethod
    def role_from_message(*, msg: BaseMessage) -> Role:
        """Map a BaseMessage instance to its role string.

        Args:
            msg: The message to classify.

        Returns:
            One of "user", "assistant", "system", "tool". Unrecognized
            message types fall back to the message's own ``role`` attribute,
            defaulting to "assistant".
        """
        type_to_role = (
            (HumanMessage, "user"),
            (AIMessage, "assistant"),
            (SystemMessage, "system"),
            (ToolMessage, "tool"),
        )
        for message_cls, role in type_to_role:
            if isinstance(msg, message_cls):
                return role
        return getattr(msg, "role", "assistant")

    @staticmethod
    def flatten_content(*, content: Any) -> str:
        """Reduce arbitrary message content to plain text.

        Handles strings, lists of content parts, and dicts carrying text
        under "text"/"content" (with "value" as a last-resort key for
        dict parts in lists).

        Args:
            content: str, list of part dicts, dict, None, or any object.

        Returns:
            Whitespace-stripped plain text; empty string for None.
        """
        if content is None:
            return ""
        if isinstance(content, str):
            return content.strip()
        if isinstance(content, list):
            pieces: List[str] = []
            for part in content:
                if not isinstance(part, dict):
                    # Non-dict list items carry no extractable text.
                    continue
                text_val = part.get("text")
                content_val = part.get("content")
                if isinstance(text_val, str):
                    pieces.append(text_val)
                elif isinstance(content_val, str):
                    pieces.append(content_val)
                else:
                    # Fallback for unknown dictionary structures
                    fallback = part.get("value")
                    pieces.append(fallback if isinstance(fallback, str) else str(part))
            return "\n".join(piece.strip() for piece in pieces)
        if isinstance(content, dict):
            for key in ("text", "content"):
                value = content.get(key)
                if isinstance(value, str):
                    return value.strip()
        return str(content).strip()

    @staticmethod
    def message_to_dict(*, msg: BaseMessage) -> Dict[str, Any]:
        """Normalize a BaseMessage into a streaming-friendly dict.

        Args:
            msg: The message to convert.

        Returns:
            Dict with "role" and flattened "content"; "tool_calls" and
            "name" are included only when present and truthy on the message.
        """
        result: Dict[str, Any] = {
            "role": ChatStreamingHelper.role_from_message(msg=msg),
            "content": ChatStreamingHelper.flatten_content(
                content=getattr(msg, "content", "")
            ),
        }
        for optional_attr in ("tool_calls", "name"):
            value = getattr(msg, optional_attr, None)
            if value:
                result[optional_attr] = value
        return result

    @staticmethod
    def dict_message_to_dict(*, obj: Mapping[str, Any]) -> Dict[str, Any]:
        """Normalize a dict-shaped (serialized) message.

        Accepts messages from serialized state where the role may live in
        "role" or be implied by "type" ("human"/"ai"/"system"/"tool"/...),
        and content may live in "content" or "text".

        Args:
            obj: The dict-shaped message.

        Returns:
            Dict with "role" (defaulting to "assistant") and flattened
            "content"; "tool_calls"/"name" pass through when present.
        """
        role: Optional[str] = obj.get("role")
        if not role:
            # Derive the role from the serialized "type" field.
            type_aliases = {
                "human": "user",
                "user": "user",
                "ai": "assistant",
                "assistant": "assistant",
                "system": "system",
                "tool": "tool",
                "function": "tool",
            }
            typ = obj.get("type")
            role = type_aliases.get(typ) if isinstance(typ, str) else None
        content = obj.get("content")
        if content is None and "text" in obj:
            content = obj["text"]
        normalized: Dict[str, Any] = {
            "role": role or "assistant",
            "content": ChatStreamingHelper.flatten_content(content=content),
        }
        if "tool_calls" in obj:
            normalized["tool_calls"] = obj["tool_calls"]
        if obj.get("name"):
            normalized["name"] = obj["name"]
        return normalized

    @staticmethod
    def extract_messages_from_output(*, output_obj: Any) -> List[Any]:
        """Pull the messages list out of a LangGraph output object.

        Args:
            output_obj: Dict with a "messages" key, an object with a
                ``messages`` attribute, or anything else (including None).

        Returns:
            The messages list, or an empty list when absent or not a list.
        """
        if output_obj is None:
            return []
        if isinstance(output_obj, dict):
            candidate = output_obj.get("messages")
        else:
            candidate = getattr(output_obj, "messages", None)
        return candidate if isinstance(candidate, list) else []

View file

@ -1060,7 +1060,7 @@ class ChatObjects:
# Emit message event for streaming (if event manager is available)
try:
from modules.features.chatbot.eventManager import get_event_manager
from modules.features.chatbot.streaming.events import get_event_manager
event_manager = get_event_manager()
message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp())
# Emit message event in exact chatData format: {type, createdAt, item}
@ -1434,7 +1434,7 @@ class ChatObjects:
# Emit log event for streaming (if event manager is available)
try:
from modules.features.chatbot.eventManager import get_event_manager
from modules.features.chatbot.streaming.events import get_event_manager
event_manager = get_event_manager()
log_timestamp = parseTimestamp(createdLog.get("timestamp"), default=getUtcTimestamp())
# Emit log event in exact chatData format: {type, createdAt, item}

View file

@ -28,7 +28,7 @@ from modules.datamodels.datamodelPagination import PaginationParams, PaginatedRe
# Import chatbot feature
from modules.features.chatbot import chatProcess
from modules.features.chatbot.eventManager import get_event_manager
from modules.features.chatbot.streaming.events import get_event_manager
# Import workflow control functions
from modules.features.workflow import chatStop
@ -174,17 +174,26 @@ async def stream_chatbot_start(
event_type = event.get("type")
event_data = event.get("data", {})
# Emit chatdata events (messages, logs, stats) in exact chatData format
# Emit chatdata events (messages, logs, stats, status) in exact chatData format
if event_type == "chatdata" and event_data:
# Emit item directly in exact chatData format: {type, createdAt, item}
chatdata_item = event_data
# Ensure item field is serializable (convert Pydantic models to dicts)
if isinstance(chatdata_item, dict) and "item" in chatdata_item:
item_obj = chatdata_item.get("item")
if hasattr(item_obj, "dict"):
chatdata_item = chatdata_item.copy()
chatdata_item["item"] = item_obj.dict()
yield f"data: {json.dumps(chatdata_item)}\n\n"
# Handle status events (transient UI feedback)
if event_data.get("type") == "status":
# Status events have simple structure: {type: "status", label: "..."}
status_item = {
"type": "status",
"label": event_data.get("label", "")
}
yield f"data: {json.dumps(status_item)}\n\n"
else:
# Emit other chatdata items (messages, logs, stats) in exact chatData format
chatdata_item = event_data
# Ensure item field is serializable (convert Pydantic models to dicts)
if isinstance(chatdata_item, dict) and "item" in chatdata_item:
item_obj = chatdata_item.get("item")
if hasattr(item_obj, "dict"):
chatdata_item = chatdata_item.copy()
chatdata_item["item"] = item_obj.dict()
yield f"data: {json.dumps(chatdata_item)}\n\n"
# Handle completion/stopped events to close stream
elif event_type == "complete":

View file

@ -78,6 +78,9 @@ azure-communication-email>=1.0.0 # Azure Communication Services Email
pytest>=8.0.0
pytest-asyncio>=0.21.0
## Configuration Validation
jsonschema>=4.0.0 # Required for chatbot workflow config validation
## For Scheduling / Repeated Tasks
APScheduler==3.11.0

View file

@ -0,0 +1,3 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Chatbot functional tests."""

View file

@ -0,0 +1,217 @@
#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Chatbot Functional Tests
Tests the chatbot implementation to ensure:
1. Chatbot initialization works correctly
2. Streaming events are emitted properly
3. Tool calls execute correctly
4. Messages are stored in database
5. No infinite loops occur
"""
import asyncio
import os
import sys
from pathlib import Path
# Add the gateway to path (go up 2 levels from tests/functional/chatbot/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _gateway_path not in sys.path:
sys.path.insert(0, _gateway_path)
import pytest
from modules.features.chatbot.chatbot import Chatbot
from modules.features.chatbot.chatbotAIBridge import AICenterChatModel
from modules.features.chatbot.chatbotMemory import DatabaseCheckpointer
from modules.features.chatbot.chatbotConfig import load_chatbot_config
from modules.features.chatbot.streamingHelper import ChatStreamingHelper
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import OperationTypeEnum, ProcessingModeEnum
class TestChatbot:
    """Functional tests for chatbot construction and streaming helpers."""

    @pytest.fixture
    def test_user(self):
        """Provide a synthetic user for chatbot construction."""
        return User(
            id="test_user_chatbot",
            username="test_chatbot",
            email="test@example.com",
            fullName="Test Chatbot User",
            language="de",
            mandateId="test_mandate",
        )

    @pytest.fixture
    def workflow_id(self):
        """Provide a fresh workflow ID for each test."""
        import uuid
        return str(uuid.uuid4())

    async def _build_chatbot(self, user, wf_id):
        """Assemble a Chatbot from the 'althaus' config for the given user/workflow.

        Returns:
            Tuple of (chatbot instance, rendered system prompt).
        """
        from datetime import datetime
        config = load_chatbot_config("althaus")
        # Render the dated system prompt from the config template.
        prompt = config.systemPrompt.replace(
            "{{DATE}}",
            datetime.now().strftime("%d.%m.%Y")
        )
        model = AICenterChatModel(
            user=user,
            operation_type=OperationTypeEnum[config.model.operationType],
            processing_mode=ProcessingModeEnum[config.model.processingMode]
        )
        memory = DatabaseCheckpointer(user=user, workflow_id=wf_id)
        chatbot = await Chatbot.create(
            model=model,
            memory=memory,
            system_prompt=prompt,
            workflow_id=wf_id
        )
        return chatbot, prompt

    @pytest.mark.asyncio
    async def test_chatbot_initialization(self, test_user, workflow_id):
        """Chatbot wiring: model, memory, compiled app and prompt all present."""
        chatbot, prompt = await self._build_chatbot(test_user, workflow_id)
        assert chatbot is not None
        assert chatbot.model is not None
        assert chatbot.memory is not None
        assert chatbot.app is not None
        assert chatbot.system_prompt == prompt

    @pytest.mark.asyncio
    async def test_streaming_helper_role_from_message(self):
        """role_from_message maps LangChain message classes to role strings."""
        from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
        expectations = [
            (HumanMessage(content="Hello"), "user"),
            (AIMessage(content="Hi there"), "assistant"),
            (SystemMessage(content="You are a helpful assistant"), "system"),
        ]
        for msg, expected_role in expectations:
            assert ChatStreamingHelper.role_from_message(msg=msg) == expected_role

    @pytest.mark.asyncio
    async def test_streaming_helper_flatten_content(self):
        """flatten_content handles strings, part lists, dicts and None."""
        assert ChatStreamingHelper.flatten_content(content="Hello") == "Hello"
        flattened = ChatStreamingHelper.flatten_content(
            content=[{"type": "text", "text": "Hello"}, {"type": "text", "text": "World"}]
        )
        assert "Hello" in flattened
        assert "World" in flattened
        assert ChatStreamingHelper.flatten_content(content={"text": "Simple message"}) == "Simple message"
        assert ChatStreamingHelper.flatten_content(content=None) == ""

    @pytest.mark.asyncio
    async def test_streaming_helper_message_to_dict(self):
        """message_to_dict normalizes a HumanMessage to role/content fields."""
        from langchain_core.messages import HumanMessage
        normalized = ChatStreamingHelper.message_to_dict(msg=HumanMessage(content="Hello there"))
        assert normalized["role"] == "user"
        assert normalized["content"] == "Hello there"

    @pytest.mark.asyncio
    async def test_streaming_helper_extract_messages_from_output(self):
        """extract_messages_from_output handles dicts, None, and attr objects."""
        extracted = ChatStreamingHelper.extract_messages_from_output(
            output_obj={"messages": [{"role": "user", "content": "Hello"}]}
        )
        assert len(extracted) == 1
        assert len(ChatStreamingHelper.extract_messages_from_output(output_obj=None)) == 0

        class MockOutput:
            def __init__(self):
                self.messages = [{"role": "assistant", "content": "Hi"}]

        assert len(ChatStreamingHelper.extract_messages_from_output(output_obj=MockOutput())) == 1

    @pytest.mark.asyncio
    async def test_chatbot_should_continue_logic(self, test_user, workflow_id):
        """Workflow compiles and a simple chat round terminates (no infinite loop)."""
        chatbot, _ = await self._build_chatbot(test_user, workflow_id)
        # The should_continue routing is internal; a successful compile plus a
        # completed round exercises it without hand-driving the graph.
        assert chatbot.app is not None
        try:
            result = await chatbot.chat(
                message="Hallo",
                chat_id=workflow_id
            )
            assert result is not None
            assert isinstance(result, list)
        except Exception as e:
            # Any failure must not be a runaway-loop failure (those would
            # time out or report hitting max iterations).
            lowered = str(e).lower()
            assert "infinite" not in lowered
            assert "loop" not in lowered
if __name__ == "__main__":
    # Allow running this test module directly (python thisfile.py) instead of
    # invoking pytest from the command line; -v gives per-test output.
    pytest.main([__file__, "-v"])