gateway/modules/features/chatbot/mainChatbot.py

1246 lines
53 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Simple chatbot feature - basic implementation.
User input is processed by AI to create list of needed queries.
Those queries get streamed back.
"""
import asyncio
import base64
import json
import logging
import re
import uuid
from typing import Optional, Dict, Any, List

from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum, ChatLog
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp
from modules.services import getInterface as getServices
from modules.features.chatbot.eventManager import get_event_manager
from modules.workflows.methods.methodAi.methodAi import MethodAi
from modules.connectors.connectorPreprocessor import PreprocessorConnector
from modules.features.chatbot.chatbotConstants import (
    get_initial_analysis_prompt,
    generate_conversation_name,
    get_final_answer_system_prompt
)
# Module-level logger, named after this module so log records can be filtered per feature.
logger = logging.getLogger(__name__)
def _extractJsonFromResponse(content: str) -> Optional[dict]:
    """
    Extract a JSON object from an AI response, handling markdown code blocks.

    Strategies, tried in order:
    1. Parse the whole (stripped) response as JSON.
    2. Parse the first markdown code fence that wraps a {...} object.
    3. Scan for "{" characters and decode the first complete JSON object that
       starts at one of them. This also covers responses with text or stray
       braces after the object.

    Args:
        content: Raw response text from the AI
    Returns:
        Parsed dict, or None if content is not a string or contains no JSON
        object. JSON that is valid but not an object (list, number, string)
        is never returned, because callers use dict methods on the result.
    """
    if not isinstance(content, str):
        return None

    def _loadObject(text: str) -> Optional[dict]:
        """Parse text as JSON and return it only if it is a JSON object."""
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    # Try direct JSON parse first
    result = _loadObject(content.strip())
    if result is not None:
        return result

    # Try to extract JSON from markdown code blocks
    fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
    if fenced:
        result = _loadObject(fenced.group(1))
        if result is not None:
            return result

    # Try to find a JSON object in the text. raw_decode stops at the end of the
    # first complete value, so trailing text after the object does not matter.
    decoder = json.JSONDecoder()
    start = content.find("{")
    while start != -1:
        try:
            parsed, _ = decoder.raw_decode(content[start:])
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed
        start = content.find("{", start + 1)
    return None
# Strong references to running background tasks. The event loop only keeps weak
# references to tasks, so a task nobody references may be garbage-collected
# before it has finished.
_chatProcessBackgroundTasks = set()


async def chatProcess(
    currentUser: User,
    userInput: UserInputRequest,
    workflowId: Optional[str] = None
) -> ChatWorkflow:
    """
    Simple chatbot processing - store the user message and start background processing.

    Flow:
    1. Create a new workflow, or load an existing one and increment its round
    2. Build ChatDocuments for uploaded files
    3. Store the user message and emit it as a "chatdata" event
    4. Schedule _processChatbotMessage as a background task
    5. Return the reloaded workflow without waiting for the background task

    Args:
        currentUser: Current user
        userInput: User input request
        workflowId: Optional workflow ID to continue existing conversation
    Returns:
        ChatWorkflow instance
    Raises:
        ValueError: If workflowId is given but no such workflow exists.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        # Get services
        services = getServices(currentUser, None)
        interfaceDbChat = services.interfaceDbChat
        # Get event manager and create queue if needed
        event_manager = get_event_manager()

        # Create or load workflow
        if workflowId:
            workflow = interfaceDbChat.getWorkflow(workflowId)
            if not workflow:
                raise ValueError(f"Workflow {workflowId} not found")
            # Resume workflow: increment round number
            new_round = workflow.currentRound + 1
            interfaceDbChat.updateWorkflow(workflowId, {
                "status": "running",
                "currentRound": new_round,
                "lastActivity": getUtcTimestamp()
            })
            workflow = interfaceDbChat.getWorkflow(workflowId)
            logger.info(f"Resumed workflow {workflowId}, round incremented to {new_round}")
            # Create event queue if it doesn't exist (for streaming)
            if not event_manager.has_queue(workflowId):
                event_manager.create_queue(workflowId)
        else:
            # Generate conversation name based on user's prompt
            conversation_name = await generate_conversation_name(
                services,
                userInput.prompt,
                userInput.userLanguage
            )
            # Create new workflow
            workflowData = {
                "id": str(uuid.uuid4()),
                "mandateId": currentUser.mandateId,
                "status": "running",
                "name": conversation_name,
                "currentRound": 1,
                "currentTask": 0,
                "currentAction": 0,
                "totalTasks": 0,
                "totalActions": 0,
                "workflowMode": WorkflowModeEnum.WORKFLOW_CHATBOT.value,
                "startedAt": getUtcTimestamp(),
                "lastActivity": getUtcTimestamp()
            }
            workflow = interfaceDbChat.createWorkflow(workflowData)
            logger.info(f"Created new chatbot workflow: {workflow.id} with name: {conversation_name}")
            # Create event queue for new workflow (for streaming)
            event_manager.create_queue(workflow.id)
            # Reload workflow to get current message count
            workflow = interfaceDbChat.getWorkflow(workflow.id)

        # Process uploaded files and create ChatDocuments
        # NOTE(review): user_documents is only counted in the log line below; nothing
        # in this function attaches the documents to the stored message - confirm
        # whether createMessage is expected to receive them.
        user_documents = []
        if userInput.listFileId:
            logger.info(f"Processing {len(userInput.listFileId)} uploaded file(s) for user message")
            for fileId in userInput.listFileId:
                try:
                    # Get file info from chat service
                    fileInfo = services.chat.getFileInfo(fileId)
                    if not fileInfo:
                        logger.warning(f"No file info found for file ID {fileId}")
                        continue
                    originalFileName = fileInfo.get("fileName", "unknown")
                    originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
                    fileSizeToUse = fileInfo.get("size", 0)
                    # Create ChatDocument for the file
                    document = ChatDocument(
                        id=str(uuid.uuid4()),
                        messageId="",  # Will be set when message is created
                        fileId=fileId,
                        fileName=originalFileName,
                        fileSize=fileSizeToUse,
                        mimeType=originalMimeType,
                        roundNumber=workflow.currentRound,
                        taskNumber=0,
                        actionNumber=0
                    )
                    user_documents.append(document)
                    logger.info(f"Created ChatDocument for file {fileId} -> {originalFileName}")
                except Exception as e:
                    # One broken file must not prevent the message from being stored
                    logger.error(f"Error processing file ID {fileId}: {e}", exc_info=True)

        # Store user message
        userMessageData = {
            "id": f"msg_{uuid.uuid4()}",
            "workflowId": workflow.id,
            "message": userInput.prompt,
            "role": "user",
            "status": "first" if workflowId is None else "step",
            # messages may be None on a workflow without history
            "sequenceNr": len(workflow.messages or []) + 1,
            "publishedAt": getUtcTimestamp(),
            "roundNumber": workflow.currentRound,
            "taskNumber": 0,
            "actionNumber": 0
        }
        userMessage = interfaceDbChat.createMessage(userMessageData)
        logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)")

        # Emit message event for streaming (exact chatData format)
        message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp())
        await event_manager.emit_event(
            context_id=workflow.id,
            event_type="chatdata",
            data={
                "type": "message",
                "createdAt": message_timestamp,
                "item": userMessage.dict()
            },
            event_category="chat"
        )

        # Update workflow status
        interfaceDbChat.updateWorkflow(workflow.id, {
            "status": "running",
            "lastActivity": getUtcTimestamp()
        })

        # Process in background (async). Keep a reference until the task is done
        # so it cannot be garbage-collected mid-execution.
        background_task = asyncio.create_task(_processChatbotMessage(
            services,
            workflow.id,
            userInput,
            userMessage.id
        ))
        _chatProcessBackgroundTasks.add(background_task)
        background_task.add_done_callback(_chatProcessBackgroundTasks.discard)

        # Reload workflow to include new message
        workflow = interfaceDbChat.getWorkflow(workflow.id)
        return workflow
    except Exception as e:
        logger.error(f"Error in chatProcess: {str(e)}", exc_info=True)
        raise
async def _execute_queries_parallel(queries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Execute multiple SQL queries in parallel, one PreprocessorConnector per query.

    Args:
        queries: List of query dictionaries, each containing:
            - "query": SQL query string
            - "purpose": Description of what the query retrieves
            - "table": Primary table name
    Returns:
        Dictionary keyed by 1-based query position:
            - "query_1", "query_2", etc.: Success result text
            - "query_1_data", "query_2_data", etc.: Raw data arrays
            - "query_1_error", "query_2_error", etc.: Error messages if query failed
        Every query ends up with either a result or an error entry.
    """
    if not queries:
        return {}

    async def execute_single_query(idx: int, query_info: Dict[str, Any]):
        """Execute a single query and return (idx, result, error_message)."""
        connector = None
        try:
            connector = PreprocessorConnector()
            query_text = query_info.get("query", "")
            result = await connector.executeQuery(query_text, return_json=True)
            return idx, result, None
        except Exception as e:
            # Fall back to the exception type so an empty message still counts as an error
            return idx, None, str(e) or type(e).__name__
        finally:
            # Close exactly once; a failing close must not discard the query outcome
            if connector is not None:
                try:
                    await connector.close()
                except Exception as close_error:
                    logger.warning(f"Error closing connector for query {idx+1}: {close_error}")

    # Execute all queries in parallel
    tasks = [execute_single_query(i, q) for i, q in enumerate(queries)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results into dictionary
    failure_prefixes = ("Error:", "Query failed:")
    query_results: Dict[str, Any] = {}
    for position, result in enumerate(results):
        if isinstance(result, BaseException):
            # gather keeps task order, so the position identifies the failed query
            logger.error(f"Exception in parallel query execution: {result}")
            query_results[f"query_{position+1}_error"] = str(result) or type(result).__name__
            continue

        idx, result_data, error = result
        if error:
            query_results[f"query_{idx+1}_error"] = error
            logger.error(f"Query {idx+1} failed: {error}")
            continue

        # Anything that is not a non-empty dict is treated as "no response"
        payload = result_data if isinstance(result_data, dict) and result_data else None
        text = ""
        if payload:
            text = payload.get("text") or ""
            if not isinstance(text, str):
                text = str(text)

        if payload and not text.startswith(failure_prefixes):
            rows = payload.get("data") or []
            query_results[f"query_{idx+1}"] = text
            query_results[f"query_{idx+1}_data"] = rows
            logger.info(f"Query {idx+1} executed successfully, returned {len(rows)} rows")
        else:
            # The connector reports failures in-band via the text prefix
            error_text = text if payload else "Query failed: No response"
            query_results[f"query_{idx+1}_error"] = error_text
            logger.error(f"Query {idx+1} failed: {error_text}")
    return query_results
async def _emit_log_and_event(
    interfaceDbChat,
    workflowId: str,
    event_manager,
    message: str,
    log_type: str = "info",
    status: str = "running",
    round_number: Optional[int] = None
) -> None:
    """
    Persist a workflow log entry in the database.

    Nothing is emitted here; per the original note, the route's periodic chat
    data fetch picks the stored log up, which avoids emitting it twice.
    Failures are logged and swallowed so logging never interrupts processing.

    Args:
        interfaceDbChat: Database interface
        workflowId: Workflow ID
        event_manager: Event manager (unused, kept for compatibility)
        message: Log message
        log_type: Log type (info, warning, error)
        status: Status string
        round_number: Optional round number (looked up on the workflow if not provided)
    """
    try:
        effective_round = round_number
        if effective_round is None:
            # Fall back to the workflow's current round; stays None if the workflow is missing
            current_workflow = interfaceDbChat.getWorkflow(workflowId)
            if current_workflow:
                effective_round = current_workflow.currentRound

        created_at = getUtcTimestamp()
        entry = {
            "id": f"log_{uuid.uuid4()}",
            "workflowId": workflowId,
            "message": message,
            "type": log_type,
            "timestamp": created_at,
            "status": status,
            "roundNumber": effective_round
        }
        # Database only - the periodic fetch takes care of emitting
        interfaceDbChat.createLog(entry)
    except Exception as e:
        logger.error(f"Error storing log: {e}")
async def _check_workflow_stopped(interfaceDbChat, workflowId: str) -> bool:
    """
    Check if workflow was stopped.

    Args:
        interfaceDbChat: Database interface
        workflowId: Workflow ID
    Returns:
        True if the workflow exists and its status is "stopped", False otherwise
        (including when the workflow is missing or the lookup fails)
    """
    try:
        workflow = interfaceDbChat.getWorkflow(workflowId)
        # bool() keeps the declared return type: a missing workflow yields False, not None
        return bool(workflow) and workflow.status == "stopped"
    except Exception as e:
        # A failed status check is treated as "not stopped" so processing continues
        logger.warning(f"Error checking workflow status: {e}")
        return False
# Article-number shapes such as "6AV2 181-8XP00-0AX0" or "6AV2181-8XP00-0AX0".
# Tried in order; the first pattern that matches wins.
_WEB_QUERY_ARTICLE_PATTERNS = (
    re.compile(r'\b[A-Z0-9]{2,}\s+[0-9]{3,}-[A-Z0-9-]+\b'),  # With space: "6AV2 181-8XP00-0AX0"
    re.compile(r'\b[A-Z0-9]{4,}[\s-][A-Z0-9-]{6,}\b'),  # General pattern
    re.compile(r'\b[A-Z]{2,}[0-9]+\s+[0-9]+-[A-Z0-9-]+\b'),  # Specific Siemens pattern
)

# Phrases in assistant messages that introduce a product description (group 1).
_WEB_QUERY_DESCRIPTION_PATTERNS = (
    re.compile(r'Es handelt sich um\s+([^\.]+)', re.IGNORECASE),
    re.compile(r'It is a\s+([^\.]+)', re.IGNORECASE),
    re.compile(r'C\'est\s+([^\.]+)', re.IGNORECASE),
    re.compile(r'Bezeichnung:\s*([^\n]+)', re.IGNORECASE),
    re.compile(r'Description:\s*([^\n]+)', re.IGNORECASE),
    re.compile(r'Artikelbezeichnung:\s*([^\n]+)', re.IGNORECASE),
)

# Phrases in assistant messages that introduce a supplier name (group 1).
# For "von"/"from" only the keyword is case-insensitive: the name itself must
# start with a capital letter, otherwise "von der Datenbank" would be taken
# for a supplier.
_WEB_QUERY_SUPPLIER_PATTERNS = (
    re.compile(r'\b(?i:von)\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)'),
    re.compile(r'\b(?i:from)\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)'),
    re.compile(r'Lieferant:\s*([^\n]+)', re.IGNORECASE),
    re.compile(r'Supplier:\s*([^\n]+)', re.IGNORECASE),
)

# Substrings of the lower-cased prompt that indicate a search request.
# NOTE(review): matching is by substring, so short entries such as "ja" or "si"
# also hit inside longer words (e.g. "Siemens") - confirm this is intended.
_WEB_QUERY_SEARCH_KEYWORDS = (
    "ja", "yes", "oui", "si",
    "such", "suche", "search", "recherche", "recherchier",
    "internet", "web", "online",
    "datenblatt", "datasheet", "fiche technique",
    "mehr informationen", "more information", "plus d'information",
    "weitere informationen", "further information", "additional information"
)

# Search phrases removed from the prompt when it is used as the query itself.
_WEB_QUERY_FILLER_PHRASES = (
    "recherchier nach", "recherche", "suche nach", "search for", "find",
    "informationen zu", "information about", "weitere informationen", "further information"
)

# Column names checked, in order, in the first row of each query result.
_WEB_QUERY_ARTICLE_FIELDS = ("Artikelnummer", "Artikelkürzel", "article_number", "articleNumber")
_WEB_QUERY_DESCRIPTION_FIELDS = ("Artikelbezeichnung", "Bezeichnung", "article_description", "description")
_WEB_QUERY_SUPPLIER_FIELDS = ("Lieferant", "Supplier", "supplier")


def _searchFirstPattern(patterns, text: str, group: int = 0) -> Optional[str]:
    """Return the requested group of the first pattern that matches text, or None."""
    for pattern in patterns:
        match = pattern.search(text)
        if match:
            return match.group(group)
    return None


def _firstRowValue(row: Dict[str, Any], fields) -> Optional[str]:
    """Return the first non-empty value among fields in row as a string, or None."""
    for field in fields:
        if field in row and row[field]:
            return str(row[field])
    return None


def _buildWebResearchQuery(userPrompt: str, workflowMessages: List, queryResults: Optional[Dict[str, Any]] = None) -> str:
    """
    Build enriched web research query by extracting product context from conversation history and current prompt.

    Product information (article number, description, supplier) is collected from:
    1. Current user prompt (article numbers only)
    2. Database query results (result text and the first row of each data array)
    3. The last 10 workflow messages, assistant messages only, newest first

    The query is "<description> <article number> <supplier> Datenblatt" built
    from whatever was found. If nothing was found but the prompt shows search
    intent, the prompt with search phrases removed is used instead. Without any
    usable context the original prompt is returned unchanged.

    Args:
        userPrompt: Current user prompt
        workflowMessages: List of workflow messages (conversation history); may be None
        queryResults: Optional database query results to extract product info from
    Returns:
        Enriched search query string
    """
    # Normalize user prompt for detection
    prompt_lower = userPrompt.lower().strip()
    has_search_intent = any(keyword in prompt_lower for keyword in _WEB_QUERY_SEARCH_KEYWORDS)

    article_description = None
    supplier = None

    # 1. First, try to extract from current user prompt
    article_number = _searchFirstPattern(_WEB_QUERY_ARTICLE_PATTERNS, userPrompt)
    if article_number:
        logger.info(f"Extracted article number from user prompt: {article_number}")

    # 2. Try to extract from database query results if available
    if queryResults:
        # Only the text entries of successful queries ("query_N")
        result_keys = [
            key for key in queryResults
            if key.startswith("query_") and not key.endswith(("_error", "_data"))
        ]

        # Look for article numbers in query result text (if not already found)
        if not article_number:
            for key in result_keys:
                result_text = queryResults.get(key, "")
                if not isinstance(result_text, str):
                    continue
                article_number = _searchFirstPattern(_WEB_QUERY_ARTICLE_PATTERNS, result_text)
                if article_number:
                    logger.info(f"Extracted article number from query results: {article_number}")
                    break

        # Always check data arrays: description and supplier can enrich the query
        # even if the article number is already known
        for key in result_keys:
            data_array = queryResults.get(f"{key}_data")
            if isinstance(data_array, list) and data_array:
                first_row = data_array[0]
                if isinstance(first_row, dict):
                    if not article_number:
                        article_number = _firstRowValue(first_row, _WEB_QUERY_ARTICLE_FIELDS)
                        if article_number:
                            logger.info(f"Extracted article number from query data: {article_number}")
                    if not article_description:
                        article_description = _firstRowValue(first_row, _WEB_QUERY_DESCRIPTION_FIELDS)
                        if article_description:
                            logger.info(f"Extracted article description from query data: {article_description}")
                    if not supplier:
                        supplier = _firstRowValue(first_row, _WEB_QUERY_SUPPLIER_FIELDS)
                        if supplier:
                            logger.info(f"Extracted supplier from query data: {supplier}")
            # If we found all needed info, we can stop
            if article_number and article_description and supplier:
                break

    # 3. Extract from previous assistant messages (conversation history)
    if not article_number or not article_description:
        for msg in reversed((workflowMessages or [])[-10:]):
            if msg.role != "assistant":
                continue
            # Messages without text (None) are skipped instead of crashing the regex search
            message_text = msg.message
            if not isinstance(message_text, str) or not message_text:
                continue
            if not article_number:
                article_number = _searchFirstPattern(_WEB_QUERY_ARTICLE_PATTERNS, message_text)
            if not article_description:
                found = _searchFirstPattern(_WEB_QUERY_DESCRIPTION_PATTERNS, message_text, group=1)
                if found is not None:
                    article_description = found.strip()
            if not supplier:
                found = _searchFirstPattern(_WEB_QUERY_SUPPLIER_PATTERNS, message_text, group=1)
                if found is not None:
                    supplier = found.strip()
            # Stop if we found everything
            if article_number and article_description and supplier:
                break

    # Build enriched search query
    query_parts = []

    # If we have search intent but no product info, use the prompt itself without
    # the search phrases, provided something meaningful is left
    if has_search_intent and not article_number and not article_description:
        cleaned_prompt = userPrompt
        for phrase in _WEB_QUERY_FILLER_PHRASES:
            cleaned_prompt = re.sub(phrase, "", cleaned_prompt, flags=re.IGNORECASE)
        cleaned_prompt = cleaned_prompt.strip()
        if cleaned_prompt and cleaned_prompt != userPrompt and len(cleaned_prompt) > 10:
            query_parts.append(cleaned_prompt)

    query_parts.extend(part for part in (article_description, article_number, supplier) if part)

    if not query_parts:
        # No context at all: the original prompt is the best query available. A
        # datasheet request is already part of that prompt, so it must not be
        # reduced to the bare word "Datenblatt".
        logger.info(f"No product context found, using original prompt: '{userPrompt}'")
        return userPrompt

    # With product info a datasheet search is always wanted, requested or not
    query_parts.append("Datenblatt")
    enriched_query = " ".join(query_parts)
    logger.info(f"Built enriched search query: '{enriched_query}' from context (original: '{userPrompt}')")
    return enriched_query
async def _convert_file_ids_to_document_references(
    services,
    file_ids: List[str]
) -> DocumentReferenceList:
    """
    Convert file IDs to DocumentReferenceList for use with ai.process.

    For each file ID the matching ChatDocument is looked up, first in the
    messages of services.workflow, then in the database (restricted to
    documents that belong to a message of this workflow). If no ChatDocument
    is found, the file ID itself is used as the document ID.

    Args:
        services: Services instance; services.workflow must be set
        file_ids: List of file IDs to convert
    Returns:
        DocumentReferenceList with one reference per file whose file info was
        found. Empty if no workflow is set.
    """
    references = []
    # Get workflow to search for ChatDocuments
    workflow = services.workflow
    if not workflow:
        logger.error("Cannot convert file IDs to document references: workflow not set in services")
        return DocumentReferenceList(references=[])

    for file_id in file_ids or []:
        try:
            # Get file info to verify it exists
            file_info = services.chat.getFileInfo(file_id)
            if not file_info:
                logger.warning(f"File {file_id} not found, skipping")
                continue

            # Find ChatDocument that has this fileId
            document_id = None
            if workflow.messages:
                for message in workflow.messages:
                    if hasattr(message, 'documents') and message.documents:
                        for doc in message.documents:
                            if getattr(doc, 'fileId', None) == file_id:
                                document_id = getattr(doc, 'id', None)
                                break
                    if document_id:
                        break

            # Search database if not found in messages
            if not document_id:
                try:
                    from modules.datamodels.datamodelChat import ChatDocument
                    from modules.shared.databaseUtils import getRecordsetWithRBAC
                    documents = getRecordsetWithRBAC(
                        services.interfaceDbChat.db,
                        ChatDocument,
                        services.currentUser,
                        recordFilter={"fileId": file_id}
                    )
                    if documents:
                        # Only accept documents that belong to a message of this workflow
                        workflow_message_ids = {msg.id for msg in workflow.messages} if workflow.messages else set()
                        for doc in documents:
                            if doc.get("messageId") in workflow_message_ids:
                                document_id = doc.get("id")
                                break
                except Exception as lookup_error:
                    # Best effort: the fileId fallback below still produces a reference,
                    # but the failure is logged instead of being hidden
                    logger.warning(f"Database lookup of ChatDocument for file {file_id} failed, using fileId as fallback: {lookup_error}")

            # Use ChatDocument ID if found, otherwise use fileId as fallback
            ref = DocumentItemReference(documentId=document_id if document_id else file_id)
            references.append(ref)
        except Exception as e:
            logger.error(f"Error converting fileId {file_id}: {e}", exc_info=True)

    logger.info(f"Converted {len(references)} file IDs to document references")
    return DocumentReferenceList(references=references)
def _format_query_results_as_lookup(query_data: Dict[str, List[Dict]]) -> str:
    """
    Format database query results as JSON lookup table for Excel matching.

    Converts query result data into structured JSON format: {Artikelnummer: {columns...}}.
    The article number is taken from the first column named artikelnummer,
    artikel_nummer, art_nr or part_number (case-insensitive) that holds a
    non-empty value. Rows without such a value are skipped. If the same article
    number occurs more than once, the last row wins.

    Args:
        query_data: Dict with query_key -> list of row dicts (from connector with return_json=True)
    Returns:
        JSON string formatted as lookup table
    """
    article_number_fields = ('artikelnummer', 'artikel_nummer', 'art_nr', 'part_number')
    lookup_table = {}

    for query_key, rows in query_data.items():
        if query_key == "error" or not rows:
            logger.warning(f"Skipping query key '{query_key}' - no rows or error")
            continue
        logger.info(f"Processing {len(rows)} rows from query '{query_key}'")

        for row in rows:
            if not isinstance(row, dict):
                logger.warning(f"Skipping non-dict row: {type(row)}")
                continue

            # Find Artikelnummer field (case-insensitive)
            artikelnummer = None
            for key, value in row.items():
                if not isinstance(key, str) or key.lower() not in article_number_fields:
                    continue
                # A NULL or blank cell is not an article number; str(None) would
                # otherwise create a bogus "None" entry. Try the next candidate column.
                if value is None or not str(value).strip():
                    continue
                artikelnummer = str(value)
                break

            if artikelnummer:
                lookup_table[artikelnummer] = row
            else:
                logger.warning(f"No Artikelnummer found in row with keys: {list(row.keys())}")

    logger.info(f"Generated lookup table with {len(lookup_table)} entries")
    if lookup_table:
        sample_keys = list(lookup_table.keys())[:3]
        logger.info(f"Sample Artikelnummern: {sample_keys}")
        sample_entry = lookup_table[sample_keys[0]]
        logger.info(f"Sample entry keys: {list(sample_entry.keys())}")
    return json.dumps(lookup_table, ensure_ascii=False, indent=2)
async def _create_chat_document_from_action_document(
    services,
    action_document,
    message_id: str,
    workflow_id: str,
    round_number: int
) -> ChatDocument:
    """
    Create a ChatDocument from an ActionDocument by storing the file data.

    The document data is converted to bytes, stored through
    services.interfaceDbComponent (createFile + createFileData) and wrapped in
    a new ChatDocument. The file name is always given an .xlsx extension.

    Args:
        services: Services instance
        action_document: ActionDocument from ai.process result
        message_id: ID of the message to attach to
        workflow_id: Workflow ID (not used inside this function)
        round_number: Round number
    Returns:
        ChatDocument instance
    Raises:
        Exception: Any error during conversion or storage is logged and re-raised.
    """
    try:
        # Get file data (could be bytes or string)
        document_data = action_document.documentData

        # Convert to bytes if needed
        if isinstance(document_data, str):
            try:
                # Strict base64: whitespace (line wrapping) is tolerated, any other
                # character outside the alphabet means the string is plain text.
                # Lenient decoding would silently drop such characters and turn
                # ordinary text into garbage bytes.
                # NOTE(review): text consisting only of base64 characters is still
                # ambiguous and is decoded - an explicit encoding flag on the
                # ActionDocument would remove the guesswork.
                file_bytes = base64.b64decode("".join(document_data.split()), validate=True)
            except ValueError:
                # Not base64 (binascii.Error is a ValueError), encode as UTF-8
                file_bytes = document_data.encode('utf-8')
        elif isinstance(document_data, bytes):
            file_bytes = document_data
        else:
            # Try to convert to bytes
            try:
                file_bytes = bytes(document_data)
            except Exception:
                # Last resort: convert to string then encode
                file_bytes = str(document_data).encode('utf-8')

        # Get MIME type (default to Excel)
        mime_type = action_document.mimeType or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

        # Get file name
        file_name = action_document.documentName or "data_export.xlsx"
        # Ensure it has .xlsx extension
        # NOTE(review): the extension is forced even when mime_type is not the
        # Excel type - confirm only Excel exports reach this function.
        if not file_name.lower().endswith('.xlsx'):
            # Remove any existing extension and add .xlsx
            file_name = file_name.rsplit('.', 1)[0] + '.xlsx'

        # Store file using component interface
        file_item = services.interfaceDbComponent.createFile(
            name=file_name,
            mimeType=mime_type,
            content=file_bytes
        )

        # Store file data
        success = services.interfaceDbComponent.createFileData(file_item.id, file_bytes)
        if not success:
            logger.warning(f"Failed to store file data for {file_item.id}, but continuing...")

        # Create ChatDocument
        chat_document = ChatDocument(
            id=str(uuid.uuid4()),
            messageId=message_id,
            fileId=file_item.id,
            fileName=file_name,
            fileSize=len(file_bytes),
            mimeType=mime_type,
            roundNumber=round_number,
            taskNumber=0,
            actionNumber=0
        )
        logger.info(f"Created ChatDocument {chat_document.id} from ActionDocument {file_name} (size: {len(file_bytes)} bytes)")
        return chat_document
    except Exception as e:
        logger.error(f"Error creating ChatDocument from ActionDocument: {e}", exc_info=True)
        raise
async def _processChatbotMessage(
services,
workflowId: str,
userInput: UserInputRequest,
userMessageId: str
):
"""
Process chatbot message in background.
Analyzes user input and generates list of queries, then streams them back.
"""
event_manager = get_event_manager()
try:
interfaceDbChat = services.interfaceDbChat
# Reload workflow to get current messages
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
logger.error(f"Workflow {workflowId} not found during processing")
await event_manager.emit_event(
context_id=workflowId,
event_type="error",
data={"error": f"Workflow {workflowId} nicht gefunden"},
event_category="workflow",
message=f"Workflow {workflowId} nicht gefunden",
step="error"
)
return
# Check if workflow was stopped before starting
if await _check_workflow_stopped(interfaceDbChat, workflowId):
logger.info(f"Workflow {workflowId} was stopped, aborting processing")
return
# Build conversation context from history
context = ""
if workflow.messages:
recent_messages = workflow.messages[-5:]
context = "\n\nPrevious conversation:\n"
for msg in recent_messages:
if msg.role == "user":
context += f"User: {msg.message}\n"
elif msg.role == "assistant":
context += f"Assistant: {msg.message}\n"
await services.ai.ensureAiObjectsInitialized()
# Step 1: Analyze user input to generate queries
logger.info("Analyzing user input to generate queries...")
await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Analysiere Benutzeranfrage...")
analysisPrompt = get_initial_analysis_prompt(userInput.prompt, context)
# AI call for analysis
method_ai = MethodAi(services)
analysis_result = await method_ai.process({
"aiPrompt": analysisPrompt,
"documentList": None,
"resultType": "json",
"simpleMode": True
})
# Extract content from ActionResult
analysis_content = None
if analysis_result.success and analysis_result.documents:
analysis_content = analysis_result.documents[0].documentData
if isinstance(analysis_content, bytes):
analysis_content = analysis_content.decode('utf-8')
if not analysis_content:
logger.warning("Analysis failed, using fallback")
analysis = {}
else:
analysis = _extractJsonFromResponse(analysis_content)
# Extract analysis results
needsDatabaseQuery = analysis.get("needsDatabaseQuery", False) if analysis else False
needsWebResearch = analysis.get("needsWebResearch", False) if analysis else False
sql_queries = analysis.get("sqlQueries", [])
# Support legacy single query format for backward compatibility
if not sql_queries and analysis.get("sqlQuery"):
sql_queries = [{
"query": analysis.get("sqlQuery", ""),
"purpose": "Database query",
"table": "Unknown"
}]
reasoning = analysis.get("reasoning", "")
logger.info(f"Analysis: DB={needsDatabaseQuery}, Web={needsWebResearch}, SQL queries={len(sql_queries)}")
# Build initial enriched web research query if needed (for logging, will be rebuilt after DB queries)
enriched_web_query = None
if needsWebResearch:
enriched_web_query = _buildWebResearchQuery(userInput.prompt, workflow.messages)
# Build list of queries to stream back
queries = []
if needsDatabaseQuery and sql_queries:
for i, sql_query_info in enumerate(sql_queries, 1):
queries.append({
"type": "database",
"query": sql_query_info.get("query", ""),
"purpose": sql_query_info.get("purpose", f"Query {i}"),
"table": sql_query_info.get("table", "Unknown"),
"reasoning": reasoning
})
if needsWebResearch:
queries.append({
"type": "web",
"query": enriched_web_query or userInput.prompt,
"reasoning": reasoning
})
# Format queries as log text
log_lines = []
if queries:
db_queries = [q for q in queries if q["type"] == "database"]
log_lines.append(f"Generiert: {len(db_queries)} Datenbankabfrage(n) und {len(queries) - len(db_queries)} Web-Recherche(n)\n\n")
for i, q in enumerate(queries, 1):
if q["type"] == "database":
log_lines.append(f"{i}. Datenbankabfrage ({q.get('table', 'Unknown')}):\n")
log_lines.append(f" Zweck: {q.get('purpose', 'Nicht angegeben')}\n")
log_lines.append(f"```sql\n{q['query']}\n```\n")
elif q["type"] == "web":
log_lines.append(f"{i}. Web-Recherche:\n")
log_lines.append(f" Suchbegriff: {q['query']}\n")
if q.get("reasoning"):
log_lines.append(f" Begründung: {q['reasoning']}\n")
log_lines.append("\n")
else:
log_lines.append("Keine Abfragen erforderlich.")
log_text = "".join(log_lines)
# Stream queries as a log
await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, log_text)
# Check if workflow was stopped before executing queries
if await _check_workflow_stopped(interfaceDbChat, workflowId):
logger.info(f"Workflow {workflowId} was stopped, aborting query execution")
return
# Step 2: Execute queries
queryResults = {}
webResearchResults = ""
# Execute database queries in parallel
if needsDatabaseQuery and sql_queries:
logger.info(f"Executing {len(sql_queries)} database queries in parallel...")
await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, f"Führe {len(sql_queries)} Datenbankabfrage(n) parallel aus...")
try:
queryResults = await _execute_queries_parallel(sql_queries)
# Log results summary
successful_queries = [k for k in queryResults.keys() if k.startswith("query_") and not k.endswith("_error") and not k.endswith("_data")]
failed_queries = [k for k in queryResults.keys() if k.endswith("_error")]
if successful_queries:
total_rows = sum(len(queryResults.get(f"{k}_data", [])) for k in successful_queries)
logger.info(f"Successfully executed {len(successful_queries)} query/queries, total {total_rows} rows")
await _emit_log_and_event(
interfaceDbChat,
workflowId,
event_manager,
f"Abgeschlossen: {len(successful_queries)} Abfrage(n) erfolgreich, {total_rows} Ergebnis{'e' if total_rows != 1 else ''} gefunden"
)
if failed_queries:
logger.warning(f"{len(failed_queries)} query/queries failed")
await _emit_log_and_event(
interfaceDbChat,
workflowId,
event_manager,
f"Warnung: {len(failed_queries)} Abfrage(n) fehlgeschlagen",
log_type="warning"
)
except Exception as e:
logger.error(f"Error executing parallel queries: {e}")
queryResults["error"] = f"Error executing queries: {str(e)}"
await _emit_log_and_event(
interfaceDbChat,
workflowId,
event_manager,
"Fehler bei parallelen Datenbankabfragen",
log_type="error"
)
# Execute web research
if needsWebResearch:
logger.info("Performing web research...")
await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Suche im Internet nach Informationen...")
try:
# Rebuild enriched query with database results if available (better product context)
web_research_query = _buildWebResearchQuery(
userInput.prompt,
workflow.messages,
queryResults if queryResults else None
)
logger.info(f"Using enriched web research query: '{web_research_query}'")
researchResult = await services.web.performWebResearch(
prompt=web_research_query,
urls=[],
country=None,
language=userInput.userLanguage or "de",
researchDepth="general",
operationId=None
)
webResearchResults = json.dumps(researchResult, ensure_ascii=False, indent=2) if isinstance(researchResult, dict) else str(researchResult)
await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Internet-Recherche abgeschlossen")
except Exception as e:
logger.error(f"Web research failed: {e}", exc_info=True)
webResearchResults = f"Web research error: {str(e)}"
await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Internet-Recherche fehlgeschlagen", log_type="warning")
# Check if workflow was stopped before generating final answer
if await _check_workflow_stopped(interfaceDbChat, workflowId):
logger.info(f"Workflow {workflowId} was stopped, aborting final answer generation")
return
# Step 3: Generate final answer using AI
logger.info("Generating final answer with AI...")
await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Formuliere finale Antwort...")
# Build prompt for final answer
system_prompt = get_final_answer_system_prompt()
# Build answer context with query results
answerContext = f"User question: {userInput.prompt}{context}\n\n"
# Add database results - organize by query with metadata
db_results_part = ""
if queryResults:
successful_results = []
error_results = []
# Extract query metadata from sql_queries if available
query_metadata = {}
if sql_queries:
for i, q_info in enumerate(sql_queries, 1):
query_metadata[f"query_{i}"] = {
"purpose": q_info.get("purpose", f"Query {i}"),
"table": q_info.get("table", "Unknown")
}
# Organize results by query number
query_numbers = set()
for key in queryResults.keys():
if key.startswith("query_") and not key.endswith("_data"):
# Extract query number (e.g., "query_1" -> 1)
try:
num = int(key.split("_")[1])
query_numbers.add(num)
except (ValueError, IndexError):
pass
# Build results with metadata
for query_num in sorted(query_numbers):
query_key = f"query_{query_num}"
error_key = f"{query_key}_error"
if error_key in queryResults:
error_msg = queryResults[error_key]
metadata = query_metadata.get(query_key, {})
purpose = metadata.get("purpose", f"Query {query_num}")
table = metadata.get("table", "Unknown")
error_results.append(f"Abfrage {query_num} ({table} - {purpose}): {error_msg}")
elif query_key in queryResults:
result_text = queryResults[query_key]
metadata = query_metadata.get(query_key, {})
purpose = metadata.get("purpose", f"Query {query_num}")
table = metadata.get("table", "Unknown")
successful_results.append(f"=== Abfrage {query_num}: {purpose} (Tabelle: {table}) ===\n{result_text}")
# Handle general error if present
if "error" in queryResults:
error_results.append(f"Allgemeiner Fehler: {queryResults['error']}")
if successful_results:
db_results_part = "\n\nDATENBANK-ERGEBNISSE:\n" + "\n\n".join(successful_results)
answerContext += "DATENBANK-ERGEBNISSE:\n" + "\n\n".join(successful_results) + "\n\n"
if error_results:
db_results_part += "\n\nDATENBANK-FEHLER:\n" + "\n".join(error_results)
answerContext += "DATENBANK-FEHLER:\n" + "\n".join(error_results) + "\n\n"
# Add web research results
web_results_part = ""
if webResearchResults:
web_results_part = f"\n\nINTERNET-RECHERCHE:\n{webResearchResults}"
answerContext += f"INTERNET-RECHERCHE:\n{webResearchResults}\n\n"
# Check if we have any actual data
successful_query_keys = [k for k in queryResults.keys() if k.startswith("query_") and not k.endswith("_error") and not k.endswith("_data")]
has_query_results = bool(successful_query_keys)
error_query_keys = [k for k in queryResults.keys() if k.endswith("_error")]
has_only_errors = bool(error_query_keys and not successful_query_keys)
if not has_query_results and needsDatabaseQuery:
db_results_part = "\n\nWICHTIG: Es wurden KEINE Datenbank-Ergebnisse gefunden. Die Datenbankabfrage wurde nicht ausgeführt oder hat keine Ergebnisse zurückgegeben."
if has_only_errors:
db_results_part += "\n\n⚠️⚠️⚠️ KRITISCH - ALLE QUERIES FEHLGESCHLAGEN ⚠️⚠️⚠️\n" + \
"ALLE Datenbankabfragen sind fehlgeschlagen. Es gibt KEINE gültigen Daten aus der Datenbank.\n" + \
"DU DARFST KEINE DATEN ERFINDEN! Schreibe stattdessen: 'Es wurden keine Artikel gefunden' oder 'Die Datenbankabfrage ist fehlgeschlagen'."
answer_prompt = f"""{system_prompt}
Antworte auf die folgende Frage des Nutzers: {userInput.prompt}{context}
{db_results_part}{web_results_part}
KRITISCH: Verwende NUR die oben angegebenen Daten. Erfinde KEINE Werte. Wenn Daten fehlen, schreibe "Nicht verfügbar".
WICHTIG - MEHRERE ABFRAGEN:
Die oben angegebenen DATENBANK-ERGEBNISSE können aus mehreren separaten Abfragen stammen. Jede Abfrage ist mit "=== Abfrage X ===" markiert und enthält Informationen zu einem spezifischen Aspekt (z.B. Artikel-Informationen, Lagerbestände, etc.).
- Kombiniere die Informationen aus ALLEN erfolgreichen Abfragen zu einer umfassenden Antwort
- Beispiel: Wenn Abfrage 1 Artikel-Informationen liefert und Abfrage 2 Lagerbestände liefert, kombiniere beide in deiner Antwort
- Verwende ALLE verfügbaren Informationen aus den verschiedenen Abfragen
⚠️⚠️⚠️ ABSOLUT VERBOTEN - KEINE DATEN ERFINDEN ⚠️⚠️⚠️
Wenn KEINE Datenbank-Ergebnisse vorhanden sind, dann:
- ❌ ERFINDE KEINE Artikelnummern, Artikelbezeichnungen, Preise oder Lagerbestände!
- ❌ ERFINDE KEINE Beispielartikel!
- ✓ Schreibe stattdessen: "Es wurden keine Artikel in der Datenbank gefunden." oder "Die Datenbankabfrage ist fehlgeschlagen."
WICHTIG: Deine Antwort soll NUR die finale Antwort enthalten - KEINE Planungsschritte, KEINE SQL-Queries, KEINE Zwischenschritte!
Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "Es wurden keine Artikel gefunden" (wenn keine Daten vorhanden)."""
WICHTIG:
- Klare, strukturierte Antwort
- Zahlen mit Tausender-Trennzeichen (7'411)
- Markdown-Tabellen für Daten (max 20 Zeilen)
- Artikelnummern als Link: [ARTIKELNUMMER](/details/ARTIKELNUMMER)
- Wenn Excel-Datei erstellt wurde, bestätige dies explizit am Ende
- Wenn Excel-Verarbeitung fehlgeschlagen ist, erkläre dem Nutzer den Fehler klar"""
answerRequest = AiCallRequest(
prompt=answer_prompt,
context=answerContext if (queryResults or webResearchResults) else None,
options=AiCallOptions(
resultFormat="txt",
operationType=OperationTypeEnum.DATA_ANALYSE,
processingMode=ProcessingModeEnum.DETAILED
)
)
answerResponse = await services.ai.callAi(answerRequest)
finalAnswer = answerResponse.content
logger.info("Final answer generated")
# Check if workflow was stopped during AI call - if so, don't store the message
if await _check_workflow_stopped(interfaceDbChat, workflowId):
logger.info(f"Workflow {workflowId} was stopped during final answer generation, not storing message")
return
# Reload workflow to get current message count
workflow = interfaceDbChat.getWorkflow(workflowId)
# Double-check workflow wasn't stopped while we were reloading
if workflow and workflow.status == "stopped":
logger.info(f"Workflow {workflowId} was stopped, not storing final message")
return
# Create assistant message with final answer
assistantMessageData = {
"id": message_id,
"workflowId": workflowId,
"parentMessageId": userMessageId,
"message": finalAnswer,
"role": "assistant",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
"publishedAt": getUtcTimestamp(),
"success": True,
"roundNumber": workflow.currentRound,
"taskNumber": 0,
"actionNumber": 0
}
assistantMessage = interfaceDbChat.createMessage(assistantMessageData)
logger.info(f"Stored assistant message with final answer: {assistantMessage.id}")
# Emit message event for streaming (exact chatData format)
message_timestamp = parseTimestamp(assistantMessage.publishedAt, default=getUtcTimestamp())
await event_manager.emit_event(
context_id=workflowId,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": assistantMessage.dict()
},
event_category="chat"
)
# Update workflow status to completed (only if not stopped)
if not await _check_workflow_stopped(interfaceDbChat, workflowId):
interfaceDbChat.updateWorkflow(workflowId, {
"status": "completed",
"lastActivity": getUtcTimestamp()
})
else:
logger.info(f"Workflow {workflowId} was stopped, not updating status to completed")
logger.info(f"Chatbot processing completed for workflow {workflowId}, generated {len(queries)} queries and final answer")
# Emit completion event only if workflow wasn't stopped
if not await _check_workflow_stopped(interfaceDbChat, workflowId):
await event_manager.emit_event(
context_id=workflowId,
event_type="complete",
data={"workflowId": workflowId},
event_category="workflow",
message="Chatbot-Verarbeitung abgeschlossen",
step="complete"
)
# Schedule cleanup
await event_manager.cleanup(workflowId)
except Exception as e:
logger.error(f"Error processing chatbot message: {str(e)}", exc_info=True)
# Check if workflow was stopped - if so, don't store error message
if await _check_workflow_stopped(interfaceDbChat, workflowId):
logger.info(f"Workflow {workflowId} was stopped, not storing error message")
return
# Store error message
try:
# Reload workflow to get current message count
workflow = interfaceDbChat.getWorkflow(workflowId)
# Double-check workflow wasn't stopped while we were reloading
if workflow and workflow.status == "stopped":
logger.info(f"Workflow {workflowId} was stopped, not storing error message")
return
errorMessageData = {
"id": f"msg_{uuid.uuid4()}",
"workflowId": workflowId,
"parentMessageId": userMessageId,
"message": f"Sorry, I encountered an error: {str(e)}",
"role": "assistant",
"status": "last",
"sequenceNr": len(workflow.messages) + 1,
"publishedAt": getUtcTimestamp(),
"success": False,
"roundNumber": workflow.currentRound if workflow else 1,
"taskNumber": 0,
"actionNumber": 0
}
errorMessage = interfaceDbChat.createMessage(errorMessageData)
# Emit message event for streaming (exact chatData format)
message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp())
await event_manager.emit_event(
context_id=workflowId,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": errorMessage.dict()
},
event_category="chat"
)
# Update workflow status to error (only if not stopped)
if not await _check_workflow_stopped(interfaceDbChat, workflowId):
interfaceDbChat.updateWorkflow(workflowId, {
"status": "error",
"lastActivity": getUtcTimestamp()
})
else:
logger.info(f"Workflow {workflowId} was stopped, not updating status to error")
# Schedule cleanup
await event_manager.cleanup(workflowId)
except Exception as storeError:
logger.error(f"Error storing error message: {storeError}")