readded chatbot functionality

This commit is contained in:
Ida Dittrich 2026-01-30 14:02:02 +01:00
parent 411a6a081a
commit 164e7487cc
11 changed files with 813 additions and 518 deletions

app.py
View file

@@ -507,6 +507,9 @@ app.include_router(gdprRouter)
from modules.routes.routeChat import router as chatRouter
app.include_router(chatRouter)
from modules.features.chatbot.routeFeatureChatbot import router as chatbotFeatureRouter
app.include_router(chatbotFeatureRouter)
# ============================================================================
# SYSTEM ROUTES (Navigation, etc.)
# ============================================================================

View file

@@ -1,127 +0,0 @@
# Chatbot Feature Documentation
## Overview
The chatbot feature provides a conversational interface that processes user queries, executes database searches, performs web research, and generates contextual responses. The implementation uses LangGraph to orchestrate multi-step workflows and integrates with the existing infrastructure, including the AI Center, database systems, and event streaming.
## Architecture
The chatbot feature follows a modular architecture centered around LangGraph's state graph pattern. The system processes user messages through a structured workflow that can dynamically invoke tools, query databases, search the web, and generate responses based on context.
### Core Components
**Workflow Management**: Each conversation is managed as a workflow with a unique identifier. Workflows track the conversation state, message history, and processing status. New conversations create fresh workflows, while existing conversations resume their workflows with incremented round numbers.
**LangGraph State Graph**: The heart of the chatbot is a LangGraph state graph that orchestrates the conversation flow. The graph maintains conversation state through a checkpointer system and routes between agent processing and tool execution nodes based on the model's decisions.
**Event Streaming**: Real-time updates are delivered to clients through an event-driven streaming system. Status updates, messages, logs, and completion events are emitted asynchronously and queued for delivery to connected clients.
## LangGraph Implementation
### State Management
LangGraph manages conversation state through a state graph that tracks messages in the conversation. The state is persisted using a custom checkpointer that bridges LangGraph's checkpoint system with the existing database infrastructure. This allows conversations to be resumed, state to be maintained across sessions, and message history to be preserved.
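As a rough sketch, the state can be modeled as a Pydantic class whose message list is merged with LangGraph's `add_messages` reducer. The `ChatState` name and imports below mirror the rest of this commit; anything beyond the `messages` field would be illustrative:

```python
# Minimal state sketch, assuming the ChatState name used elsewhere in this commit.
from typing import Annotated, List
from pydantic import BaseModel
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages

class ChatState(BaseModel):
    # add_messages merges incoming messages into the running history
    messages: Annotated[List[BaseMessage], add_messages]
```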
### Graph Structure
The workflow graph consists of two primary nodes:
**Agent Node**: Processes user messages and conversation history using the AI model. The agent analyzes the input, determines what actions are needed, and decides whether to generate a response directly or invoke tools. The agent has access to the full conversation history, which is automatically trimmed to fit within the model's context window while preserving the most recent and relevant messages.
**Tools Node**: Executes tools when the agent determines they are needed. Tools can query databases, search the web, or send status updates. After tool execution, the workflow returns to the agent node to process the tool results and generate an appropriate response.
### Conditional Routing
The graph uses conditional edges to determine workflow progression. After the agent processes a message, the system checks whether the agent requested tool calls. If tools were requested, the workflow routes to the tools node. If no tools are needed, the workflow completes and returns the final response to the user.
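A minimal sketch of this two-node graph with conditional routing, assuming the `ChatState` sketch above. The routing predicate mirrors the `tool_calls` check used in this commit; the node wiring details are illustrative:

```python
# Illustrative graph wiring; agent_node is a placeholder for the model call.
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode

def build_graph(agent_node, tools, checkpointer):
    graph = StateGraph(ChatState)
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode(tools=tools))
    graph.set_entry_point("agent")

    def should_continue(state: ChatState):
        # Route to tools when the model requested tool calls, otherwise finish
        last_message = state.messages[-1]
        return "tools" if getattr(last_message, "tool_calls", None) else END

    graph.add_conditional_edges("agent", should_continue)
    graph.add_edge("tools", "agent")  # tool results flow back to the agent
    return graph.compile(checkpointer=checkpointer)
```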
### Message Window Management
To handle long conversations that exceed model context limits, the system implements intelligent message windowing. Messages are trimmed from the beginning while preserving system prompts and ensuring the conversation ends on a human or tool message. This maintains context continuity while respecting token limits.
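A hedged sketch of this windowing using LangChain's `trim_messages`, which this commit imports; the token budget and counter below are illustrative:

```python
from langchain_core.messages import trim_messages

trimmed = trim_messages(
    state.messages,
    strategy="last",           # trim from the beginning, keep recent messages
    token_counter=model,       # illustrative: let the chat model count tokens
    max_tokens=4096,           # illustrative context budget
    include_system=True,       # preserve the system prompt
    end_on=("human", "tool"),  # ensure the window ends on a human or tool message
)
```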
## Integration with Existing Infrastructure
### AI Center Integration
The chatbot integrates with the AI Center through a custom bridge that implements LangChain's chat model interface. This bridge allows LangGraph to use AI Center's model selection, routing, and calling infrastructure while maintaining compatibility with LangChain's expected interfaces.
**Model Selection**: When processing messages, the bridge converts LangChain message formats to AI Center's expected format and uses the model selector to choose the appropriate AI model based on operation type, processing mode, and available models. The selection respects role-based access control and considers model capabilities.
**Tool Calling Support**: The bridge handles tool calling by detecting when models support function calling and converting tool definitions between LangChain and AI Center formats. For OpenAI-compatible models, the bridge directly calls the API with tool definitions. For other models, it relies on connector-specific implementations.
**Operation Types**: The chatbot uses AI Center's operation type system to select models appropriate for different tasks. Database queries use data analysis operation types, while web searches use web search operation types, ensuring optimal model selection for each task.
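In outline, such a bridge subclasses LangChain's `BaseChatModel`. The `AICenterChatModel` name and `_select_model` come from this commit; the `_generate` body below is a simplified illustration, and `_call_ai_center` is a hypothetical helper standing in for the actual AI Center call:

```python
from typing import Any, List, Optional
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatGeneration, ChatResult

class AICenterChatModel(BaseChatModel):
    @property
    def _llm_type(self) -> str:
        return "ai-center"

    def _generate(self, messages: List[BaseMessage], stop: Optional[List[str]] = None,
                  run_manager: Any = None, **kwargs: Any) -> ChatResult:
        model = self._select_model(messages)            # RBAC-aware model selection
        reply = self._call_ai_center(model, messages)   # hypothetical helper
        return ChatResult(generations=[ChatGeneration(message=AIMessage(content=reply))])
```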
### Database Integration
**Message Storage**: All conversation messages are stored in the existing chat database through the database interface. The custom checkpointer converts between LangGraph's message format and the database's message format, ensuring seamless persistence. Messages are stored with metadata including workflow identifiers, round numbers, sequence numbers, and timestamps.
**Workflow Persistence**: Workflow state is maintained in the database, allowing conversations to be resumed across sessions. The system tracks workflow status, current round numbers, and activity timestamps. When resuming a conversation, the workflow round number is incremented to maintain conversation continuity.
**Document Management**: User-uploaded files are tracked as document references within workflows. The system creates document records that link files to specific messages and rounds, enabling the chatbot to reference and process uploaded documents in its responses.
### Tool Integration
**SQL Query Tool**: The chatbot includes a tool that executes SQL queries against the preprocessor database. This tool uses the existing database connector infrastructure, ensuring proper connection management, query execution, and result formatting. The tool returns formatted results that the agent can use to answer user questions about products, inventory, prices, and other database-stored information.
**Web Search Tool**: Web research capabilities are provided through a Tavily search tool that integrates with AI Center's Tavily connector. The tool uses AI Center's model registry and selector to find and use Tavily models, ensuring consistent integration with the existing AI infrastructure. Search results include full content from multiple sources, allowing comprehensive research.
**Streaming Status Tool**: A special tool allows the agent to send status updates during processing. These updates are captured by the event streaming system and delivered to clients in real-time, providing users with visibility into what the chatbot is doing.
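The factory pattern this commit uses for tools can be sketched as follows. The `create_send_streaming_message_tool` name and the `emit_event` signature appear in the diff below; the event payload itself is illustrative:

```python
from langchain_core.tools import tool

def create_send_streaming_message_tool(event_manager=None):
    @tool
    async def send_streaming_message(message: str) -> str:
        """Send a short status update to the client while processing."""
        if event_manager is not None:
            # Illustrative payload; the real event shape may differ
            await event_manager.emit_event(
                context_id="<workflow-id>", event_type="status",
                data={"label": message}, event_category="chat",
            )
        return "Status sent."
    return send_streaming_message
```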
### Event Streaming System
The chatbot uses an event-driven streaming architecture to deliver real-time updates to clients. An event manager maintains queues for each workflow, allowing multiple clients to receive updates for the same conversation.
**Event Types**: The system emits several types of events including chat data events (messages and logs), completion events, and error events. Each event includes metadata about its type, timestamp, and associated workflow.
**Queue Management**: Event queues are created when workflows start and cleaned up after conversations complete. The cleanup system ensures resources are properly released while allowing sufficient time for clients to receive all events.
**Event Bridging**: LangGraph's native event streaming is bridged to the custom event system. Status updates from tool calls are captured and converted to the appropriate event format. Final responses are extracted from LangGraph's output and emitted as message events.
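A minimal sketch of the per-workflow queue idea; the `emit_event` signature follows its usage in this commit, but this is not the project's actual event manager:

```python
import asyncio
from typing import Any, Dict

class EventManager:
    """Illustrative per-workflow event queues."""
    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}

    def create_queue(self, workflow_id: str) -> asyncio.Queue:
        # One queue per workflow, created when processing starts
        return self._queues.setdefault(workflow_id, asyncio.Queue())

    async def emit_event(self, context_id: str, event_type: str,
                         data: Dict[str, Any], event_category: str = "chat") -> None:
        queue = self._queues.get(context_id)
        if queue is not None:
            await queue.put({"type": event_type, "category": event_category, "data": data})

    def cleanup(self, workflow_id: str) -> None:
        # Called after the grace period so clients can drain remaining events
        self._queues.pop(workflow_id, None)
```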
## Configuration System
The chatbot supports multiple configuration profiles loaded from JSON files. Each configuration specifies the following; an example profile follows the list:
**System Prompts**: Customizable instructions that define the chatbot's behavior, personality, and capabilities. Prompts can include placeholders for dynamic content like dates.
**Database Schema**: Information about available database tables and structures, enabling the agent to generate appropriate queries.
**Tool Configuration**: Settings for which tools are enabled and how they should behave. This includes SQL query settings, web search parameters, and streaming options.
**Model Configuration**: Operation types and processing modes that determine which AI models are selected for different tasks.
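An illustrative profile covering these fields, loaded through the `load_chatbot_config_from_dict` helper added in this commit (all values are made up):

```python
from modules.features.chatbot.config import load_chatbot_config_from_dict

config = load_chatbot_config_from_dict({
    "id": "example",
    "name": "Example Chatbot",
    "systemPrompt": "You are a helpful assistant. Today is {{DATE}}.",
    "database": {"connector": "preprocessor", "schema": {}},
    "tools": {
        "sql": {"enabled": True},
        "tavily": {"enabled": False},
        "streaming": {"enabled": True},
    },
    "model": {"operationType": "DATA_ANALYSE", "processingMode": "BASIC"},
}, config_id="example")
```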
## Conversation Flow
### Initial Request Processing
When a user submits a message, the system first creates or loads the workflow. For new conversations, a conversation name is generated using AI based on the user's initial prompt. The user's message is stored in the database and an event is emitted to notify connected clients.
### Background Processing
Message processing occurs asynchronously in the background, allowing the API to return immediately while processing continues. The system creates a LangGraph chatbot instance configured with the appropriate model, memory checkpointer, and tools.
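In outline this is a fire-and-forget task; the coroutine name and arguments mirror the diff below, while the surrounding endpoint code is elided:

```python
import asyncio

# Kick off processing in the background; the API returns immediately and
# results reach clients later via the event stream.
asyncio.create_task(_processChatbotMessageLangGraph(
    services, currentUser, workflow.id, userInput, userMessage.id,
    featureInstanceId=featureInstanceId,
))
```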
### Tool Execution
When the agent determines that tools are needed, it requests tool calls. The tools node executes the requested tools, which may involve database queries, web searches, or status updates. Tool results are added to the conversation state and returned to the agent for processing.
### Response Generation
After tool execution or when no tools are needed, the agent generates a final response based on the conversation history and any tool results. The response is stored in the database through the checkpointer system and emitted as an event to connected clients.
### Completion
Once processing completes, a completion event is emitted and the workflow status is updated. The event queue remains available for a grace period to ensure all clients receive the final events before cleanup.
## Error Handling
The system includes comprehensive error handling at multiple levels. Workflow errors are caught and stored as error messages in the database. Error events are emitted to notify clients of failures. The system gracefully handles cases where workflows are stopped by users, preventing unnecessary error messages from being stored.
## Memory and Context Management
The custom checkpointer bridges LangGraph's checkpoint system with the database, ensuring conversation history is preserved. The system intelligently filters messages when storing checkpoints, skipping intermediate tool call requests and only storing final user and assistant messages. This prevents duplicate storage while maintaining complete conversation context.
## Multi-Language Support
The system supports multiple languages through configuration. Conversation names are generated in the user's preferred language, and the AI models can process and respond in various languages based on the system prompt and user input.
## Scalability Considerations
The asynchronous processing model allows the system to handle multiple concurrent conversations efficiently. Each workflow operates independently with its own event queue and processing task. The database checkpointer ensures state persistence without blocking processing, and the event streaming system efficiently manages multiple client connections per workflow.

View file

@@ -74,6 +74,7 @@ class AICenterChatModel(BaseChatModel):
def _select_model(self, messages: List[BaseMessage]) -> AiModel:
"""
Select the best AI center model for the given messages.
Uses caching to avoid repeated model selection within the same session.
Args:
messages: List of LangChain messages
@@ -81,6 +82,10 @@ class AICenterChatModel(BaseChatModel):
Returns:
Selected AI model
"""
# Return cached model if already selected (significant performance improvement)
if self._selected_model is not None:
return self._selected_model
# Convert messages to prompt/context format for model selector
prompt_parts = []
context_parts = []
@@ -99,29 +104,11 @@
context = "\n".join(context_parts) if context_parts else ""
# Get available models with RBAC filtering
from modules.security.rbac import RbacClass
from modules.security.rootAccess import getRootDbAppConnector
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.configuration import APP_CONFIG
# Use cached/singleton interfaces for better performance
from modules.interfaces.interfaceDbApp import getRootInterface
# Get database connectors for RBAC
# Create a database connector instance for RBAC with proper configuration
dbHost = APP_CONFIG.get("DB_MANAGEMENT_HOST")
dbDatabase = APP_CONFIG.get("DB_MANAGEMENT_DATABASE", "management")
dbUser = APP_CONFIG.get("DB_MANAGEMENT_USER")
dbPassword = APP_CONFIG.get("DB_MANAGEMENT_PASSWORD_SECRET")
dbPort = int(APP_CONFIG.get("DB_MANAGEMENT_PORT"))
db = DatabaseConnector(
dbHost=dbHost,
dbDatabase=dbDatabase,
dbUser=dbUser,
dbPassword=dbPassword,
dbPort=dbPort,
userId=self.user.id if hasattr(self.user, 'id') else None
)
dbApp = getRootDbAppConnector()
rbac_instance = RbacClass(db, dbApp=dbApp)
rootInterface = getRootInterface()
rbac_instance = rootInterface.rbac
available_models = modelRegistry.getAvailableModels(
currentUser=self.user,

View file

@@ -23,7 +23,7 @@ class CheckpointTuple(NamedTuple):
pending_writes: Optional[List[Tuple[str, Any]]] = None
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage, ToolMessage
from modules.interfaces.interfaceDbChatObjects import getInterface
from modules.interfaces.interfaceDbChat import getInterface
from modules.datamodels.datamodelChat import ChatMessage, ChatWorkflow
from modules.datamodels.datamodelUam import User
from modules.shared.timeUtils import getUtcTimestamp

View file

@@ -3,10 +3,16 @@
"""
Chatbot tools for LangGraph integration.
Includes SQL query tool, Tavily search tool, and streaming status tool.
Tools can be created with factory functions for dynamic configuration:
- create_sql_query_tool(connector_type) - SQL query tool with configurable connector
- create_tavily_search_tool() - Tavily web search tool
- create_send_streaming_message_tool(event_manager) - Streaming status updates
"""
import logging
from typing import Optional
import asyncio
from typing import Optional, Callable, Dict, Any
from langchain_core.tools import tool
from modules.connectors.connectorPreprocessor import PreprocessorConnector
@@ -14,13 +20,62 @@ from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# =============================================================================
# Connection pool for preprocessor connector (performance optimization)
# =============================================================================
@tool
async def sqlite_query(query: str) -> str:
class ConnectorPool:
"""Simple connection pool for preprocessor connectors."""
_instance: Optional['ConnectorPool'] = None
_connector: Optional[PreprocessorConnector] = None
_lock: Optional[asyncio.Lock] = None
@classmethod
def get_instance(cls) -> 'ConnectorPool':
if cls._instance is None:
cls._instance = ConnectorPool()
cls._lock = asyncio.Lock()
return cls._instance
async def get_connector(self) -> PreprocessorConnector:
"""Get or create a connector instance."""
if self._connector is None:
self._connector = PreprocessorConnector()
return self._connector
async def close(self):
"""Close the connector."""
if self._connector:
try:
await self._connector.close()
except Exception as e:
logger.debug(f"Error closing connector: {e}")
self._connector = None
# Global pool instance
_connector_pool = ConnectorPool.get_instance()
# =============================================================================
# Factory functions for configurable tools
# =============================================================================
def create_sql_query_tool(connector_type: str = "preprocessor"):
"""
Execute a SQL SELECT query on the Althaus AG database.
Create a SQL query tool with a specific connector type.
This tool allows you to query the SQLite database to find articles, prices,
Args:
connector_type: Type of database connector to use (e.g., "preprocessor")
Returns:
LangChain tool for executing SQL queries
"""
@tool
async def sqlite_query(query: str) -> str:
"""
Execute a SQL SELECT query on the database.
This tool allows you to query the database to find articles, prices,
inventory levels, and other product information.
Args:
@@ -53,8 +108,9 @@ async def sqlite_query(query: str) -> str:
LIMIT 20
"""
try:
connector = PreprocessorConnector()
try:
# Use connection pool for better performance
connector = await _connector_pool.get_connector()
result = await connector.executeQuery(query, return_json=True)
if result.get("text", "").startswith(("Error:", "Query failed:")):
@@ -69,40 +125,47 @@ async def sqlite_query(query: str) -> str:
if not data:
return f"Query executed successfully. Returned {row_count} rows (no data)."
# Format as readable string
lines = [f"Query executed successfully. Returned {row_count} rows:"]
# Format as readable string - optimized for faster output
lines = [f"Query returned {row_count} rows:"]
# Show column headers from first row
if data and isinstance(data[0], dict):
headers = list(data[0].keys())
lines.append("\nColumns: " + ", ".join(headers))
lines.append("\nResults:")
lines.append("Columns: " + ", ".join(headers))
# Show first 50 rows
for i, row in enumerate(data[:50], 1):
# Show first 30 rows (reduced for faster response)
max_rows = min(30, len(data))
for i, row in enumerate(data[:max_rows], 1):
row_str = ", ".join([f"{k}: {v}" for k, v in row.items()])
lines.append(f"{i}. {row_str}")
if row_count > 50:
lines.append(f"\n(Showing first 50 of {row_count} rows)")
if row_count > max_rows:
lines.append(f"(Showing first {max_rows} of {row_count} rows)")
else:
# Fallback for non-dict rows
for i, row in enumerate(data[:50], 1):
for i, row in enumerate(data[:30], 1):
lines.append(f"{i}. {row}")
return "\n".join(lines)
finally:
await connector.close()
# Note: Connection is reused, not closed after each query
except Exception as e:
error_msg = f"Error executing SQL query: {str(e)}"
logger.error(error_msg, exc_info=True)
return error_msg
return sqlite_query
@tool
async def tavily_search(query: str) -> str:
def create_tavily_search_tool():
"""
Create a Tavily web search tool.
Returns:
LangChain tool for executing Tavily web searches
"""
@tool
async def tavily_search(query: str) -> str:
"""
Search the internet for comprehensive information using Tavily search via AI Center.
@@ -272,9 +335,23 @@ async def tavily_search(query: str) -> str:
logger.error(error_msg, exc_info=True)
return error_msg
return tavily_search
# Note: send_streaming_message will be created in the LangGraph integration
# where it has access to the event manager. For now, we define it here as a placeholder.
# =============================================================================
# Legacy tool definitions (kept for backwards compatibility)
# =============================================================================
# Legacy sqlite_query tool using default preprocessor connector
sqlite_query = create_sql_query_tool("preprocessor")
# Legacy tavily_search tool
tavily_search = create_tavily_search_tool()
# =============================================================================
# Streaming message tool factory
# =============================================================================
def create_send_streaming_message_tool(event_manager=None):
"""

View file

@@ -3,14 +3,15 @@
"""Chatbot domain logic."""
import logging
from dataclasses import dataclass
from typing import Annotated, AsyncIterator, Any, List
from dataclasses import dataclass, field
from typing import Annotated, AsyncIterator, Any, List, Optional, TYPE_CHECKING
from pydantic import BaseModel
from langchain_core.messages import (
BaseMessage,
HumanMessage,
SystemMessage,
ToolMessage,
trim_messages,
)
from langgraph.graph.message import add_messages
@@ -21,14 +22,17 @@ from langgraph.prebuilt import ToolNode
from modules.features.chatbot.bridges.ai import AICenterChatModel
from modules.features.chatbot.bridges.memory import DatabaseCheckpointer
from modules.features.chatbot.bridges.tools import (
sqlite_query,
tavily_search,
create_sql_query_tool,
create_tavily_search_tool,
create_send_streaming_message_tool,
)
from modules.features.chatbot.streaming.helpers import ChatStreamingHelper
from modules.features.chatbot.streaming.events import get_event_manager
from modules.datamodels.datamodelUam import User
if TYPE_CHECKING:
from modules.features.chatbot.config import ChatbotConfig
logger = logging.getLogger(__name__)
@@ -47,6 +51,7 @@ class Chatbot:
app: CompiledStateGraph = None
system_prompt: str = "You are a helpful assistant."
workflow_id: str = "default"
config: Optional["ChatbotConfig"] = None
@classmethod
async def create(
@@ -55,6 +60,7 @@
memory: DatabaseCheckpointer,
system_prompt: str,
workflow_id: str = "default",
config: Optional["ChatbotConfig"] = None,
) -> "Chatbot":
"""Factory method to create and configure a Chatbot instance.
@@ -63,6 +69,7 @@
memory: The chat memory to use (DatabaseCheckpointer).
system_prompt: The system prompt to initialize the chatbot.
workflow_id: The workflow ID (maps to thread_id).
config: Optional chatbot configuration for dynamic tool enablement.
Returns:
A configured Chatbot instance.
@@ -72,30 +79,55 @@
memory=memory,
system_prompt=system_prompt,
workflow_id=workflow_id,
config=config,
)
configured_tools = await instance._configure_tools()
instance.app = instance._build_app(memory, configured_tools)
return instance
async def _configure_tools(self) -> List[Any]:
"""Configure tools for the chatbot.
"""Configure tools for the chatbot based on config.
Returns:
List of configured tools.
List of configured tools based on config settings.
"""
tools = []
# SQL query tool
tools.append(sqlite_query)
# Get tool enablement from config (use defaults if no config)
sql_enabled = True
tavily_enabled = False
streaming_enabled = True
connector_type = "preprocessor"
# Tavily search tool
tools.append(tavily_search)
if self.config:
sql_enabled = self.config.tools.is_sql_enabled()
tavily_enabled = self.config.tools.is_tavily_enabled()
streaming_enabled = self.config.tools.is_streaming_enabled()
connector_type = self.config.database.connector
# Streaming status tool (needs event manager)
logger.info(f"Chatbot tools config - SQL: {sql_enabled}, Tavily: {tavily_enabled}, "
f"Streaming: {streaming_enabled}, Connector: {connector_type}")
# SQL query tool (if enabled)
if sql_enabled:
sql_tool = create_sql_query_tool(connector_type=connector_type)
tools.append(sql_tool)
logger.debug(f"Added SQL query tool with connector: {connector_type}")
# Tavily search tool (if enabled)
if tavily_enabled:
tavily_tool = create_tavily_search_tool()
tools.append(tavily_tool)
logger.debug("Added Tavily search tool")
# Streaming status tool (if enabled)
if streaming_enabled:
event_manager = get_event_manager()
send_streaming_message = create_send_streaming_message_tool(event_manager)
tools.append(send_streaming_message)
logger.debug("Added streaming status tool")
logger.info(f"Configured {len(tools)} tools for chatbot workflow {self.workflow_id}")
return tools
def _build_app(
@@ -189,7 +221,7 @@ class Chatbot:
return "tools" if getattr(last_message, "tool_calls", None) else END
async def tools_with_retry(state: ChatState) -> dict:
"""Tools node with retry logic.
"""Tools node with parallel execution and retry logic.
Args:
state: The current chat state.
@@ -197,9 +229,84 @@
Returns:
The updated chat state after tool execution.
"""
# Execute tools normally
tool_node = ToolNode(tools=tools)
result = await tool_node.ainvoke(state)
import asyncio
# Get tool calls from the last message
last_message = state.messages[-1]
tool_calls = getattr(last_message, "tool_calls", [])
if not tool_calls:
return {"messages": []}
# Create a lookup for tools by name
tools_by_name = {t.name: t for t in tools}
async def execute_single_tool(tool_call):
"""Execute a single tool call."""
tool_name = tool_call.get("name") or tool_call.get("function", {}).get("name")
tool_id = tool_call.get("id", f"call_{tool_name}")
args = tool_call.get("args") or tool_call.get("function", {}).get("arguments", {})
if isinstance(args, str):
import json
try:
args = json.loads(args)
except json.JSONDecodeError:
args = {"input": args}
tool = tools_by_name.get(tool_name)
if not tool:
return ToolMessage(
content=f"Error: Tool '{tool_name}' not found",
tool_call_id=tool_id,
name=tool_name
)
try:
# Execute tool asynchronously
if asyncio.iscoroutinefunction(tool.coroutine):
result = await tool.coroutine(**args)
elif hasattr(tool, 'ainvoke'):
result = await tool.ainvoke(args)
else:
result = tool.invoke(args)
return ToolMessage(
content=str(result),
tool_call_id=tool_id,
name=tool_name
)
except Exception as e:
logger.error(f"Tool {tool_name} failed: {e}")
return ToolMessage(
content=f"Error executing {tool_name}: {str(e)}",
tool_call_id=tool_id,
name=tool_name
)
# Execute ALL tool calls in parallel
logger.info(f"Executing {len(tool_calls)} tool calls in parallel")
tool_messages = await asyncio.gather(
*[execute_single_tool(tc) for tc in tool_calls],
return_exceptions=True
)
# Convert exceptions to error messages
result_messages = []
for i, msg in enumerate(tool_messages):
if isinstance(msg, Exception):
tool_call = tool_calls[i]
tool_name = tool_call.get("name", "unknown")
tool_id = tool_call.get("id", f"call_{i}")
result_messages.append(ToolMessage(
content=f"Error: {str(msg)}",
tool_call_id=tool_id,
name=tool_name
))
else:
result_messages.append(msg)
result = {"messages": result_messages}
# Check if we got no results and should retry
no_results_keywords = [

View file

@@ -2,41 +2,72 @@
# All rights reserved.
"""
Configuration system for chatbot instances.
Loads JSON configuration files from configs/ directory.
Supports loading from:
1. Database (FeatureInstance.config JSONB field) - primary method
2. JSON files from configs/ directory - fallback/legacy method
"""
import logging
import json
import warnings
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, TYPE_CHECKING
if TYPE_CHECKING:
from modules.datamodels.datamodelFeatures import FeatureInstance
logger = logging.getLogger(__name__)
# Cache for loaded configs
# Cache for loaded configs (by instance ID for database configs)
_config_cache: Dict[str, 'ChatbotConfig'] = {}
# Default system prompt when none is configured
DEFAULT_SYSTEM_PROMPT = "You are a helpful assistant. You have access to SQL query tools and web search tools. Use them to help answer user questions."
@dataclass
class DatabaseConfig:
"""Database configuration for a chatbot instance."""
schema: Dict[str, Any]
schema: Dict[str, Any] = field(default_factory=dict)
connector: str = "preprocessor"
def is_sql_enabled(self) -> bool:
"""Check if SQL queries are possible (has connector)."""
return bool(self.connector)
@dataclass
class ToolConfig:
"""Tool configuration for a chatbot instance."""
sql: Dict[str, Any]
sql: Dict[str, Any] = field(default_factory=lambda: {"enabled": True})
tavily: Optional[Dict[str, Any]] = None
streaming: Dict[str, Any] = None
streaming: Dict[str, Any] = field(default_factory=lambda: {"enabled": True})
def is_sql_enabled(self) -> bool:
"""Check if SQL tool is enabled."""
if self.sql is None:
return True # Default enabled
return self.sql.get("enabled", True)
def is_tavily_enabled(self) -> bool:
"""Check if Tavily web search tool is enabled."""
if self.tavily is None:
return False # Default disabled
return self.tavily.get("enabled", False)
def is_streaming_enabled(self) -> bool:
"""Check if streaming status tool is enabled."""
if self.streaming is None:
return True # Default enabled
return self.streaming.get("enabled", True)
@dataclass
class ModelConfig:
"""Model configuration for a chatbot instance."""
operationType: str = "DATA_ANALYSE"
processingMode: str = "DETAILED"
processingMode: str = "BASIC" # Changed from DETAILED for faster responses
@dataclass
@@ -50,32 +81,218 @@ class ChatbotConfig:
model: ModelConfig
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ChatbotConfig':
"""Create ChatbotConfig from dictionary."""
def from_dict(cls, data: Dict[str, Any], config_id: str = "default") -> 'ChatbotConfig':
"""
Create ChatbotConfig from dictionary.
Supports two config formats:
1. New format (file-based): systemPrompt, database, tools, model
2. Legacy frontend format: connector, prompts, behavior
Args:
data: Configuration dictionary (from JSON file or FeatureInstance.config)
config_id: Identifier for this config (instance ID or file name)
Returns:
ChatbotConfig instance with validated values
"""
# Detect config format and normalize
if "prompts" in data or "connector" in data or "behavior" in data:
# Legacy frontend format - convert to new format
data = cls._convert_legacy_config(data)
# Get system prompt - required field, use default if not provided
system_prompt = data.get("systemPrompt")
if not system_prompt:
logger.warning(f"Config {config_id}: No systemPrompt provided, using default")
system_prompt = DEFAULT_SYSTEM_PROMPT
# Parse database config
db_data = data.get("database", {})
database_config = DatabaseConfig(
schema=db_data.get("schema", {}),
connector=db_data.get("connector", "preprocessor")
)
# Parse tools config with defaults
tools_data = data.get("tools", {})
tools_config = ToolConfig(
sql=tools_data.get("sql", {"enabled": True}),
tavily=tools_data.get("tavily", {"enabled": False}),
streaming=tools_data.get("streaming", {"enabled": True})
)
# Parse model config with defaults
model_data = data.get("model", {})
model_config = ModelConfig(
operationType=model_data.get("operationType", "DATA_ANALYSE"),
processingMode=model_data.get("processingMode", "DETAILED")
)
return cls(
id=data.get("id", "default"),
name=data.get("name", "Default Chatbot"),
systemPrompt=data.get("systemPrompt", "You are a helpful assistant."),
database=DatabaseConfig(
schema=data.get("database", {}).get("schema", {}),
connector=data.get("database", {}).get("connector", "preprocessor")
),
tools=ToolConfig(
sql=data.get("tools", {}).get("sql", {"enabled": True}),
tavily=data.get("tools", {}).get("tavily"),
streaming=data.get("tools", {}).get("streaming", {"enabled": True})
),
model=ModelConfig(
operationType=data.get("model", {}).get("operationType", "DATA_ANALYSE"),
processingMode=data.get("model", {}).get("processingMode", "DETAILED")
)
id=data.get("id", config_id),
name=data.get("name", "Chatbot"),
systemPrompt=system_prompt,
database=database_config,
tools=tools_config,
model=model_config
)
@staticmethod
def _convert_legacy_config(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert legacy frontend config format to new format.
def load_chatbot_config(config_id: str) -> ChatbotConfig:
Legacy format (from AdminFeatureAccessPage.tsx):
{
"connector": {"types": [...], "type": "preprocessor"},
"prompts": {"customAnalysisPrompt": "...", "customFinalAnswerPrompt": "..."},
"behavior": {"enableWebResearch": true, ...}
}
New format:
{
"systemPrompt": "...",
"database": {"connector": "preprocessor"},
"tools": {"sql": {"enabled": true}, "tavily": {"enabled": true}}
}
"""
converted = {}
# Extract system prompt from prompts section
prompts = data.get("prompts", {})
system_prompt = prompts.get("customAnalysisPrompt") or prompts.get("customFinalAnswerPrompt")
if system_prompt:
converted["systemPrompt"] = system_prompt
# Extract connector from connector section
connector_data = data.get("connector", {})
connector_type = connector_data.get("type") or "preprocessor"
if isinstance(connector_data.get("types"), list) and connector_data["types"]:
connector_type = connector_data["types"][0] # Use first connector as primary
converted["database"] = {
"connector": connector_type,
"schema": {}
}
# Extract tool settings from behavior section
behavior = data.get("behavior", {})
enable_web_research = behavior.get("enableWebResearch", False)
converted["tools"] = {
"sql": {"enabled": True}, # SQL always enabled if connector is set
"tavily": {"enabled": enable_web_research},
"streaming": {"enabled": True} # Streaming always enabled
}
# Model config defaults - use BASIC for faster responses
converted["model"] = {
"operationType": "DATA_ANALYSE",
"processingMode": "BASIC"
}
# Copy other fields
if "id" in data:
converted["id"] = data["id"]
if "name" in data:
converted["name"] = data["name"]
logger.debug(f"Converted legacy config format to new format")
return converted
def to_dict(self) -> Dict[str, Any]:
"""Convert config to dictionary for serialization."""
return {
"id": self.id,
"name": self.name,
"systemPrompt": self.systemPrompt,
"database": {
"schema": self.database.schema,
"connector": self.database.connector
},
"tools": {
"sql": self.tools.sql,
"tavily": self.tools.tavily,
"streaming": self.tools.streaming
},
"model": {
"operationType": self.model.operationType,
"processingMode": self.model.processingMode
}
}
def load_chatbot_config_from_instance(instance: 'FeatureInstance') -> ChatbotConfig:
"""
Load chatbot configuration from a FeatureInstance's config field.
This is the primary method for loading chatbot configuration.
The config is stored in the FeatureInstance.config JSONB field.
Args:
instance: FeatureInstance object with config field
Returns:
ChatbotConfig instance
Raises:
ValueError: If instance has no config and no fallback available
"""
instance_id = instance.id
# Check cache first (by instance ID)
cache_key = f"instance_{instance_id}"
if cache_key in _config_cache:
logger.debug(f"Returning cached config for instance {instance_id}")
return _config_cache[cache_key]
# Get config from instance
config_data = instance.config
if not config_data:
# No config in instance - try to load default from file as fallback
logger.warning(f"Instance {instance_id} has no config, loading default from file")
try:
return load_chatbot_config_from_file("default")
except FileNotFoundError:
# Create minimal default config
logger.warning(f"No default config file found, using minimal defaults")
config_data = {}
# Create config from dictionary
config = ChatbotConfig.from_dict(config_data, config_id=instance_id)
# Cache the config
_config_cache[cache_key] = config
logger.info(f"Loaded chatbot config from instance {instance_id}: {config.name}")
return config
def load_chatbot_config_from_dict(config_data: Dict[str, Any], config_id: str = "custom") -> ChatbotConfig:
"""
Load chatbot configuration from a dictionary.
Useful for testing or when config data is already available.
Args:
config_data: Configuration dictionary
config_id: Identifier for this config
Returns:
ChatbotConfig instance
"""
return ChatbotConfig.from_dict(config_data, config_id=config_id)
def load_chatbot_config_from_file(config_id: str) -> ChatbotConfig:
"""
Load chatbot configuration from JSON file.
This is the legacy/fallback method for loading configuration.
Prefer load_chatbot_config_from_instance() for production use.
Args:
config_id: Configuration ID (e.g., "althaus", "default")
@@ -86,10 +303,11 @@ def load_chatbot_config(config_id: str) -> ChatbotConfig:
FileNotFoundError: If config file not found
ValueError: If config file is invalid
"""
# Check cache first
if config_id in _config_cache:
logger.debug(f"Returning cached config for {config_id}")
return _config_cache[config_id]
# Check cache first (by file ID)
cache_key = f"file_{config_id}"
if cache_key in _config_cache:
logger.debug(f"Returning cached config for file {config_id}")
return _config_cache[cache_key]
# Get path to configs directory
current_dir = Path(__file__).parent
@@ -99,19 +317,19 @@ def load_chatbot_config(config_id: str) -> ChatbotConfig:
if not config_file.exists():
# Try default config if requested config not found
if config_id != "default":
logger.warning(f"Config {config_id} not found, trying default")
return load_chatbot_config("default")
logger.warning(f"Config file {config_id} not found, trying default")
return load_chatbot_config_from_file("default")
raise FileNotFoundError(f"Chatbot config file not found: {config_file}")
try:
with open(config_file, 'r', encoding='utf-8') as f:
data = json.load(f)
config = ChatbotConfig.from_dict(data)
config = ChatbotConfig.from_dict(data, config_id=config_id)
# Cache the config
_config_cache[config_id] = config
logger.info(f"Loaded chatbot config: {config_id} ({config.name})")
_config_cache[cache_key] = config
logger.info(f"Loaded chatbot config from file: {config_id} ({config.name})")
return config
@@ -123,8 +341,42 @@ def load_chatbot_config(config_id: str) -> ChatbotConfig:
raise
def clear_config_cache():
"""Clear the configuration cache."""
def load_chatbot_config(config_id: str) -> ChatbotConfig:
"""
Load chatbot configuration from JSON file.
DEPRECATED: Use load_chatbot_config_from_instance() for database configs
or load_chatbot_config_from_file() for file-based configs.
Args:
config_id: Configuration ID (e.g., "althaus", "default")
Returns:
ChatbotConfig instance
"""
warnings.warn(
"load_chatbot_config() is deprecated. Use load_chatbot_config_from_instance() "
"for database configs or load_chatbot_config_from_file() for file-based configs.",
DeprecationWarning,
stacklevel=2
)
return load_chatbot_config_from_file(config_id)
def clear_config_cache(instance_id: Optional[str] = None):
"""
Clear the configuration cache.
Args:
instance_id: Optional instance ID to clear specific cache entry.
If None, clears entire cache.
"""
global _config_cache
if instance_id:
cache_key = f"instance_{instance_id}"
if cache_key in _config_cache:
del _config_cache[cache_key]
logger.debug(f"Cleared chatbot config cache for instance {instance_id}")
else:
_config_cache.clear()
logger.debug("Cleared chatbot config cache")
logger.debug("Cleared all chatbot config cache")

View file

@@ -16,7 +16,7 @@ from modules.security.rbac import RbacClass
from modules.datamodels.datamodelRbac import AccessRuleContext
from modules.datamodels.datamodelUam import AccessLevel
from .datamodelFeatureChatbot import (
from modules.datamodels.datamodelChat import (
ChatDocument,
ChatStat,
ChatLog,

View file

@@ -25,7 +25,7 @@ from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
# Import models
from .datamodelFeatureChatbot import ChatWorkflow, UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelPagination import PaginationParams, PaginatedResponse, PaginationMetadata
# Import chatbot feature
@@ -33,7 +33,7 @@ from modules.features.chatbot import chatProcess
from modules.features.chatbot.streaming.events import get_event_manager
# Import workflow control functions
from modules.features.workflow import chatStop
from modules.workflows.automation import chatStop
# Configure logger
logger = logging.getLogger(__name__)
@@ -530,49 +530,30 @@ async def delete_chatbot(
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Deletes a chatbot workflow and its associated data."""
# Validate instance access
# Validate instance access - if user has access to instance, they can delete their workflows
mandateId = await _validateInstanceAccess(instanceId, context)
try:
# Get service center
interfaceDbChat = _getServiceChat(context, instanceId)
# Check workflow access and permission using RBAC
workflows = getRecordsetWithRBAC(
interfaceDbChat.db,
ChatWorkflow,
context.user,
recordFilter={"id": workflowId}
)
if not workflows:
# Get workflow directly (interface already handles mandate filtering)
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
workflow_data = workflows[0]
# Check if workflow is a chatbot workflow
if workflow_data.get("workflowMode") != WorkflowModeEnum.WORKFLOW_CHATBOT.value:
if workflow.workflowMode != WorkflowModeEnum.WORKFLOW_CHATBOT.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Workflow {workflowId} is not a chatbot workflow"
)
# Verify workflow belongs to this instance
workflow_instance_id = workflow_data.get("featureInstanceId")
if workflow_instance_id != instanceId:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Workflow {workflowId} does not belong to instance '{instanceId}'"
)
# Check if user has permission to delete using RBAC
if not interfaceDbChat.checkRbacPermission(ChatWorkflow, "delete", workflowId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this workflow"
)
# User has instance access, allow delete (no complex RBAC checks needed)
logger.info(f"User {context.user.id} deleting workflow {workflowId} from instance {instanceId}")
# Delete workflow
success = interfaceDbChat.deleteWorkflow(workflowId)

View file

@@ -23,7 +23,11 @@ from modules.features.chatbot.streaming.events import get_event_manager
from modules.features.chatbot.chatbot import Chatbot
from modules.features.chatbot.bridges.ai import AICenterChatModel
from modules.features.chatbot.bridges.memory import DatabaseCheckpointer
from modules.features.chatbot.config import load_chatbot_config
from modules.features.chatbot.config import (
load_chatbot_config_from_instance,
load_chatbot_config_from_file,
ChatbotConfig
)
from modules.datamodels.datamodelAi import OperationTypeEnum, ProcessingModeEnum
from modules.workflows.methods.methodAi.methodAi import MethodAi
from modules.connectors.connectorPreprocessor import PreprocessorConnector
@@ -62,8 +66,10 @@ def _extractJsonFromResponse(content: str) -> Optional[dict]:
async def chatProcess(
currentUser: User,
mandateId: Optional[str],
userInput: UserInputRequest,
workflowId: Optional[str] = None
workflowId: Optional[str] = None,
featureInstanceId: Optional[str] = None
) -> ChatWorkflow:
"""
Simple chatbot processing - analyze user input and generate queries.
@@ -76,15 +82,17 @@
Args:
currentUser: Current user
mandateId: Mandate ID for the workflow
userInput: User input request
workflowId: Optional workflow ID to continue existing conversation
featureInstanceId: Feature instance ID for loading instance-specific config
Returns:
ChatWorkflow instance
"""
try:
# Get services
services = getServices(currentUser, None)
# Get services with mandate context
services = getServices(currentUser, mandateId)
interfaceDbChat = services.interfaceDbChat
# Get event manager and create queue if needed
@@ -120,7 +128,8 @@
# Create new workflow
workflowData = {
"id": str(uuid.uuid4()),
"mandateId": currentUser.mandateId,
"mandateId": mandateId,
"featureInstanceId": featureInstanceId, # Store feature instance for RBAC
"status": "running",
"name": conversation_name,
"currentRound": 1,
@@ -216,7 +225,8 @@
currentUser,
workflow.id,
userInput,
userMessage.id
userMessage.id,
featureInstanceId=featureInstanceId
))
# Reload workflow to include new message
@@ -869,41 +879,12 @@ async def _bridge_chatbot_events(
async for event in event_stream:
event_type = event.get("type")
# Handle status updates
# Handle status updates - emit immediately for real-time UI feedback
# Note: Status updates are transient UI feedback, no need to persist them
if event_type == "status":
label = event.get("label", "")
if label:
# Store status update as a log entry (like the old implementation)
try:
workflow = interface_db_chat.getWorkflow(workflow_id)
if workflow:
log_data = {
"id": f"log_{workflow_id}_{getUtcTimestamp()}",
"workflowId": workflow_id,
"message": label.strip(),
"type": "status",
"step": "status",
"timestamp": getUtcTimestamp(),
"roundNumber": workflow.currentRound if workflow else 1,
"taskNumber": 0,
"actionNumber": 0
}
created_log = interface_db_chat.createLog(log_data)
# Emit as chatdata event with log item
await event_manager.emit_event(
context_id=workflow_id,
event_type="chatdata",
data={
"type": "log",
"createdAt": parseTimestamp(created_log.timestamp, default=getUtcTimestamp()),
"item": created_log.dict()
},
event_category="chat"
)
except Exception as e:
logger.error(f"Error storing status log: {e}", exc_info=True)
# Fallback: emit as status event if log creation fails
# Emit the status event directly for real-time UI feedback
await event_manager.emit_event(
context_id=workflow_id,
event_type="chatdata",
@@ -1101,16 +1082,59 @@ async def _bridge_chatbot_events(
)
async def _load_chatbot_config(featureInstanceId: Optional[str]) -> ChatbotConfig:
"""
Load chatbot configuration from FeatureInstance (database) or file fallback.
Args:
featureInstanceId: Feature instance ID to load config from
Returns:
ChatbotConfig instance
"""
if featureInstanceId:
try:
# Import here to avoid circular imports
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
# Get feature instance from database
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
instance = featureInterface.getFeatureInstance(featureInstanceId)
if instance and instance.config:
logger.info(f"Loading chatbot config from FeatureInstance {featureInstanceId}")
return load_chatbot_config_from_instance(instance)
else:
logger.warning(f"FeatureInstance {featureInstanceId} has no config, using file fallback")
except Exception as e:
logger.error(f"Error loading config from FeatureInstance {featureInstanceId}: {e}")
# Fallback to file-based config (default)
logger.info("Using file-based chatbot config (default)")
return load_chatbot_config_from_file("default")
async def _processChatbotMessageLangGraph(
services,
currentUser: User,
workflowId: str,
userInput: UserInputRequest,
userMessageId: str
userMessageId: str,
featureInstanceId: Optional[str] = None
):
"""
Process chatbot message using LangGraph.
Uses LangGraph workflow with AI center models and tools.
Args:
services: Service container
currentUser: Current user
workflowId: Workflow ID
userInput: User input request
userMessageId: User message ID
featureInstanceId: Optional feature instance ID for loading instance-specific config
"""
event_manager = get_event_manager()
@@ -1136,11 +1160,8 @@ async def _processChatbotMessageLangGraph(
logger.info(f"Workflow {workflowId} was stopped, aborting processing")
return
# Determine config ID (default to "althaus" for now, can be made configurable)
config_id = "althaus" # TODO: Make this configurable per workflow
# Load configuration
config = load_chatbot_config(config_id)
# Load configuration from FeatureInstance (database) or fall back to file
config = await _load_chatbot_config(featureInstanceId)
# Replace {{DATE}} placeholder in system prompt
from datetime import datetime
@@ -1162,12 +1183,13 @@
# Create memory/checkpointer
memory = DatabaseCheckpointer(user=currentUser, workflow_id=workflowId)
# Create chatbot instance
# Create chatbot instance with config for dynamic tool configuration
chatbot = await Chatbot.create(
model=model,
memory=memory,
system_prompt=system_prompt,
workflow_id=workflowId
workflow_id=workflowId,
config=config
)
# Stream events using chatbot

View file

@@ -810,15 +810,14 @@ class ChatObjects:
for message in messages:
messageId = message.id
if messageId:
# Delete message stats
existing_stats = getRecordsetWithRBAC(self.db, ChatStat, self.currentUser, recordFilter={"messageId": messageId})
for stat in existing_stats:
self.db.recordDelete(ChatStat, stat["id"])
# Delete message documents (but NOT the files!)
# Note: ChatStat does NOT have messageId - stats are only at workflow level
try:
existing_docs = getRecordsetWithRBAC(self.db, ChatDocument, self.currentUser, recordFilter={"messageId": messageId})
for doc in existing_docs:
self.db.recordDelete(ChatDocument, doc["id"])
except Exception as e:
logger.warning(f"Error deleting documents for message {messageId}: {e}")
# Delete the message itself
self.db.recordDelete(ChatMessage, messageId)
@@ -1109,8 +1108,6 @@ class ChatObjects:
actionName=createdMessage.get("actionName")
)
<<<<<<< HEAD:modules/interfaces/interfaceDbChat.py
=======
# Emit message event for streaming (if event manager is available)
try:
from modules.features.chatbot.streaming.events import get_event_manager
@@ -1131,7 +1128,6 @@
# Event manager not available or error - continue without emitting
logger.debug(f"Could not emit message event: {e}")
>>>>>>> feat/chatbot-althaus-integration:modules/interfaces/interfaceDbChatObjects.py
# Debug: Store message and documents for debugging - only if debug enabled
storeDebugMessageAndDocuments(chat_message, self.currentUser)
@@ -1492,8 +1488,6 @@ class ChatObjects:
# Create log in normalized table
createdLog = self.db.recordCreate(ChatLog, log_model)
<<<<<<< HEAD:modules/interfaces/interfaceDbChat.py
=======
# Emit log event for streaming (if event manager is available)
try:
from modules.features.chatbot.streaming.events import get_event_manager
@@ -1514,7 +1508,6 @@
# Event manager not available or error - continue without emitting
logger.debug(f"Could not emit log event: {e}")
>>>>>>> feat/chatbot-althaus-integration:modules/interfaces/interfaceDbChatObjects.py
# Return validated ChatLog instance
return ChatLog(**createdLog)