# gateway/modules/features/chatbot/bridges/tools.py

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Chatbot tools for LangGraph integration.
Includes SQL query tool, Tavily search tool, and streaming status tool.
"""
import logging
from typing import Optional
from langchain_core.tools import tool
from modules.connectors.connectorPreprocessor import PreprocessorConnector
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
def _format_sql_results(data: list, row_count: int) -> str:
    """Render query rows as a numbered, human-readable listing.

    Shows column headers (taken from the first row's keys) and at most 50
    rows, appending a truncation notice when more rows exist. Falls back to
    plain ``repr``-style lines when rows are not dicts.

    Args:
        data: Rows returned by the connector (list of dicts, usually).
        row_count: Total number of rows the query produced.

    Returns:
        A formatted multi-line string.
    """
    lines = [f"Query executed successfully. Returned {row_count} rows:"]
    if data and isinstance(data[0], dict):
        headers = list(data[0].keys())
        lines.append("\nColumns: " + ", ".join(headers))
        lines.append("\nResults:")
        # Show at most the first 50 rows to keep the tool output bounded.
        for i, row in enumerate(data[:50], 1):
            row_str = ", ".join([f"{k}: {v}" for k, v in row.items()])
            lines.append(f"{i}. {row_str}")
        if row_count > 50:
            lines.append(f"\n(Showing first 50 of {row_count} rows)")
    else:
        # Fallback for non-dict rows
        for i, row in enumerate(data[:50], 1):
            lines.append(f"{i}. {row}")
    return "\n".join(lines)


@tool
async def sqlite_query(query: str) -> str:
    """
    Execute a SQL SELECT query on the Althaus AG database.

    This tool allows you to query the SQLite database to find articles, prices,
    inventory levels, and other product information.

    Args:
        query: A valid SQL SELECT query. Must use double quotes for column names
            with spaces or special characters (e.g., "Artikelnummer", "S_IST_BESTAND").
            Only SELECT queries are allowed.

    Returns:
        Query results as a formatted string, or an error message if the query fails.

    Examples:
        - Find articles by name:
            SELECT a."Artikelnummer", a."Artikelbezeichnung", a."Lieferant"
            FROM Artikel a
            WHERE a."Artikelbezeichnung" LIKE '%Motor%'
            LIMIT 20
        - Find articles with price and inventory:
            SELECT a."Artikelnummer", a."Artikelbezeichnung", e."EP_CHF",
                lp."Lagerplatz" as "Lagerplatzname", l."S_IST_BESTAND",
                l."S_RESERVIERTER__BESTAND",
                CASE WHEN l."S_IST_BESTAND" != 'Unbekannt'
                    THEN CAST(l."S_IST_BESTAND" AS INTEGER) - COALESCE(l."S_RESERVIERTER__BESTAND", 0)
                    ELSE NULL END as "Verfügbarer Bestand"
            FROM Artikel a
            LEFT JOIN Einkaufspreis e ON a."I_ID" = e."m_Artikel"
            LEFT JOIN Lagerplatz_Artikel l ON a."I_ID" = l."R_ARTIKEL"
            LEFT JOIN Lagerplatz lp ON l."R_LAGERPLATZ" = lp."I_ID"
            WHERE a."Artikelbezeichnung" LIKE '%Netzgerät%'
            LIMIT 20
    """
    # Enforce the documented SELECT-only contract before hitting the DB.
    # CTEs ("WITH ... SELECT") are read-only and therefore also allowed;
    # a leading '(' is stripped to accept parenthesized selects.
    normalized = query.lstrip().lstrip("(").lower()
    if not normalized.startswith(("select", "with")):
        return "Error: Only SELECT queries are allowed."
    try:
        connector = PreprocessorConnector()
        try:
            result = await connector.executeQuery(query, return_json=True)
            # The connector signals failure inside the "text" field rather
            # than by raising — surface that message directly to the LLM.
            if result.get("text", "").startswith(("Error:", "Query failed:")):
                error_msg = result.get("text", "Query failed")
                logger.error(f"SQL query failed: {error_msg}")
                return error_msg
            data = result.get("data", [])
            row_count = result.get("row_count", len(data))
            if not data:
                return f"Query executed successfully. Returned {row_count} rows (no data)."
            return _format_sql_results(data, row_count)
        finally:
            # Always release the connector, even on formatting errors.
            await connector.close()
    except Exception as e:
        error_msg = f"Error executing SQL query: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return error_msg
def _format_tavily_results(query: str, results: list) -> str:
    """Render Tavily search hits (up to 15) with title, URL, and full content.

    Args:
        query: The original search query (echoed into the header line).
        results: Parsed result entries; each may be a URL string, a dict with
            "url"/"title"/"content" keys, or anything else (stringified).

    Returns:
        A formatted multi-line string with one numbered entry per result.
    """
    lines = [f"Internet search results for: {query}\n"]
    for i, result in enumerate(results[:15], 1):
        if isinstance(result, str):
            # Simple URL string
            lines.append(f"{i}. {result}")
            lines.append(f" URL: {result}")
        elif isinstance(result, dict):
            # Dictionary with url, title, content
            url = result.get("url", "")
            title = result.get("title", url)
            content = result.get("content", "")
            lines.append(f"{i}. {title}")
            lines.append(f" URL: {url}")
            if content:
                # Return FULL content, not truncated - let the LLM decide what to use
                lines.append(f" Content: {content}")
        else:
            # Fallback
            lines.append(f"{i}. {str(result)}")
        lines.append("")
    return "\n".join(lines)


@tool
async def tavily_search(query: str) -> str:
    """
    Search the internet for comprehensive information using Tavily search via AI Center.

    Use this tool when you need to find detailed product information, datasheets,
    certifications, technical specifications, market trends, or other comprehensive
    information that is not in the database.

    IMPORTANT: This tool returns FULL content from search results (not truncated).
    Use all available information to provide comprehensive, detailed answers with
    specific facts, numbers, dates, and technical details.

    Args:
        query: Search query string. Be specific and include product names,
            model numbers, or other relevant keywords. For comprehensive
            research, use broad queries like "latest developments in LED technology 2026"

    Returns:
        Comprehensive search results with full content, titles, URLs, and sources.
        Results include up to 15 sources with complete content for detailed analysis.

    Examples:
        - Search for comprehensive product information:
            tavily_search("latest LED technology developments 2026")
        - Search for product datasheet:
            tavily_search("Siemens 6AV2 181-8XP00-0AX0 datasheet")
        - Search for market trends:
            tavily_search("LED market trends efficiency breakthroughs 2025")
    """
    try:
        # Use AI Center Tavily plugin instead of direct langchain-tavily.
        # Imports are local so the module loads even when AI Center is absent.
        from modules.aicore.aicoreModelRegistry import modelRegistry
        from modules.aicore.aicoreModelSelector import modelSelector
        from modules.datamodels.datamodelAi import (
            AiModelCall,
            AiModelResponse,
            AiCallOptions,
            OperationTypeEnum,
            ProcessingModeEnum,
            AiCallPromptWebSearch
        )
        import json

        # Lazily bootstrap the registry the first time a tool call lands here.
        # NOTE(review): relies on the registry's private `_connectors` attr to
        # detect "not yet initialized" — confirm a public check exists.
        if not modelRegistry._connectors:
            discovered_connectors = modelRegistry.discoverConnectors()
            for connector in discovered_connectors:
                modelRegistry.registerConnector(connector)
            # Refresh models to ensure Tavily is available after registration.
            modelRegistry.refreshModels()

        # No RBAC filtering here: tool calls run without a user context.
        available_models = modelRegistry.getAvailableModels()
        tavily_models = [m for m in available_models if m.connectorType == "tavily"]
        if not tavily_models:
            return "Error: Tavily model not available in AI Center. Please check configuration."

        options = AiCallOptions(
            operationType=OperationTypeEnum.WEB_SEARCH_DATA,
            processingMode=ProcessingModeEnum.BASIC
        )
        # With a single Tavily model there is nothing to select; otherwise let
        # the selector pick the best candidate for this prompt.
        if len(tavily_models) == 1:
            selected_model = tavily_models[0]
        else:
            selected_model = modelSelector.selectModel(
                prompt=query,
                context="",
                options=options,
                availableModels=tavily_models
            )
        if not selected_model:
            return "Error: Could not select Tavily model for web search."

        # Web-search prompt: more pages + deep research for comprehensive output.
        web_search_prompt = AiCallPromptWebSearch(
            instruction=query,
            maxNumberPages=15,   # Request more results for comprehensive information
            country=None,        # No country filter by default
            language=None,       # No language filter by default
            researchDepth="deep" # Deep research for comprehensive results
        )
        # The JSON-encoded prompt travels as the user message content.
        model_call = AiModelCall(
            messages=[
                {
                    "role": "user",
                    "content": json.dumps(web_search_prompt.model_dump())
                }
            ],
            model=selected_model,
            options=options
        )

        # functionCall routes the request to the connector's web operation.
        if not selected_model.functionCall:
            return "Error: Tavily model has no functionCall defined."
        response: AiModelResponse = await selected_model.functionCall(model_call)
        if not response.success:
            error_msg = response.error or "Unknown error"
            logger.error(f"Tavily search failed: {error_msg}")
            return f"Error performing Tavily search: {error_msg}"

        # Parse the response; keep the try narrow so only json.loads is guarded.
        try:
            result_data = json.loads(response.content) if response.content else {}
        except json.JSONDecodeError:
            # Not JSON — return the raw text as-is.
            if response.content:
                return f"Internet search results for: {query}\n\n{response.content}"
            return f"No results found for query: {query}"

        # Handle the different shapes the connector may return.
        if isinstance(result_data, list):
            results = result_data
        elif isinstance(result_data, dict):
            results = result_data.get("urls", []) or result_data.get("results", []) or []
        else:
            results = []
        if not results:
            return f"No results found for query: {query}"
        return _format_tavily_results(query, results)
    except Exception as e:
        error_msg = f"Error performing Tavily search via AI Center: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return error_msg
# Note: send_streaming_message will be created in the LangGraph integration
# where it has access to the event manager. For now, we define it here as a placeholder.
def create_send_streaming_message_tool(event_manager=None):
    """
    Create the send_streaming_message tool with access to event manager.

    Args:
        event_manager: Event manager instance for emitting events (not used directly,
            events are captured via LangGraph tool events)

    Returns:
        LangChain tool for sending streaming messages
    """
    # Factory pattern: the closure exists so a future version can capture
    # event_manager; currently the inner tool never references it.
    @tool
    async def send_streaming_message(message: str) -> str:
        """
        Send a streaming status update to the user.
        Use this tool frequently to keep the user informed about what you are doing.
        This helps provide a better user experience by showing progress updates.
        Args:
        message: A short message describing what you are currently doing.
        Examples:
        - "Durchsuche Datenbank nach Lampen, LED, Leuchten, und Ähnlichem."
        - "Suche im Internet nach Produktinformationen."
        - "Analysiere Suchergebnisse und bereite Antwort vor."
        Returns:
        Confirmation that the message was sent.
        """
        # This tool doesn't actually do anything in the tool execution
        # The actual event emission happens in the streaming bridge
        # This is just for LangGraph to recognize it as a tool call
        return f"Status-Update gesendet: {message}"
    return send_streaming_message