Changes to chatbot

This commit is contained in:
Ida Dittrich 2026-01-30 11:59:21 +01:00
parent e4662b19e2
commit 7a86914040
10 changed files with 82 additions and 5142 deletions


@@ -1,170 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Adapter to use AI Center as a LangChain-compatible chat model.
Maps LangChain message format to AI Center requests and responses.
"""
import logging
from typing import Any, AsyncIterator, List, Optional
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
SystemMessage,
)
from langchain_core.outputs import ChatGeneration, ChatResult
from langchain_core.callbacks import AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
logger = logging.getLogger(__name__)
class AICenterChatModel(BaseChatModel):
"""
Adapter to use AI Center as a LangChain chat model.
Converts LangChain messages to AI Center format and back.
Attributes:
services: Services instance with AI access
system_prompt: System prompt to use
temperature: Temperature for AI calls
"""
# Declared as pydantic fields: BaseChatModel is a pydantic model, so
# assigning undeclared instance attributes in a custom __init__ raises
# a ValueError at runtime. Pydantic generates the keyword constructor.
services: Any
system_prompt: str = ""
temperature: float = 0.2
@property
def _llm_type(self) -> str:
"""Return identifier of LLM type."""
return "ai_center"
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""
Synchronous generation - not supported, use async version.
"""
raise NotImplementedError("Use async version: _agenerate")
async def _agenerate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""
Generate chat response using AI center.
Args:
messages: List of LangChain messages
stop: Optional list of stop sequences
run_manager: Optional callback manager
**kwargs: Additional arguments
Returns:
ChatResult with generated message
"""
# Convert LangChain messages to AI center prompt format
prompt_parts = []
# Add system prompt if present
if self.system_prompt:
prompt_parts.append(self.system_prompt)
# Convert messages to text format
for msg in messages:
if isinstance(msg, SystemMessage):
# System messages are already in system_prompt or can be added here
if not self.system_prompt:
prompt_parts.append(f"System: {msg.content}")
elif isinstance(msg, HumanMessage):
prompt_parts.append(f"User: {msg.content}")
elif isinstance(msg, AIMessage):
prompt_parts.append(f"Assistant: {msg.content}")
else:
# Generic message
prompt_parts.append(str(msg.content))
# Combine into single prompt
full_prompt = "\n\n".join(prompt_parts)
# Create AI center request
ai_request = AiCallRequest(
prompt=full_prompt,
options=AiCallOptions(
resultFormat="txt",
operationType=OperationTypeEnum.DATA_ANALYSE,
processingMode=ProcessingModeEnum.DETAILED,
temperature=self.temperature
)
)
# Call AI center
try:
await self.services.ai.ensureAiObjectsInitialized()
ai_response = await self.services.ai.callAi(ai_request)
# Extract content
content = ai_response.content if hasattr(ai_response, 'content') else str(ai_response)
# Create AIMessage from response
ai_message = AIMessage(content=content)
# Create ChatGeneration
generation = ChatGeneration(message=ai_message)
# Return ChatResult
return ChatResult(generations=[generation])
except Exception as e:
logger.error(f"Error calling AI center: {e}", exc_info=True)
# Return error message
error_message = AIMessage(content=f"Error: {str(e)}")
generation = ChatGeneration(message=error_message)
return ChatResult(generations=[generation])
async def astream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[BaseMessage]:
"""
Stream chat response (not fully supported by AI center, returns single chunk).
Args:
messages: List of LangChain messages
stop: Optional list of stop sequences
run_manager: Optional callback manager
**kwargs: Additional arguments
Yields:
BaseMessage chunks
"""
# For now, just return the full response as a single chunk
# TODO: Implement proper streaming if AI center supports it
result = await self._agenerate(messages, stop, run_manager, **kwargs)
if result.generations:
yield result.generations[0].message
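For orientation, a minimal usage sketch of the adapter (the `services` object is an assumption and must expose the AI interface used above; `demo` is a hypothetical helper, not part of the module):

import asyncio
from langchain_core.messages import HumanMessage

async def demo(services):
    model = AICenterChatModel(
        services=services,
        system_prompt="You are a helpful assistant.",
        temperature=0.2,
    )
    # ainvoke routes through _agenerate; the synchronous invoke path
    # raises NotImplementedError by design.
    reply = await model.ainvoke([HumanMessage(content="Hello!")])
    print(reply.content)

# asyncio.run(demo(services))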


@@ -1,231 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Chatbot instance configuration management.
Handles loading and applying instance-specific configurations.
"""
import logging
from typing import Optional, Dict, Any, List
from modules.interfaces.interfaceFeatures import getFeatureInterface
from modules.interfaces.interfaceDbApp import getRootInterface
logger = logging.getLogger(__name__)
class ChatbotConfig:
"""
Chatbot instance configuration structure.
Provides defaults and validation for chatbot instance configs.
"""
# Default configuration
DEFAULT_CONFIG = {
"connector": {
"types": ["preprocessor"], # Array of database connector types: "preprocessor", "custom"
"type": "preprocessor", # Legacy: single connector type (for backward compatibility)
"customConnectorClass": None # For custom connectors
},
"prompts": {
"useCustomPrompts": False,
"customAnalysisPrompt": None,
"customFinalAnswerPrompt": None,
"customSystemPrompt": None # For LangGraph workflow (single system prompt)
},
"behavior": {
"maxQueries": 5,
"enableWebResearch": True,
"enableRetryOnEmpty": True,
"maxRetryAttempts": 2
},
"database": {
"schema": None, # Custom schema info if needed
"tablePrefix": None # Custom table prefix if needed
}
}
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize chatbot config with defaults and overrides.
Args:
config: Instance-specific config dict (from FeatureInstance.config)
"""
self.config = self._merge_config(config or {})
def _merge_config(self, instance_config: Dict[str, Any]) -> Dict[str, Any]:
"""
Merge instance config with defaults, handling nested dicts.
Args:
instance_config: Instance-specific config
Returns:
Merged configuration dict
"""
merged = self.DEFAULT_CONFIG.copy()
# Deep merge nested dicts
for key, value in instance_config.items():
if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
merged[key] = {**merged[key], **value}
else:
merged[key] = value
return merged
@property
def connector_types(self) -> List[str]:
"""Get connector types as list (supports multiple connectors)."""
connector_config = self.config.get("connector", {})
# Support new array format
types = []
if "types" in connector_config and isinstance(connector_config["types"], list):
types = connector_config["types"]
# Fallback to legacy single type format
elif "type" in connector_config:
types = [connector_config["type"]]
else:
types = ["preprocessor"]
# Filter out 'websearch' (not a database connector, handled separately via enableWebResearch)
types = [t for t in types if t != "websearch"]
# Ensure at least one connector
if not types:
types = ["preprocessor"]
return types
@property
def connector_type(self) -> str:
"""Get primary connector type (preprocessor, custom)."""
# For backward compatibility, return first connector type
types = self.connector_types
return types[0] if types else "preprocessor"
@property
def custom_connector_class(self) -> Optional[str]:
"""Get custom connector class name if using custom connector."""
return self.config.get("connector", {}).get("customConnectorClass")
@property
def use_custom_prompts(self) -> bool:
"""Check if custom prompts should be used. Always true since prompts are required."""
# Prompts are now required, so this is always true if prompts are configured
return bool(self.config.get("prompts", {}).get("customAnalysisPrompt") or
self.config.get("prompts", {}).get("customFinalAnswerPrompt"))
@property
def custom_analysis_prompt(self) -> Optional[str]:
"""Get custom analysis prompt (required for chatbot instances)."""
prompt = self.config.get("prompts", {}).get("customAnalysisPrompt")
if not prompt:
logger.warning("custom_analysis_prompt is not configured - this is required for chatbot instances")
return prompt
@property
def custom_final_answer_prompt(self) -> Optional[str]:
"""Get custom final answer prompt (required for chatbot instances)."""
prompt = self.config.get("prompts", {}).get("customFinalAnswerPrompt")
if not prompt:
logger.warning("custom_final_answer_prompt is not configured - this is required for chatbot instances")
return prompt
@property
def custom_system_prompt(self) -> Optional[str]:
"""Get custom system prompt for LangGraph workflow."""
# Prefer customSystemPrompt, fallback to customAnalysisPrompt
prompt = self.config.get("prompts", {}).get("customSystemPrompt")
if not prompt:
prompt = self.config.get("prompts", {}).get("customAnalysisPrompt")
return prompt
@property
def max_queries(self) -> int:
"""Get maximum number of queries allowed."""
return self.config.get("behavior", {}).get("maxQueries", 5)
@property
def enable_web_research(self) -> bool:
"""Check if web research is enabled."""
return self.config.get("behavior", {}).get("enableWebResearch", True)
@property
def enable_retry_on_empty(self) -> bool:
"""Check if retry on empty results is enabled."""
return self.config.get("behavior", {}).get("enableRetryOnEmpty", True)
@property
def max_retry_attempts(self) -> int:
"""Get maximum retry attempts."""
return self.config.get("behavior", {}).get("maxRetryAttempts", 2)
def get_connector_instance(self):
"""
Get connector instance based on configuration.
Uses the primary (first) connector type from the configured connectors.
Returns:
Connector instance (PreprocessorConnector, or custom connector if configured)
"""
# Use primary connector type (first in the list)
connector_type = self.connector_type.lower()
if connector_type == "preprocessor":
from modules.connectors.connectorPreprocessor import PreprocessorConnector
return PreprocessorConnector()
elif connector_type == "custom" and self.custom_connector_class:
# Dynamic import for custom connectors
try:
module_path, class_name = self.custom_connector_class.rsplit(".", 1)
module = __import__(module_path, fromlist=[class_name])
connector_class = getattr(module, class_name)
return connector_class()
except Exception as e:
logger.error(f"Failed to load custom connector {self.custom_connector_class}: {e}")
raise ValueError(f"Invalid custom connector: {self.custom_connector_class}")
else:
# Default to PreprocessorConnector
logger.warning(f"Unknown connector type '{connector_type}', using PreprocessorConnector")
from modules.connectors.connectorPreprocessor import PreprocessorConnector
return PreprocessorConnector()
def get_chatbot_config(instance_id: Optional[str]) -> ChatbotConfig:
"""
Load chatbot configuration for a feature instance.
Args:
instance_id: FeatureInstance ID (None for default config)
Returns:
ChatbotConfig instance with merged defaults and instance config
"""
if not instance_id:
# Return default config if no instance ID provided
return ChatbotConfig()
try:
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
instance = featureInterface.getFeatureInstance(instance_id)
if not instance:
logger.warning(f"Feature instance {instance_id} not found, using default config")
return ChatbotConfig()
# Verify it's a chatbot instance
if instance.featureCode != "chatbot":
logger.warning(f"Instance {instance_id} is not a chatbot instance, using default config")
return ChatbotConfig()
# Load config from instance
instance_config = instance.config if hasattr(instance, 'config') and instance.config else {}
return ChatbotConfig(instance_config)
except Exception as e:
logger.error(f"Error loading chatbot config for instance {instance_id}: {e}")
# Return default config on error
return ChatbotConfig()
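For reference, a minimal sketch of loading and consuming an instance config (the instance ID is hypothetical):

config = get_chatbot_config("inst-0001")     # falls back to defaults on any error
connector = config.get_connector_instance()  # PreprocessorConnector unless configured otherwise
print(config.connector_types)                # e.g. ["preprocessor"]
print(config.max_queries)                    # 5 unless overridden
print(config.enable_web_research)            # True unless overridden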


@@ -1,160 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Utility functions for the chatbot module.
Contains conversation name generation and other utilities.
"""
import logging
import re
from typing import Optional
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
logger = logging.getLogger(__name__)
async def generate_conversation_name(
services,
userPrompt: str,
userLanguage: str = "en"
) -> str:
"""
Generate a short, descriptive conversation name based on user's prompt.
Args:
services: Services instance with AI access
userPrompt: The user's input prompt
userLanguage: User's preferred language (for prompt localization)
Returns:
Short conversation name (max 60 characters)
"""
try:
truncated_prompt = userPrompt[:200] if len(userPrompt) > 200 else userPrompt
name_prompt = f"""Create a professional conversation title in THE SAME LANGUAGE as the user's question.
Question: "{truncated_prompt}"
Rules:
- Title MUST be in the same language as the question (German→German, French→French, English→English)
- Max 60 characters, no punctuation (?, !, .)
- Professional and concise
- Respond ONLY with the title, nothing else"""
await services.ai.ensureAiObjectsInitialized()
nameRequest = AiCallRequest(
prompt=name_prompt,
options=AiCallOptions(
resultFormat="txt",
operationType=OperationTypeEnum.DATA_GENERATE,
processingMode=ProcessingModeEnum.DETAILED,
temperature=0.7
)
)
nameResponse = await services.ai.callAi(nameRequest)
generated_name = nameResponse.content.strip()
# Extract first line and clean up
generated_name = generated_name.split('\n')[0].strip()
generated_name = re.sub(r'^(Title|Titel|Titre|Name):\s*', '', generated_name, flags=re.IGNORECASE)
generated_name = re.sub(r'^["\']|["\']$', '', generated_name)
generated_name = re.sub(r'[?!.]+$', '', generated_name) # Remove trailing punctuation
# Apply title case
if generated_name:
words = generated_name.split()
capitalized_words = []
for word in words:
if word.isupper() and len(word) > 1:
capitalized_words.append(word) # Keep acronyms
else:
capitalized_words.append(word.capitalize())
generated_name = " ".join(capitalized_words).strip()
# Validate and truncate if needed
if not generated_name or len(generated_name) < 3:
if userLanguage == "de":
generated_name = "Chatbot Konversation"
elif userLanguage == "fr":
generated_name = "Conversation Chatbot"
else:
generated_name = "Chatbot Conversation"
if len(generated_name) > 60:
truncated = generated_name[:57]
last_space = truncated.rfind(' ')
generated_name = truncated[:last_space] + "..." if last_space > 30 else truncated + "..."
logger.info(f"Generated conversation name: '{generated_name}'")
return generated_name
except Exception as e:
logger.error(f"Error generating conversation name: {e}", exc_info=True)
if userLanguage == "de":
return "Chatbot Konversation"
elif userLanguage == "fr":
return "Conversation Chatbot"
else:
return "Chatbot Conversation"
def get_empty_results_retry_instructions(empty_count: int) -> str:
"""
Get retry instructions when empty results are detected.
Args:
empty_count: Number of queries that returned empty results
Returns:
Formatted instructions string
"""
if empty_count == 0:
return ""
return f"""
LEERE ERGEBNISSE ERKANNT
Es wurden {empty_count} Query(s) ausgeführt, die 0 Zeilen zurückgegeben haben. Versuche alternative Strategien.
WICHTIG - MAXIMAL 5 QUERIES FÜR PERFORMANCE
Erstelle MAXIMAL 5 alternative SQL-Queries mit komplett anderen Strategien:
1. **Breitere Suche ohne Zertifizierung**: Entferne Zertifizierungsfilter komplett
- Beispiel: Suche nur nach Netzgerät + einphasig + 10A (ohne UL)
- Suche in Artikelbezeichnung, Artikelbeschrieb, Keywords
2. **Erweiterte Suche nach Netzgeräten mit Ampere-Angaben**: Breitere Ampere-Patterns
- Beispiel: (Netzteil OR Netzgerät) AND (10A OR 15A OR 20A OR Ampere)
- Suche auch nach "Ampere" als Begriff, nicht nur Zahlen
3. **Breitere UL-Suche bei Netzgeräten**: Suche UL in allen Feldern
- Beispiel: (UL OR UL-zertifiziert) AND (Netzgerät OR Netzteil OR Power Supply)
- Suche auch in Keywords-Feld
4. **Netzgeräte mit 10A ohne weitere Filter**: Minimaler Filter
- Beispiel: (Netzgerät OR Netzteil) AND (10A OR 15A OR 20A)
- Keine Filter auf einphasig oder Zertifizierung
5. **Zertifizierte Netzgeräte allgemein**: Breite Zertifizierungs-Suche
- Beispiel: (UL OR CE OR TÜV OR certified OR zertifiziert) AND (Netzgerät OR Netzteil)
6. **COUNT-Abfrage für Statistik**: Prüfe ob überhaupt Artikel existieren
- SELECT COUNT(*) WHERE (Netzgerät OR Netzteil) AND (10A OR 15A OR 20A)
7. **Spezifische Suche nach einphasigen Netzgeräten**: Ohne Zertifizierung
- Beispiel: (einphasig OR 1-phasig OR single phase) AND (Netzgerät OR Netzteil)
8. **Fallback mit minimalen Filtern**: Nur Hauptkriterien
- Beispiel: Netzgerät AND (10A OR 15A OR 20A) - keine weiteren Filter
WICHTIG:
- Erstelle MAXIMAL 5 Queries mit unterschiedlichen Strategien (für Performance)
- Verwende breitere OR-Bedingungen für alternative Begriffe
- Entferne zu spezifische Filter, die möglicherweise keine Treffer finden
- Suche in Artikelbezeichnung, Artikelbeschrieb UND Keywords-Feld
"""

File diff suppressed because it is too large.


@@ -1,243 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Generic streaming event manager for real-time updates.
Manages event queues for SSE streaming across all features (chatbot, workflows, documents, etc.).
Supports event-driven streaming instead of polling.
"""
import logging
import asyncio
from typing import Dict, Optional, Any, List, AsyncIterator, Set
from datetime import datetime
logger = logging.getLogger(__name__)
class StreamingEventManager:
"""
Generic event manager for real-time streaming across all features.
Supports multiple event types and contexts (workflows, documents, tasks, etc.).
Thread-safe event emission and queue management.
"""
def __init__(self):
"""Initialize the event manager."""
self._queues: Dict[str, asyncio.Queue] = {}
self._locks: Dict[str, asyncio.Lock] = {}
self._cleanup_tasks: Dict[str, asyncio.Task] = {}
self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids (for future multi-subscriber support)
def create_queue(self, context_id: str) -> asyncio.Queue:
"""
Create a new event queue for a context.
Args:
context_id: Context ID (workflow_id, document_id, task_id, etc.)
Returns:
Event queue for the context
"""
if context_id not in self._queues:
self._queues[context_id] = asyncio.Queue()
self._locks[context_id] = asyncio.Lock()
self._subscribers[context_id] = set()
logger.debug(f"Created event queue for context {context_id}")
return self._queues[context_id]
def get_queue(self, context_id: str) -> Optional[asyncio.Queue]:
"""
Get existing event queue for a context.
Args:
context_id: Context ID
Returns:
Event queue if exists, None otherwise
"""
return self._queues.get(context_id)
async def emit_event(
self,
context_id: str,
event_type: str,
data: Dict[str, Any],
event_category: str = "default",
message: Optional[str] = None,
step: Optional[str] = None
):
"""
Emit an event to the context's event queue.
Args:
context_id: Context ID (workflow_id, document_id, etc.)
event_type: Type of event ("message", "log", "status", "progress", "complete", "error", "chatdata")
data: Event data dictionary (will be included in event)
event_category: Category of event for filtering ("chat", "workflow", "document", etc.)
message: Optional event message (for backward compatibility)
step: Optional processing step (for backward compatibility)
"""
queue = self.get_queue(context_id)
if not queue:
logger.debug(f"No event queue found for context {context_id}, skipping event")
return
event = {
"type": event_type,
"category": event_category,
"timestamp": datetime.now().timestamp(),
"data": data,
"message": message, # For backward compatibility
"step": step # For backward compatibility
}
try:
await queue.put(event)
logger.debug(f"Emitted {event_type} event (category: {event_category}) for context {context_id}")
except Exception as e:
logger.error(f"Error emitting event for context {context_id}: {e}")
async def stream_events(
self,
context_id: str,
event_categories: Optional[List[str]] = None,
timeout: Optional[float] = None
) -> AsyncIterator[Dict[str, Any]]:
"""
Async generator for streaming events from a context.
Args:
context_id: Context ID to stream events from
event_categories: Optional list of event categories to filter by
timeout: Optional timeout in seconds (when None, defaults to 300s for long-running streams)
Yields:
Event dictionaries
"""
queue = self.get_queue(context_id)
if not queue:
logger.warning(f"No queue found for context {context_id}")
return
# Default timeout of 5 minutes for long-running streams if not specified
effective_timeout = timeout if timeout is not None else 300.0
start_time = asyncio.get_event_loop().time()
last_event_time = start_time
heartbeat_interval = 30.0 # Send heartbeat every 30 seconds to keep connection alive
while True:
# Check timeout
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed > effective_timeout:
logger.debug(f"Stream timeout for context {context_id} after {effective_timeout}s")
break
try:
# Wait for event with longer timeout to avoid premature closure
wait_timeout = heartbeat_interval # Check every 30 seconds
if effective_timeout:
remaining = effective_timeout - elapsed
if remaining <= 0:
break
wait_timeout = min(wait_timeout, remaining)
event = await asyncio.wait_for(queue.get(), timeout=wait_timeout)
last_event_time = asyncio.get_event_loop().time()
# Filter by category if specified
if event_categories and event.get("category") not in event_categories:
continue
yield event
except asyncio.TimeoutError:
# Send heartbeat to keep connection alive if no events
time_since_last_event = asyncio.get_event_loop().time() - last_event_time
if time_since_last_event >= heartbeat_interval:
# Send heartbeat event to keep stream alive
heartbeat_event = {
"type": "heartbeat",
"category": "system",
"timestamp": datetime.now().timestamp(),
"data": {"status": "alive"},
"message": None,
"step": None
}
yield heartbeat_event
last_event_time = asyncio.get_event_loop().time()
# Check if we should continue or timeout
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= effective_timeout:
break
continue
except Exception as e:
logger.error(f"Error in stream_events for context {context_id}: {e}")
break
async def cleanup(self, context_id: str, delay: float = 60.0):
"""
Schedule cleanup of event queue after delay.
This allows time for any remaining events to be consumed.
Args:
context_id: Context ID
delay: Delay in seconds before cleanup (default: 60 seconds)
"""
if context_id in self._cleanup_tasks:
# Cancel existing cleanup task
self._cleanup_tasks[context_id].cancel()
async def _cleanup():
try:
await asyncio.sleep(delay)
if context_id in self._queues:
# Drain remaining events
queue = self._queues[context_id]
while not queue.empty():
try:
queue.get_nowait()
except asyncio.QueueEmpty:
break
del self._queues[context_id]
if context_id in self._locks:
del self._locks[context_id]
if context_id in self._subscribers:
del self._subscribers[context_id]
logger.info(f"Cleaned up event queue for context {context_id}")
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error during cleanup for context {context_id}: {e}")
finally:
if context_id in self._cleanup_tasks:
del self._cleanup_tasks[context_id]
self._cleanup_tasks[context_id] = asyncio.create_task(_cleanup())
def has_queue(self, context_id: str) -> bool:
"""
Check if a queue exists for a context.
Args:
context_id: Context ID
Returns:
True if queue exists, False otherwise
"""
return context_id in self._queues
# Backward compatibility: ChatbotEventManager is an alias
ChatbotEventManager = StreamingEventManager
# Global singleton instance
_event_manager = StreamingEventManager()
def get_event_manager() -> StreamingEventManager:
"""Get the global event manager instance."""
return _event_manager
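A minimal end-to-end sketch of the event flow (the context ID is hypothetical):

import asyncio

async def demo():
    manager = get_event_manager()
    manager.create_queue("workflow-123")

    async def producer():
        await manager.emit_event("workflow-123", "status", {"step": "search"}, event_category="chat")
        await manager.emit_event("workflow-123", "complete", {"result": "done"}, event_category="chat")

    asyncio.create_task(producer())
    # Heartbeats carry category "system", so the filter below drops them.
    async for event in manager.stream_events("workflow-123", event_categories=["chat"], timeout=5.0):
        print(event["type"], event["data"])
        if event["type"] == "complete":
            break
    await manager.cleanup("workflow-123", delay=0.0)

asyncio.run(demo())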


@@ -1,345 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
LangGraph-based chatbot implementation.
Uses LangGraph workflow with AI Center integration and connector tools.
"""
import logging
from dataclasses import dataclass
from typing import Annotated, AsyncIterator, Any, Optional, List
from pydantic import BaseModel
from langchain_core.messages import (
AIMessage,
BaseMessage,
HumanMessage,
SystemMessage,
trim_messages,
)
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, START, END
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from modules.features.chatbot.aiCenterAdapter import AICenterChatModel
from modules.features.chatbot.langgraphTools import (
send_streaming_message,
create_sql_tool,
create_tavily_tools,
)
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
class ChatState(BaseModel):
"""Represents the state of a chat session."""
messages: Annotated[List[BaseMessage], add_messages]
@dataclass
class LangGraphChatbot:
"""LangGraph-based chatbot with AI Center integration."""
model: AICenterChatModel
memory: Any
app: Optional[CompiledStateGraph] = None
system_prompt: str = "You are a helpful assistant."
@classmethod
async def create(
cls,
services,
system_prompt: str,
connector_instance,
enable_web_research: bool = True,
tavily_api_key: Optional[str] = None,
context_window_size: int = 8000,
) -> "LangGraphChatbot":
"""
Factory method to create and configure a LangGraphChatbot instance.
Args:
services: Services instance with AI access
system_prompt: The system prompt to initialize the chatbot
connector_instance: Database connector instance (PreprocessorConnector)
enable_web_research: Whether to enable web research tools
tavily_api_key: Tavily API key for web research (if None, uses APP_CONFIG)
context_window_size: Maximum context window size in tokens
Returns:
A configured LangGraphChatbot instance
"""
# Get Tavily API key from config if not provided
if tavily_api_key is None:
tavily_api_key = APP_CONFIG.get("Connector_AiTavily_API_SECRET")
# Create AI Center chat model adapter
model = AICenterChatModel(
services=services,
system_prompt=system_prompt,
temperature=0.2
)
# Create memory/checkpointer
memory = MemorySaver()
instance = LangGraphChatbot(
model=model,
memory=memory,
system_prompt=system_prompt,
)
# Configure tools
configured_tools = await instance._configure_tools(
connector_instance,
enable_web_research,
tavily_api_key
)
# Build LangGraph app
instance.app = instance._build_app(memory, configured_tools, context_window_size)
return instance
async def _configure_tools(
self,
connector_instance,
enable_web_research: bool,
tavily_api_key: Optional[str]
) -> List:
"""
Configure tools for the chatbot.
Args:
connector_instance: Database connector instance
enable_web_research: Whether web research is enabled
tavily_api_key: Tavily API key
Returns:
List of configured tools
"""
tools = []
# SQL tool using connector
sql_tool = create_sql_tool(connector_instance)
tools.append(sql_tool)
# Streaming message tool
tools.append(send_streaming_message)
# Tavily tools (if enabled)
if enable_web_research:
tavily_tools = create_tavily_tools(tavily_api_key, enable_web_research)
tools.extend(tavily_tools)
logger.info(f"Configured {len(tools)} tools for LangGraph chatbot")
return tools
def _build_app(
self,
memory: Any,
tools: List,
context_window_size: int
) -> CompiledStateGraph[ChatState, None, ChatState, ChatState]:
"""
Builds the chatbot application workflow using LangGraph.
Args:
memory: The chat memory/checkpointer to use
tools: The list of tools the chatbot can use
context_window_size: Maximum context window size
Returns:
A compiled state graph representing the chatbot application
"""
# Bind tools to model
llm_with_tools = self.model.bind_tools(tools=tools)
def select_window(msgs: List[BaseMessage]) -> List[BaseMessage]:
"""Selects a window of messages that fit within the context window size.
Args:
msgs: The list of messages to select from.
Returns:
A list of messages that fit within the context window size.
"""
def approx_counter(items: List[BaseMessage]) -> int:
"""Approximate token counter for messages.
Args:
items: List of messages to count tokens for.
Returns:
Approximate number of tokens in the messages.
"""
return sum(len(getattr(m, "content", "") or "") for m in items)
return trim_messages(
msgs,
strategy="last",
token_counter=approx_counter,
max_tokens=context_window_size,
start_on="human",
end_on=("human", "tool"),
include_system=True,
)
def agent_node(state: ChatState) -> dict:
"""Agent node for the chatbot workflow.
Args:
state: The current chat state.
Returns:
The updated chat state after processing.
"""
# Select the message window to fit in context (trim if needed)
window = select_window(state.messages)
# Ensure the system prompt is present at the start
if not window or not isinstance(window[0], SystemMessage):
window = [SystemMessage(content=self.system_prompt)] + window
# Call the LLM with tools
response = llm_with_tools.invoke(window)
# Return the new state
return {"messages": [response]}
def should_continue(state: ChatState) -> str:
"""Determines whether to continue the workflow or end it.
This conditional edge is called after the agent node to decide
whether to continue to the tools node (if the last message contains
tool calls) or to end the workflow (if no tool calls are present).
Args:
state: The current chat state.
Returns:
The next node to transition to ("tools" or END).
"""
# Get the last message
last_message = state.messages[-1]
# Check if the last message contains tool calls
# If so, continue to the tools node; otherwise, end the workflow
return "tools" if getattr(last_message, "tool_calls", None) else END
# Compose the workflow
workflow = StateGraph(ChatState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools=tools))
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")
return workflow.compile(checkpointer=memory)
async def chat(self, message: str, chat_id: str = "default") -> List[BaseMessage]:
"""
Process a chat message by calling the LLM and tools and returns the chat history.
Args:
message: The user message to process
chat_id: The chat thread ID
Returns:
The list of messages in the chat history
"""
if not self.app:
raise RuntimeError("Chatbot app not initialized. Call create() first.")
# Set the right thread ID for memory
config = {"configurable": {"thread_id": chat_id}}
# Single-turn chat (non-streaming)
result = await self.app.ainvoke(
{"messages": [HumanMessage(content=message)]}, config=config
)
# Extract and return the messages from the result
return result["messages"]
async def stream_events(
self, *, message: str, chat_id: str = "default"
) -> AsyncIterator[dict]:
"""
Stream UI-focused events using astream_events v2.
Args:
message: The user message to process
chat_id: Logical thread identifier; forwarded in the runnable config so
memory and tools are scoped per thread
Yields:
dict: One of:
- ``{"type": "status", "label": str}`` for short progress updates.
- ``{"type": "final", "response": {"thread": str, "chat_history": list[dict]}}``
where ``chat_history`` only includes ``user``/``assistant`` roles.
- ``{"type": "error", "message": str}`` if an exception occurs.
"""
if not self.app:
raise RuntimeError("Chatbot app not initialized. Call create() first.")
# Thread-aware config for LangGraph/LangChain
config = {"configurable": {"thread_id": chat_id}}
def _is_root(ev: dict) -> bool:
"""Return True if the event is from the root run (v2: empty parent_ids)."""
return not ev.get("parent_ids")
try:
async for event in self.app.astream_events(
{"messages": [HumanMessage(content=message)]},
config=config,
version="v2",
):
etype = event.get("event")
ename = event.get("name") or ""
edata = event.get("data") or {}
# Stream human-readable progress via the special send_streaming_message tool
if etype == "on_tool_start" and ename == "send_streaming_message":
tool_in = edata.get("input") or {}
msg = tool_in.get("message")
if isinstance(msg, str) and msg.strip():
yield {"type": "status", "label": msg.strip()}
continue
# Emit the final payload when the root run finishes
if etype == "on_chain_end" and _is_root(event):
output_obj = edata.get("output")
# Extract message list from the graph's final output
final_msgs = output_obj.get("messages", []) if isinstance(output_obj, dict) else []
# Normalize for the frontend (only user/assistant with text content)
chat_history_payload: List[dict] = []
for m in final_msgs:
if isinstance(m, BaseMessage):
# Only user/assistant roles survive; system and tool messages get None and are skipped
role = "user" if isinstance(m, HumanMessage) else "assistant" if isinstance(m, AIMessage) else None
content = getattr(m, "content", "")
if role and content:
chat_history_payload.append({
"role": role,
"content": content
})
yield {
"type": "final",
"response": {
"thread": chat_id,
"chat_history": chat_history_payload,
},
}
return
except Exception as exc:
# Emit a single error envelope and end the stream
logger.error(f"Exception in stream_events: {exc}", exc_info=True)
yield {"type": "error", "message": f"Fehler beim Verarbeiten: {exc}"}


@@ -1,166 +0,0 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
LangGraph-compatible tools for chatbot.
Wraps connectors and external services as LangGraph tools.
"""
import logging
from typing import Optional
from langchain_core.tools import tool
logger = logging.getLogger(__name__)
@tool
def send_streaming_message(message: str) -> str:
"""Send a streaming message to the user to provide updates during processing.
Use this tool to send short status updates to the user while you are working
on their request. This helps keep the user informed about what you are doing.
Args:
message: A short German message describing what you are currently doing.
Examples: "Durchsuche Datenbank nach Lampen, LED, Leuchten, und Ähnlichem."
"Suche im Internet nach Produktinformationen."
"Analysiere Suchergebnisse."
Returns:
A confirmation that the message was sent.
"""
# This tool doesn't actually do anything - it's just for the AI to signal
# what it's doing to the frontend via the tool call mechanism
return f"Status-Update gesendet: {message}"
def create_sql_tool(connector_instance):
"""
Create a LangGraph-compatible SQL tool using a connector instance.
Args:
connector_instance: PreprocessorConnector or similar connector instance
Returns:
LangChain tool for SQL queries
"""
# Store connector in closure
connector = connector_instance
@tool
async def execute_sql_query(query: str) -> str:
"""Execute a SQL SELECT query on the database.
This tool allows you to query the database to find articles, prices,
inventory levels, and other information.
Args:
query: A valid SQL SELECT query. Only SELECT queries are allowed.
Use double quotes for column names with spaces or special characters.
Example: SELECT "Artikelnummer", "Artikelbezeichnung" FROM Artikel
WHERE "Artikelbezeichnung" LIKE '%Lampe%' LIMIT 20
Returns:
Query results as formatted string with data rows
"""
try:
logger.info(f"Executing SQL query via connector: {query[:100]}...")
# Ensure connector is initialized
if connector is None:
return "Error: Database connector not initialized"
# Execute query
result = await connector.executeQuery(query, return_json=True)
if isinstance(result, dict):
# Return formatted text result
text_result = result.get("text", "Query executed successfully but returned no results.")
# Also include data count if available
data = result.get("data", [])
if data:
text_result += f"\n\nFound {len(data)} row(s)."
return text_result
else:
# Return string result directly
return str(result)
except Exception as e:
error_msg = f"Error executing SQL query: {str(e)}"
logger.error(error_msg, exc_info=True)
return error_msg
# Set tool metadata for better AI understanding
execute_sql_query.name = "execute_sql_query"
execute_sql_query.description = """Execute a SQL SELECT query on the database.
Use this tool to search for articles, check prices, inventory levels, suppliers, etc.
Only SELECT queries are allowed. Use double quotes for column names with spaces.
Database tables: Artikel, Einkaufspreis_neu, Lagerplatz_Artikel, Lagerplatz
Example queries:
- SELECT "Artikelnummer", "Artikelbezeichnung" FROM Artikel WHERE "Artikelbezeichnung" LIKE '%Lampe%'
- SELECT a."Artikelnummer", e."EP_CHF" FROM Artikel a LEFT JOIN Einkaufspreis_neu e ON a."I_ID" = e."ARTIKEL"
- SELECT a."Artikelnummer", l."S_IST_BESTAND" FROM Artikel a LEFT JOIN Lagerplatz_Artikel l ON a."I_ID" = l."R_ARTIKEL"
"""
return execute_sql_query
def create_tavily_tools(tavily_api_key: Optional[str] = None, enable_web_research: bool = True):
"""
Create Tavily search tools for web research.
Args:
tavily_api_key: Tavily API key (if None, tools will return error messages)
enable_web_research: Whether web research is enabled
Returns:
List of Tavily tools (search and extract)
"""
tools = []
if not enable_web_research or not tavily_api_key:
# Return dummy tools that explain web research is disabled
@tool
def tavily_search_disabled(query: str) -> str:
"""Web research is disabled for this chatbot instance."""
return "Web research is not enabled for this chatbot instance."
@tool
def tavily_extract_disabled(urls: str) -> str:
"""Web research is disabled for this chatbot instance."""
return "Web research is not enabled for this chatbot instance."
return [tavily_search_disabled, tavily_extract_disabled]
try:
from langchain_tavily import TavilySearch, TavilyExtract
# Create Tavily search tool (langchain_tavily exposes TavilySearch/TavilyExtract)
tavily_search = TavilySearch(
tavily_api_key=tavily_api_key,
max_results=5
)
# Create Tavily extract tool
tavily_extract = TavilyExtract(tavily_api_key=tavily_api_key)
return [tavily_search, tavily_extract]
except ImportError:
logger.warning("langchain_tavily not available, creating dummy tools")
@tool
def tavily_search_fallback(query: str) -> str:
"""Tavily search tool (not available - langchain_tavily not installed)."""
return "Tavily search is not available. Please install langchain_tavily package."
@tool
def tavily_extract_fallback(urls: str) -> str:
"""Tavily extract tool (not available - langchain_tavily not installed)."""
return "Tavily extract is not available. Please install langchain_tavily package."
return [tavily_search_fallback, tavily_extract_fallback]
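A minimal sketch of wiring the tool factories together (disabling web research yields the dummy tools; the connector import mirrors the chatbot module):

from modules.connectors.connectorPreprocessor import PreprocessorConnector

connector = PreprocessorConnector()
tools = [create_sql_tool(connector), send_streaming_message]
tools.extend(create_tavily_tools(tavily_api_key=None, enable_web_research=False))
# -> execute_sql_query, send_streaming_message, tavily_search_disabled, tavily_extract_disabled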

File diff suppressed because it is too large.


@@ -2,7 +2,7 @@
# All rights reserved.
"""
Chatbot routes for the backend API.
Implements simple chatbot endpoints using direct AI center calls via chatbot feature.
Implements chatbot endpoints using LangGraph-based conversation workflows.
"""
import logging
@@ -32,9 +32,6 @@ from modules.datamodels.datamodelPagination import PaginationParams, PaginatedRe
from . import chatProcess
from .eventManager import get_event_manager
# Import workflow control functions
from modules.workflows.automation import chatStop
# Configure logger
logger = logging.getLogger(__name__)
@@ -378,83 +375,9 @@ async def stop_chatbot(
detail=str(e)
)
# Delete chatbot workflow endpoint
@router.delete("/{instanceId}/{workflowId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_chatbot(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
workflowId: str = Path(..., description="ID of the workflow to delete"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Deletes a chatbot workflow and its associated data."""
# Validate instance access
mandateId = await _validateInstanceAccess(instanceId, context)
try:
# Get service center
interfaceDbChat = _getServiceChat(context, instanceId)
# Check workflow access and permission using RBAC
workflows = getRecordsetWithRBAC(
interfaceDbChat.db,
ChatWorkflow,
context.user,
recordFilter={"id": workflowId}
)
if not workflows:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
workflow_data = workflows[0]
# Check if workflow is a chatbot workflow
if workflow_data.get("workflowMode") != WorkflowModeEnum.WORKFLOW_CHATBOT.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Workflow {workflowId} is not a chatbot workflow"
)
# Verify workflow belongs to this instance
workflow_instance_id = workflow_data.get("featureInstanceId")
if workflow_instance_id != instanceId:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Workflow {workflowId} does not belong to instance '{instanceId}'"
)
# Check if user has permission to delete using RBAC
if not interfaceDbChat.checkRbacPermission(ChatWorkflow, "delete", workflowId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this workflow"
)
# Delete workflow
success = interfaceDbChat.deleteWorkflow(workflowId)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to delete workflow"
)
return {
"id": workflowId,
"message": "Chatbot workflow and associated data deleted successfully"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in delete_chatbot: {str(e)}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Error deleting chatbot workflow: {str(e)}"
)
# List chatbot threads/workflows or get specific thread details
# NOTE: This route MUST be defined BEFORE /{instanceId}/{workflowId} routes
# to prevent "threads" from being matched as a workflowId
@router.get("/{instanceId}/threads")
@limiter.limit("120/minute")
async def get_chatbot_threads(
@@ -583,3 +506,80 @@ async def get_chatbot_threads(
status_code=500,
detail=f"Error getting chatbot threads: {str(e)}"
)
# Delete chatbot workflow endpoint
# NOTE: This catch-all route MUST be defined AFTER more specific routes like /threads
@router.delete("/{instanceId}/{workflowId}", response_model=Dict[str, Any])
@limiter.limit("120/minute")
async def delete_chatbot(
request: Request,
instanceId: str = Path(..., description="Feature Instance ID"),
workflowId: str = Path(..., description="ID of the workflow to delete"),
context: RequestContext = Depends(getRequestContext)
) -> Dict[str, Any]:
"""Deletes a chatbot workflow and its associated data."""
# Validate instance access
mandateId = await _validateInstanceAccess(instanceId, context)
try:
# Get service center
interfaceDbChat = _getServiceChat(context, instanceId)
# Check workflow access and permission using RBAC
workflows = getRecordsetWithRBAC(
interfaceDbChat.db,
ChatWorkflow,
context.user,
recordFilter={"id": workflowId}
)
if not workflows:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
workflow_data = workflows[0]
# Check if workflow is a chatbot workflow
if workflow_data.get("workflowMode") != WorkflowModeEnum.WORKFLOW_CHATBOT.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Workflow {workflowId} is not a chatbot workflow"
)
# Verify workflow belongs to this instance
workflow_instance_id = workflow_data.get("featureInstanceId")
if workflow_instance_id != instanceId:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Workflow {workflowId} does not belong to instance '{instanceId}'"
)
# Check if user has permission to delete using RBAC
if not interfaceDbChat.checkRbacPermission(ChatWorkflow, "delete", workflowId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this workflow"
)
# Delete workflow
success = interfaceDbChat.deleteWorkflow(workflowId)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to delete workflow"
)
return {
"id": workflowId,
"message": "Chatbot workflow and associated data deleted successfully"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in delete_chatbot: {str(e)}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Error deleting chatbot workflow: {str(e)}"
)
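The route-ordering note above comes down to FastAPI matching routes in declaration order; a minimal standalone sketch (paths and handlers are illustrative, not the real API):

from fastapi import FastAPI

app = FastAPI()

@app.get("/{instanceId}/threads")       # specific route: declare first
async def get_threads(instanceId: str):
    return {"threads": []}

@app.get("/{instanceId}/{workflowId}")  # catch-all route: declare last
async def get_workflow(instanceId: str, workflowId: str):
    return {"workflow": workflowId}

# Declared in this order, GET /abc/threads reaches get_threads; reversed,
# it would be captured first with workflowId="threads".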


@@ -115,3 +115,4 @@ langchain>=0.1.0
langchain-core>=0.1.0
langgraph>=0.0.20
langchain-tavily>=0.0.1
nest-asyncio>=1.6.0 # For running async code in sync context (LangGraph compatibility)
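The nest-asyncio pin exists because parts of the LangGraph integration call async code from synchronous contexts; a minimal sketch of what the patch enables (illustrative only):

import asyncio
import nest_asyncio

nest_asyncio.apply()  # allow re-entrant asyncio.run / loop.run_until_complete

async def inner():
    return 42

async def outer():
    # Without nest_asyncio this raises:
    # RuntimeError: asyncio.run() cannot be called from a running event loop
    return asyncio.run(inner())

print(asyncio.run(outer()))  # -> 42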