1289 lines
53 KiB
Python
1289 lines
53 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Simple chatbot feature - basic implementation.
|
|
User input is processed by AI to create list of needed queries.
|
|
Those queries get streamed back.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import uuid
|
|
import asyncio
|
|
import re
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum, ChatLog, ChatDocument
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
|
|
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
|
|
from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp
|
|
from modules.services import getInterface as getServices
|
|
from modules.features.chatbot.streaming.events import get_event_manager
|
|
from modules.features.chatbot.chatbot import Chatbot
|
|
from modules.features.chatbot.bridges.ai import AICenterChatModel
|
|
from modules.features.chatbot.bridges.memory import DatabaseCheckpointer
|
|
from modules.features.chatbot.config import (
|
|
load_chatbot_config_from_instance,
|
|
ChatbotConfig
|
|
)
|
|
from modules.datamodels.datamodelAi import OperationTypeEnum, ProcessingModeEnum
|
|
from modules.workflows.methods.methodAi.methodAi import MethodAi
|
|
from modules.connectors.connectorPreprocessor import PreprocessorConnector
|
|
from modules.features.chatbot.chatbotConstants import generate_conversation_name
|
|
import base64
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _extractJsonFromResponse(content: str) -> Optional[dict]:
|
|
"""Extract JSON from AI response, handling markdown code blocks."""
|
|
# Try direct JSON parse first
|
|
try:
|
|
return json.loads(content.strip())
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Try to extract JSON from markdown code blocks
|
|
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
|
|
if json_match:
|
|
try:
|
|
return json.loads(json_match.group(1))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Try to find JSON object in the text
|
|
json_match = re.search(r'\{.*\}', content, re.DOTALL)
|
|
if json_match:
|
|
try:
|
|
return json.loads(json_match.group(0))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
async def chatProcess(
|
|
currentUser: User,
|
|
mandateId: Optional[str],
|
|
userInput: UserInputRequest,
|
|
workflowId: Optional[str] = None,
|
|
featureInstanceId: Optional[str] = None
|
|
) -> ChatWorkflow:
|
|
"""
|
|
Simple chatbot processing - analyze user input and generate queries.
|
|
|
|
Flow:
|
|
1. Create or load workflow
|
|
2. Store user message
|
|
3. AI analyzes user input to create list of needed queries
|
|
4. Stream queries back
|
|
|
|
Args:
|
|
currentUser: Current user
|
|
mandateId: Mandate ID for the workflow
|
|
userInput: User input request
|
|
workflowId: Optional workflow ID to continue existing conversation
|
|
featureInstanceId: Feature instance ID for loading instance-specific config
|
|
|
|
Returns:
|
|
ChatWorkflow instance
|
|
"""
|
|
try:
|
|
# Get services with mandate and feature instance context
|
|
services = getServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
|
|
services.featureCode = 'chatbot'
|
|
|
|
interfaceDbChat = services.interfaceDbChat
|
|
|
|
# Get event manager and create queue if needed
|
|
event_manager = get_event_manager()
|
|
|
|
# Create or load workflow
|
|
if workflowId:
|
|
workflow = interfaceDbChat.getWorkflow(workflowId)
|
|
if not workflow:
|
|
raise ValueError(f"Workflow {workflowId} not found")
|
|
|
|
# Resume workflow: increment round number
|
|
new_round = workflow.currentRound + 1
|
|
interfaceDbChat.updateWorkflow(workflowId, {
|
|
"status": "running",
|
|
"currentRound": new_round,
|
|
"lastActivity": getUtcTimestamp()
|
|
})
|
|
workflow = interfaceDbChat.getWorkflow(workflowId)
|
|
logger.info(f"Resumed workflow {workflowId}, round incremented to {new_round}")
|
|
|
|
# Create event queue if it doesn't exist (for streaming)
|
|
if not event_manager.has_queue(workflowId):
|
|
event_manager.create_queue(workflowId)
|
|
else:
|
|
# Generate conversation name based on user's prompt
|
|
conversation_name = await generate_conversation_name(
|
|
services,
|
|
userInput.prompt,
|
|
userInput.userLanguage
|
|
)
|
|
|
|
# Create new workflow
|
|
workflowData = {
|
|
"id": str(uuid.uuid4()),
|
|
"mandateId": mandateId,
|
|
"featureInstanceId": featureInstanceId, # Store feature instance for RBAC
|
|
"status": "running",
|
|
"name": conversation_name,
|
|
"currentRound": 1,
|
|
"currentTask": 0,
|
|
"currentAction": 0,
|
|
"totalTasks": 0,
|
|
"totalActions": 0,
|
|
"workflowMode": WorkflowModeEnum.WORKFLOW_CHATBOT.value,
|
|
"startedAt": getUtcTimestamp(),
|
|
"lastActivity": getUtcTimestamp()
|
|
}
|
|
workflow = interfaceDbChat.createWorkflow(workflowData)
|
|
logger.info(f"Created new chatbot workflow: {workflow.id} with name: {conversation_name}")
|
|
|
|
# Create event queue for new workflow (for streaming)
|
|
event_manager.create_queue(workflow.id)
|
|
|
|
# Reload workflow to get current message count
|
|
workflow = interfaceDbChat.getWorkflow(workflow.id)
|
|
|
|
# Process uploaded files and create ChatDocuments
|
|
user_documents = []
|
|
if userInput.listFileId and len(userInput.listFileId) > 0:
|
|
logger.info(f"Processing {len(userInput.listFileId)} uploaded file(s) for user message")
|
|
for fileId in userInput.listFileId:
|
|
try:
|
|
# Get file info from chat service
|
|
fileInfo = services.chat.getFileInfo(fileId)
|
|
if not fileInfo:
|
|
logger.warning(f"No file info found for file ID {fileId}")
|
|
continue
|
|
|
|
originalFileName = fileInfo.get("fileName", "unknown")
|
|
originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
|
|
fileSizeToUse = fileInfo.get("size", 0)
|
|
|
|
# Create ChatDocument for the file
|
|
document = ChatDocument(
|
|
id=str(uuid.uuid4()),
|
|
messageId="", # Will be set when message is created
|
|
fileId=fileId,
|
|
fileName=originalFileName,
|
|
fileSize=fileSizeToUse,
|
|
mimeType=originalMimeType,
|
|
roundNumber=workflow.currentRound,
|
|
taskNumber=0,
|
|
actionNumber=0
|
|
)
|
|
user_documents.append(document)
|
|
logger.info(f"Created ChatDocument for file {fileId} -> {originalFileName}")
|
|
except Exception as e:
|
|
logger.error(f"Error processing file ID {fileId}: {e}", exc_info=True)
|
|
|
|
# Store user message
|
|
userMessageData = {
|
|
"id": f"msg_{uuid.uuid4()}",
|
|
"workflowId": workflow.id,
|
|
"message": userInput.prompt,
|
|
"role": "user",
|
|
"status": "first" if workflowId is None else "step",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": getUtcTimestamp(),
|
|
"roundNumber": workflow.currentRound,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0
|
|
}
|
|
|
|
userMessage = interfaceDbChat.createMessage(userMessageData)
|
|
logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)")
|
|
|
|
# Emit message event for streaming (exact chatData format)
|
|
message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp())
|
|
await event_manager.emit_event(
|
|
context_id=workflow.id,
|
|
event_type="chatdata",
|
|
data={
|
|
"type": "message",
|
|
"createdAt": message_timestamp,
|
|
"item": userMessage.dict()
|
|
},
|
|
event_category="chat"
|
|
)
|
|
|
|
# Update workflow status
|
|
interfaceDbChat.updateWorkflow(workflow.id, {
|
|
"status": "running",
|
|
"lastActivity": getUtcTimestamp()
|
|
})
|
|
|
|
# Process in background using LangGraph (async)
|
|
asyncio.create_task(_processChatbotMessageLangGraph(
|
|
services,
|
|
currentUser,
|
|
workflow.id,
|
|
userInput,
|
|
userMessage.id,
|
|
featureInstanceId=featureInstanceId
|
|
))
|
|
|
|
# Reload workflow to include new message
|
|
workflow = interfaceDbChat.getWorkflow(workflow.id)
|
|
return workflow
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in chatProcess: {str(e)}", exc_info=True)
|
|
raise
|
|
|
|
|
|
async def _execute_queries_parallel(queries: List[Dict[str, Any]]) -> Dict[str, Any]:
|
|
"""
|
|
Execute multiple SQL queries in parallel with shared connector.
|
|
|
|
Args:
|
|
queries: List of query dictionaries, each containing:
|
|
- "query": SQL query string
|
|
- "purpose": Description of what the query retrieves
|
|
- "table": Primary table name
|
|
|
|
Returns:
|
|
Dictionary mapping query indices to results:
|
|
- "query_1", "query_2", etc.: Success result text
|
|
- "query_1_data", "query_2_data", etc.: Raw data arrays
|
|
- "query_1_error", "query_2_error", etc.: Error messages if query failed
|
|
"""
|
|
# Create single connector instance to reuse across all queries
|
|
connector = PreprocessorConnector()
|
|
try:
|
|
async def execute_single_query(idx: int, query_info: Dict[str, Any]):
|
|
"""Execute a single query using shared connector."""
|
|
try:
|
|
query_text = query_info.get("query", "")
|
|
result = await connector.executeQuery(query_text, return_json=True)
|
|
return idx, result, None
|
|
except Exception as e:
|
|
return idx, None, str(e)
|
|
|
|
# Execute all queries in parallel with shared connector
|
|
tasks = [execute_single_query(i, q) for i, q in enumerate(queries)]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
finally:
|
|
# Close connector once after all queries complete
|
|
await connector.close()
|
|
|
|
# Process results into dictionary
|
|
query_results = {}
|
|
for result in results:
|
|
if isinstance(result, Exception):
|
|
# Handle exceptions from gather
|
|
logger.error(f"Exception in parallel query execution: {result}")
|
|
continue
|
|
|
|
idx, result_data, error = result
|
|
|
|
if error:
|
|
query_results[f"query_{idx+1}_error"] = error
|
|
logger.error(f"Query {idx+1} failed: {error}")
|
|
else:
|
|
if result_data and not result_data.get("text", "").startswith(("Error:", "Query failed:")):
|
|
query_results[f"query_{idx+1}"] = result_data.get("text", "")
|
|
query_results[f"query_{idx+1}_data"] = result_data.get("data", [])
|
|
row_count = len(result_data.get('data', []))
|
|
logger.info(f"Query {idx+1} executed successfully, returned {row_count} rows")
|
|
else:
|
|
error_text = result_data.get("text", "Query failed") if result_data else "Query failed: No response"
|
|
query_results[f"query_{idx+1}_error"] = error_text
|
|
logger.error(f"Query {idx+1} failed: {error_text}")
|
|
|
|
return query_results
|
|
|
|
|
|
async def _emit_log_and_event(
|
|
interfaceDbChat,
|
|
workflowId: str,
|
|
event_manager,
|
|
message: str,
|
|
log_type: str = "info",
|
|
status: str = "running",
|
|
round_number: Optional[int] = None
|
|
) -> None:
|
|
"""
|
|
Store log in database and emit event for streaming.
|
|
|
|
Args:
|
|
interfaceDbChat: Database interface
|
|
workflowId: Workflow ID
|
|
event_manager: Event manager for streaming
|
|
message: Log message
|
|
log_type: Log type (info, warning, error)
|
|
status: Status string
|
|
round_number: Optional round number (will be fetched from workflow if not provided)
|
|
"""
|
|
try:
|
|
# Get round number from workflow if not provided
|
|
if round_number is None:
|
|
workflow = interfaceDbChat.getWorkflow(workflowId)
|
|
if workflow:
|
|
round_number = workflow.currentRound
|
|
|
|
log_timestamp = getUtcTimestamp()
|
|
log_data = {
|
|
"id": f"log_{uuid.uuid4()}",
|
|
"workflowId": workflowId,
|
|
"message": message,
|
|
"type": log_type,
|
|
"timestamp": log_timestamp,
|
|
"status": status,
|
|
"roundNumber": round_number
|
|
}
|
|
# Store log in database
|
|
created_log = interfaceDbChat.createLog(log_data)
|
|
|
|
# Emit event directly for streaming (using correct signature)
|
|
if created_log and event_manager:
|
|
try:
|
|
from modules.datamodels.datamodelChat import ChatLog
|
|
# Convert to dict if it's a Pydantic model
|
|
if hasattr(created_log, "model_dump"):
|
|
log_dict = created_log.model_dump()
|
|
elif hasattr(created_log, "dict"):
|
|
log_dict = created_log.dict()
|
|
else:
|
|
log_dict = log_data
|
|
|
|
await event_manager.emit_event(
|
|
context_id=workflowId,
|
|
event_type="chatdata",
|
|
data={
|
|
"type": "log",
|
|
"createdAt": log_timestamp,
|
|
"item": log_dict
|
|
},
|
|
event_category="chat",
|
|
message="New log",
|
|
step="log"
|
|
)
|
|
except Exception as emit_error:
|
|
logger.warning(f"Error emitting log event: {emit_error}")
|
|
except Exception as e:
|
|
logger.error(f"Error storing log: {e}", exc_info=True)
|
|
|
|
|
|
async def _check_workflow_stopped(interfaceDbChat, workflowId: str) -> bool:
|
|
"""
|
|
Check if workflow was stopped.
|
|
|
|
Args:
|
|
interfaceDbChat: Database interface
|
|
workflowId: Workflow ID
|
|
|
|
Returns:
|
|
True if workflow is stopped, False otherwise
|
|
"""
|
|
try:
|
|
workflow = interfaceDbChat.getWorkflow(workflowId)
|
|
return workflow and workflow.status == "stopped"
|
|
except Exception as e:
|
|
logger.warning(f"Error checking workflow status: {e}")
|
|
return False
|
|
|
|
|
|
def _buildWebResearchQuery(userPrompt: str, workflowMessages: List, queryResults: Optional[Dict[str, Any]] = None) -> str:
|
|
"""
|
|
Build enriched web research query by extracting product context from conversation history and current prompt.
|
|
|
|
Extracts product information from:
|
|
1. Current user prompt (article numbers, product mentions)
|
|
2. Database query results (if available)
|
|
3. Previous assistant messages (conversation history)
|
|
|
|
Args:
|
|
userPrompt: Current user prompt
|
|
workflowMessages: List of workflow messages (conversation history)
|
|
queryResults: Optional database query results to extract product info from
|
|
|
|
Returns:
|
|
Enriched search query string
|
|
"""
|
|
# Normalize user prompt for detection
|
|
prompt_lower = userPrompt.lower().strip()
|
|
|
|
# Patterns that indicate a search request
|
|
search_patterns = [
|
|
"ja", "yes", "oui", "si",
|
|
"such", "suche", "search", "recherche", "recherchier",
|
|
"internet", "web", "online",
|
|
"datenblatt", "datasheet", "fiche technique",
|
|
"mehr informationen", "more information", "plus d'information",
|
|
"weitere informationen", "further information", "additional information"
|
|
]
|
|
|
|
# Certification patterns that require web research
|
|
certification_patterns = [
|
|
"ul", "ce", "tüv", "vde", "iec", "en", "iso",
|
|
"zertifiziert", "certified", "certification", "zertifizierung",
|
|
"geprüft", "approved", "compliance"
|
|
]
|
|
|
|
# Check if current prompt contains search-related keywords
|
|
has_search_intent = any(pattern in prompt_lower for pattern in search_patterns)
|
|
|
|
# Check if prompt contains certification-related keywords
|
|
has_certification_intent = any(pattern in prompt_lower for pattern in certification_patterns)
|
|
|
|
# Extract product information - try multiple sources
|
|
article_number = None
|
|
article_description = None
|
|
supplier = None
|
|
|
|
# Pattern for article numbers like "6AV2 181-8XP00-0AX0" or "6AV2181-8XP00-0AX0"
|
|
article_patterns = [
|
|
r'\b[A-Z0-9]{2,}\s+[0-9]{3,}-[A-Z0-9-]+\b', # With space: "6AV2 181-8XP00-0AX0"
|
|
r'\b[A-Z0-9]{4,}[\s-][A-Z0-9-]{6,}\b', # General pattern
|
|
r'\b[A-Z]{2,}[0-9]+\s+[0-9]+-[A-Z0-9-]+\b', # Specific Siemens pattern
|
|
]
|
|
|
|
# 1. First, try to extract from current user prompt
|
|
for pattern in article_patterns:
|
|
matches = re.findall(pattern, userPrompt)
|
|
if matches:
|
|
article_number = matches[0]
|
|
logger.info(f"Extracted article number from user prompt: {article_number}")
|
|
break
|
|
|
|
# 2. Try to extract from database query results if available
|
|
# Always check queryResults to enrich with product description and supplier, even if article_number was already found
|
|
if queryResults:
|
|
# Look for article numbers in query result text (if not already found)
|
|
if not article_number:
|
|
for key in queryResults.keys():
|
|
if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
|
|
result_text = queryResults.get(key, "")
|
|
if isinstance(result_text, str):
|
|
for pattern in article_patterns:
|
|
matches = re.findall(pattern, result_text)
|
|
if matches:
|
|
article_number = matches[0]
|
|
logger.info(f"Extracted article number from query results: {article_number}")
|
|
break
|
|
if article_number:
|
|
break
|
|
|
|
# Always check data arrays for product description and supplier (even if article_number already found)
|
|
for key in queryResults.keys():
|
|
if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
|
|
data_key = f"{key}_data"
|
|
if data_key in queryResults:
|
|
data_array = queryResults[data_key]
|
|
if isinstance(data_array, list) and len(data_array) > 0:
|
|
# Look for article number in first row (if not already found)
|
|
first_row = data_array[0]
|
|
if isinstance(first_row, dict):
|
|
# Check common article number fields (if not already found)
|
|
if not article_number:
|
|
for field in ["Artikelnummer", "Artikelkürzel", "article_number", "articleNumber"]:
|
|
if field in first_row and first_row[field]:
|
|
article_number = str(first_row[field])
|
|
logger.info(f"Extracted article number from query data: {article_number}")
|
|
break
|
|
|
|
# Always check article description (can enrich even if article_number already found)
|
|
if not article_description:
|
|
for field in ["Artikelbezeichnung", "Bezeichnung", "article_description", "description"]:
|
|
if field in first_row and first_row[field]:
|
|
article_description = str(first_row[field])
|
|
logger.info(f"Extracted article description from query data: {article_description}")
|
|
break
|
|
|
|
# Always check supplier (can enrich even if article_number already found)
|
|
if not supplier:
|
|
for field in ["Lieferant", "Supplier", "supplier"]:
|
|
if field in first_row and first_row[field]:
|
|
supplier = str(first_row[field])
|
|
logger.info(f"Extracted supplier from query data: {supplier}")
|
|
break
|
|
|
|
# If we found all needed info, we can stop
|
|
if article_number and article_description and supplier:
|
|
break
|
|
|
|
# Check if current prompt is an explicit search request that should NOT use context
|
|
# If user explicitly asks to search for something, prioritize that over previous messages
|
|
explicit_search_patterns = [
|
|
r"recherchier\s+(?:im\s+internet\s+)?nach\s+(.+)",
|
|
r"suche\s+(?:im\s+internet\s+)?nach\s+(.+)",
|
|
r"search\s+(?:the\s+internet\s+)?for\s+(.+)",
|
|
r"find\s+(?:information\s+)?(?:about\s+)?(.+)",
|
|
r"recherche\s+(?:sur\s+internet\s+)?(.+)"
|
|
]
|
|
|
|
explicit_search_term = None
|
|
for pattern in explicit_search_patterns:
|
|
match = re.search(pattern, userPrompt, re.IGNORECASE)
|
|
if match:
|
|
explicit_search_term = match.group(1).strip()
|
|
logger.info(f"Found explicit search term in prompt: '{explicit_search_term}'")
|
|
break
|
|
|
|
# 3. Extract from previous assistant messages (conversation history)
|
|
# ONLY if there's no explicit search term (to avoid using old context for new searches)
|
|
if not explicit_search_term and (not article_number or not article_description):
|
|
for msg in reversed(workflowMessages[-10:]):
|
|
if msg.role == "assistant":
|
|
message_text = msg.message
|
|
|
|
# Extract article number if not found yet
|
|
if not article_number:
|
|
for pattern in article_patterns:
|
|
matches = re.findall(pattern, message_text)
|
|
if matches:
|
|
article_number = matches[0]
|
|
break
|
|
|
|
# Extract article description if not found yet
|
|
if not article_description:
|
|
description_patterns = [
|
|
r'Es handelt sich um\s+([^\.]+)',
|
|
r'It is a\s+([^\.]+)',
|
|
r'C\'est\s+([^\.]+)',
|
|
r'Bezeichnung:\s*([^\n]+)',
|
|
r'Description:\s*([^\n]+)',
|
|
r'Artikelbezeichnung:\s*([^\n]+)',
|
|
r'Artikelbezeichnung:\s*([^\n]+)'
|
|
]
|
|
for pattern in description_patterns:
|
|
match = re.search(pattern, message_text, re.IGNORECASE)
|
|
if match:
|
|
article_description = match.group(1).strip()
|
|
break
|
|
|
|
# Extract supplier if not found yet
|
|
if not supplier:
|
|
supplier_patterns = [
|
|
r'von\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
|
|
r'from\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
|
|
r'Lieferant:\s*([^\n]+)',
|
|
r'Supplier:\s*([^\n]+)'
|
|
]
|
|
for pattern in supplier_patterns:
|
|
match = re.search(pattern, message_text, re.IGNORECASE)
|
|
if match:
|
|
supplier = match.group(1).strip()
|
|
break
|
|
|
|
# Stop if we found everything
|
|
if article_number and article_description and supplier:
|
|
break
|
|
|
|
# Build enriched search query
|
|
query_parts = []
|
|
|
|
# If we have an explicit search term, use it as the primary query
|
|
if explicit_search_term:
|
|
query_parts.append(explicit_search_term)
|
|
logger.info(f"Using explicit search term as primary query: '{explicit_search_term}'")
|
|
# If we have search intent but no product info, try to use the user prompt intelligently
|
|
elif has_search_intent and not article_number and not article_description:
|
|
# Try to extract meaningful parts from the prompt
|
|
# Remove common search phrases and keep the product-related parts
|
|
cleaned_prompt = userPrompt
|
|
for phrase in ["recherchier", "recherche", "suche nach", "search for", "find", "informationen zu", "information about", "weitere informationen", "further information", "im internet", "the internet", "sur internet"]:
|
|
cleaned_prompt = re.sub(phrase, "", cleaned_prompt, flags=re.IGNORECASE)
|
|
cleaned_prompt = cleaned_prompt.strip()
|
|
|
|
# Use cleaned prompt if it has meaningful content
|
|
if cleaned_prompt and len(cleaned_prompt) > 2:
|
|
query_parts.append(cleaned_prompt)
|
|
|
|
# Add article description if found (but NOT if we have an explicit search term)
|
|
if article_description and not explicit_search_term:
|
|
query_parts.append(article_description)
|
|
|
|
# Add article number if found (but NOT if we have an explicit search term)
|
|
if article_number and not explicit_search_term:
|
|
query_parts.append(article_number)
|
|
|
|
# Add supplier if found (but NOT if we have an explicit search term)
|
|
if supplier and not explicit_search_term:
|
|
query_parts.append(supplier)
|
|
|
|
# Extract certification information from prompt if present
|
|
certification_terms = []
|
|
if has_certification_intent:
|
|
# Extract specific certification mentions
|
|
cert_keywords = {
|
|
"ul": "UL certification",
|
|
"ce": "CE certification",
|
|
"tüv": "TÜV certification",
|
|
"vde": "VDE certification",
|
|
"iec": "IEC certification",
|
|
"iso": "ISO certification"
|
|
}
|
|
for cert_key, cert_term in cert_keywords.items():
|
|
if cert_key in prompt_lower:
|
|
certification_terms.append(cert_term)
|
|
|
|
# If no specific certification found but certification intent detected, add generic term
|
|
if not certification_terms:
|
|
certification_terms.append("certification")
|
|
|
|
# Add certification terms to query if found
|
|
if certification_terms:
|
|
query_parts.extend(certification_terms)
|
|
|
|
# Add "Datenblatt" or "datasheet" if user requested it or if we have product info
|
|
# But NOT if we have an explicit search term (user wants to search for something specific)
|
|
if not explicit_search_term:
|
|
if "datenblatt" in prompt_lower or "datasheet" in prompt_lower or "fiche technique" in prompt_lower:
|
|
query_parts.append("Datenblatt")
|
|
elif query_parts and (article_number or article_description):
|
|
# If we have product info but no explicit request for datasheet, add it anyway
|
|
query_parts.append("Datenblatt")
|
|
|
|
# If we found product information or built a meaningful query, use it
|
|
if query_parts:
|
|
enriched_query = " ".join(query_parts)
|
|
logger.info(f"Built enriched search query: '{enriched_query}' from context (original: '{userPrompt}')")
|
|
return enriched_query
|
|
else:
|
|
# Fall back to original prompt, but try to clean it up
|
|
logger.info(f"No product context found, using original prompt: '{userPrompt}'")
|
|
return userPrompt
|
|
|
|
|
|
async def _convert_file_ids_to_document_references(
|
|
services,
|
|
file_ids: List[str]
|
|
) -> DocumentReferenceList:
|
|
"""
|
|
Convert file IDs to DocumentReferenceList for use with ai.process.
|
|
|
|
Args:
|
|
services: Services instance
|
|
file_ids: List of file IDs to convert
|
|
|
|
Returns:
|
|
DocumentReferenceList with docItem references
|
|
"""
|
|
references = []
|
|
|
|
# Get workflow to search for ChatDocuments
|
|
workflow = services.workflow
|
|
if not workflow:
|
|
logger.error("Cannot convert file IDs to document references: workflow not set in services")
|
|
return DocumentReferenceList(references=[])
|
|
|
|
for file_id in file_ids:
|
|
try:
|
|
# Get file info to verify it exists
|
|
file_info = services.chat.getFileInfo(file_id)
|
|
if not file_info:
|
|
logger.warning(f"File {file_id} not found, skipping")
|
|
continue
|
|
|
|
# Find ChatDocument that has this fileId
|
|
document_id = None
|
|
if workflow.messages:
|
|
for message in workflow.messages:
|
|
if hasattr(message, 'documents') and message.documents:
|
|
for doc in message.documents:
|
|
if getattr(doc, 'fileId', None) == file_id:
|
|
document_id = getattr(doc, 'id', None)
|
|
break
|
|
if document_id:
|
|
break
|
|
|
|
# Search database if not found in messages
|
|
if not document_id:
|
|
try:
|
|
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
|
|
documents = getRecordsetWithRBAC(
|
|
services.interfaceDbChat.db,
|
|
ChatDocument,
|
|
services.currentUser,
|
|
recordFilter={"fileId": file_id}
|
|
)
|
|
if documents:
|
|
workflow_message_ids = {msg.id for msg in workflow.messages} if workflow.messages else set()
|
|
for doc in documents:
|
|
if doc.get("messageId") in workflow_message_ids:
|
|
document_id = doc.get("id")
|
|
break
|
|
except Exception:
|
|
pass # Fallback to fileId
|
|
|
|
# Use ChatDocument ID if found, otherwise use fileId as fallback
|
|
ref = DocumentItemReference(documentId=document_id if document_id else file_id)
|
|
references.append(ref)
|
|
except Exception as e:
|
|
logger.error(f"Error converting fileId {file_id}: {e}", exc_info=True)
|
|
|
|
logger.info(f"Converted {len(references)} file IDs to document references")
|
|
return DocumentReferenceList(references=references)
|
|
|
|
|
|
def _format_query_results_as_lookup(query_data: Dict[str, List[Dict]]) -> str:
|
|
"""
|
|
Format database query results as JSON lookup table for Excel matching.
|
|
Converts query result data into structured JSON format: {Artikelnummer: {columns...}}
|
|
|
|
Args:
|
|
query_data: Dict with query_key -> list of row dicts (from connector with return_json=True)
|
|
|
|
Returns:
|
|
JSON string formatted as lookup table
|
|
"""
|
|
lookup_table = {}
|
|
|
|
for query_key, rows in query_data.items():
|
|
if query_key == "error" or not rows:
|
|
logger.warning(f"Skipping query key '{query_key}' - no rows or error")
|
|
continue
|
|
|
|
logger.info(f"Processing {len(rows)} rows from query '{query_key}'")
|
|
|
|
for row in rows:
|
|
if not isinstance(row, dict):
|
|
logger.warning(f"Skipping non-dict row: {type(row)}")
|
|
continue
|
|
|
|
# Find Artikelnummer field (case-insensitive)
|
|
artikelnummer = None
|
|
for key in row.keys():
|
|
if key.lower() in ['artikelnummer', 'artikel_nummer', 'art_nr', 'part_number']:
|
|
artikelnummer = str(row[key])
|
|
break
|
|
|
|
if artikelnummer:
|
|
lookup_table[artikelnummer] = row
|
|
else:
|
|
logger.warning(f"No Artikelnummer found in row with keys: {list(row.keys())}")
|
|
|
|
logger.info(f"Generated lookup table with {len(lookup_table)} entries")
|
|
if lookup_table:
|
|
sample_keys = list(lookup_table.keys())[:3]
|
|
logger.info(f"Sample Artikelnummern: {sample_keys}")
|
|
if sample_keys:
|
|
sample_entry = lookup_table[sample_keys[0]]
|
|
logger.info(f"Sample entry keys: {list(sample_entry.keys())}")
|
|
|
|
return json.dumps(lookup_table, ensure_ascii=False, indent=2)
|
|
|
|
|
|
async def _create_chat_document_from_action_document(
|
|
services,
|
|
action_document,
|
|
message_id: str,
|
|
workflow_id: str,
|
|
round_number: int
|
|
) -> ChatDocument:
|
|
"""
|
|
Create a ChatDocument from an ActionDocument by storing the file data.
|
|
|
|
Args:
|
|
services: Services instance
|
|
action_document: ActionDocument from ai.process result
|
|
message_id: ID of the message to attach to
|
|
workflow_id: Workflow ID
|
|
round_number: Round number
|
|
|
|
Returns:
|
|
ChatDocument instance
|
|
"""
|
|
try:
|
|
# Get file data (could be bytes or string)
|
|
document_data = action_document.documentData
|
|
|
|
# Convert to bytes if needed
|
|
if isinstance(document_data, str):
|
|
# Check if it's base64 encoded
|
|
try:
|
|
# Try to decode as base64 first
|
|
file_bytes = base64.b64decode(document_data)
|
|
except Exception:
|
|
# Not base64, encode as UTF-8
|
|
file_bytes = document_data.encode('utf-8')
|
|
elif isinstance(document_data, bytes):
|
|
file_bytes = document_data
|
|
else:
|
|
# Try to convert to bytes
|
|
try:
|
|
file_bytes = bytes(document_data)
|
|
except Exception:
|
|
# Last resort: convert to string then encode
|
|
file_bytes = str(document_data).encode('utf-8')
|
|
|
|
# Get MIME type (default to Excel)
|
|
mime_type = action_document.mimeType or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
|
|
|
# Get file name
|
|
file_name = action_document.documentName or "data_export.xlsx"
|
|
# Ensure it has .xlsx extension
|
|
if not file_name.lower().endswith('.xlsx'):
|
|
# Remove any existing extension and add .xlsx
|
|
file_name = file_name.rsplit('.', 1)[0] + '.xlsx'
|
|
|
|
# Store file using component interface
|
|
file_item = services.interfaceDbComponent.createFile(
|
|
name=file_name,
|
|
mimeType=mime_type,
|
|
content=file_bytes
|
|
)
|
|
|
|
# Store file data
|
|
success = services.interfaceDbComponent.createFileData(file_item.id, file_bytes)
|
|
if not success:
|
|
logger.warning(f"Failed to store file data for {file_item.id}, but continuing...")
|
|
|
|
# Create ChatDocument
|
|
chat_document = ChatDocument(
|
|
id=str(uuid.uuid4()),
|
|
messageId=message_id,
|
|
fileId=file_item.id,
|
|
fileName=file_name,
|
|
fileSize=len(file_bytes),
|
|
mimeType=mime_type,
|
|
roundNumber=round_number,
|
|
taskNumber=0,
|
|
actionNumber=0
|
|
)
|
|
|
|
logger.info(f"Created ChatDocument {chat_document.id} from ActionDocument {file_name} (size: {len(file_bytes)} bytes)")
|
|
return chat_document
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating ChatDocument from ActionDocument: {e}", exc_info=True)
|
|
raise
|
|
|
|
|
|
async def _bridge_chatbot_events(
|
|
event_stream,
|
|
event_manager,
|
|
workflow_id: str,
|
|
interface_db_chat
|
|
):
|
|
"""
|
|
Bridge legacy chatbot events to current event manager format.
|
|
|
|
Args:
|
|
event_stream: Async iterator from chatbot.stream_events()
|
|
event_manager: Event manager instance
|
|
workflow_id: Workflow ID
|
|
interface_db_chat: Database interface for storing messages
|
|
"""
|
|
try:
|
|
final_message_stored = False
|
|
|
|
async for event in event_stream:
|
|
event_type = event.get("type")
|
|
|
|
# Handle status updates - emit immediately for real-time UI feedback
|
|
# Note: Status updates are transient UI feedback, no need to persist them
|
|
if event_type == "status":
|
|
label = event.get("label", "")
|
|
if label:
|
|
# Emit the status event directly for real-time UI feedback
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="chatdata",
|
|
data={
|
|
"type": "status",
|
|
"label": label.strip()
|
|
},
|
|
event_category="chat",
|
|
message="Status update",
|
|
step="status"
|
|
)
|
|
continue
|
|
|
|
# Handle final response
|
|
if event_type == "final":
|
|
response_data = event.get("response", {})
|
|
chat_history = response_data.get("chat_history", [])
|
|
|
|
# The final message should already be stored by the memory/checkpointer
|
|
# We just need to emit the event, not store it again
|
|
# Check if the message was already stored by checking the workflow
|
|
workflow = interface_db_chat.getWorkflow(workflow_id)
|
|
if workflow and workflow.messages:
|
|
# Find the last assistant message in the workflow (already stored by memory)
|
|
last_message = workflow.messages[-1]
|
|
if last_message.role == "assistant":
|
|
final_message_stored = True
|
|
|
|
# Emit message event for the already-stored message
|
|
message_timestamp = parseTimestamp(last_message.publishedAt, default=getUtcTimestamp())
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="chatdata",
|
|
data={
|
|
"type": "message",
|
|
"createdAt": message_timestamp,
|
|
"item": last_message.dict()
|
|
},
|
|
event_category="chat"
|
|
)
|
|
else:
|
|
# If no assistant message found, try to store from chat_history
|
|
assistant_message = None
|
|
for msg in reversed(chat_history):
|
|
if msg.get("role") == "assistant" and msg.get("content"):
|
|
assistant_message = msg
|
|
break
|
|
|
|
if assistant_message:
|
|
message_data = {
|
|
"id": f"msg_{workflow_id}_{getUtcTimestamp()}",
|
|
"workflowId": workflow_id,
|
|
"message": assistant_message.get("content", ""),
|
|
"role": "assistant",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1 if workflow.messages else 1,
|
|
"publishedAt": getUtcTimestamp(),
|
|
"roundNumber": workflow.currentRound if workflow else 1,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
"success": True
|
|
}
|
|
|
|
try:
|
|
assistant_msg = interface_db_chat.createMessage(message_data)
|
|
final_message_stored = True
|
|
|
|
# Emit message event
|
|
message_timestamp = parseTimestamp(assistant_msg.publishedAt, default=getUtcTimestamp())
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="chatdata",
|
|
data={
|
|
"type": "message",
|
|
"createdAt": message_timestamp,
|
|
"item": assistant_msg.dict()
|
|
},
|
|
event_category="chat"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error storing assistant message: {e}", exc_info=True)
|
|
|
|
# Emit completion event
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="complete",
|
|
data={"workflowId": workflow_id},
|
|
event_category="workflow",
|
|
message="Chatbot-Verarbeitung abgeschlossen",
|
|
step="complete"
|
|
)
|
|
|
|
# Update workflow status
|
|
try:
|
|
interface_db_chat.updateWorkflow(workflow_id, {
|
|
"status": "completed",
|
|
"lastActivity": getUtcTimestamp()
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error updating workflow status: {e}", exc_info=True)
|
|
|
|
return
|
|
|
|
# Handle errors
|
|
if event_type == "error":
|
|
error_msg = event.get("message", "Unknown error")
|
|
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="error",
|
|
data={"error": error_msg},
|
|
event_category="workflow",
|
|
message=f"Fehler beim Verarbeiten: {error_msg}",
|
|
step="error"
|
|
)
|
|
|
|
# Update workflow status
|
|
try:
|
|
interface_db_chat.updateWorkflow(workflow_id, {
|
|
"status": "error",
|
|
"lastActivity": getUtcTimestamp()
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error updating workflow status: {e}", exc_info=True)
|
|
|
|
return
|
|
|
|
# If stream ended without final message, store error message
|
|
if not final_message_stored:
|
|
logger.warning(f"Stream ended for workflow {workflow_id} without a final message")
|
|
try:
|
|
workflow = interface_db_chat.getWorkflow(workflow_id)
|
|
if workflow:
|
|
error_message_data = {
|
|
"id": f"msg_{workflow_id}_{getUtcTimestamp()}",
|
|
"workflowId": workflow_id,
|
|
"message": "Entschuldigung, ich konnte keine vollständige Antwort generieren. Bitte versuchen Sie es erneut.",
|
|
"role": "assistant",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1 if workflow.messages else 1,
|
|
"publishedAt": getUtcTimestamp(),
|
|
"roundNumber": workflow.currentRound if workflow else 1,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0,
|
|
"success": False
|
|
}
|
|
|
|
error_msg = interface_db_chat.createMessage(error_message_data)
|
|
|
|
# Emit message event
|
|
message_timestamp = parseTimestamp(error_msg.publishedAt, default=getUtcTimestamp())
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="chatdata",
|
|
data={
|
|
"type": "message",
|
|
"createdAt": message_timestamp,
|
|
"item": error_msg.dict()
|
|
},
|
|
event_category="chat"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error storing error message: {e}", exc_info=True)
|
|
|
|
# Emit completion event
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="complete",
|
|
data={"workflowId": workflow_id},
|
|
event_category="workflow",
|
|
message="Chatbot-Verarbeitung abgeschlossen",
|
|
step="complete"
|
|
)
|
|
|
|
# Update workflow status
|
|
try:
|
|
interface_db_chat.updateWorkflow(workflow_id, {
|
|
"status": "completed",
|
|
"lastActivity": getUtcTimestamp()
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error updating workflow status: {e}", exc_info=True)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in bridge_chatbot_events: {e}", exc_info=True)
|
|
|
|
# Emit error event
|
|
await event_manager.emit_event(
|
|
context_id=workflow_id,
|
|
event_type="error",
|
|
data={"error": str(e)},
|
|
event_category="workflow",
|
|
message=f"Fehler beim Verarbeiten: {str(e)}",
|
|
step="error"
|
|
)
|
|
|
|
|
|
async def _load_chatbot_config(featureInstanceId: Optional[str]) -> ChatbotConfig:
|
|
"""
|
|
Load chatbot configuration from FeatureInstance (database).
|
|
|
|
Args:
|
|
featureInstanceId: Feature instance ID to load config from
|
|
|
|
Returns:
|
|
ChatbotConfig instance
|
|
|
|
Raises:
|
|
ValueError: If no featureInstanceId provided or instance not found
|
|
"""
|
|
if not featureInstanceId:
|
|
raise ValueError("featureInstanceId is required to load chatbot config")
|
|
|
|
try:
|
|
# Import here to avoid circular imports
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.interfaces.interfaceFeatures import getFeatureInterface
|
|
|
|
# Get feature instance from database
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
instance = featureInterface.getFeatureInstance(featureInstanceId)
|
|
|
|
if not instance:
|
|
raise ValueError(f"FeatureInstance {featureInstanceId} not found")
|
|
|
|
logger.info(f"Loading chatbot config from FeatureInstance {featureInstanceId}")
|
|
return load_chatbot_config_from_instance(instance)
|
|
except ValueError:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error loading config from FeatureInstance {featureInstanceId}: {e}")
|
|
raise
|
|
|
|
|
|
async def _processChatbotMessageLangGraph(
|
|
services,
|
|
currentUser: User,
|
|
workflowId: str,
|
|
userInput: UserInputRequest,
|
|
userMessageId: str,
|
|
featureInstanceId: Optional[str] = None
|
|
):
|
|
"""
|
|
Process chatbot message using LangGraph.
|
|
Uses LangGraph workflow with AI center models and tools.
|
|
|
|
Args:
|
|
services: Service container
|
|
currentUser: Current user
|
|
workflowId: Workflow ID
|
|
userInput: User input request
|
|
userMessageId: User message ID
|
|
featureInstanceId: Optional feature instance ID for loading instance-specific config
|
|
"""
|
|
event_manager = get_event_manager()
|
|
|
|
try:
|
|
interfaceDbChat = services.interfaceDbChat
|
|
|
|
# Reload workflow to get current messages
|
|
workflow = interfaceDbChat.getWorkflow(workflowId)
|
|
if not workflow:
|
|
logger.error(f"Workflow {workflowId} not found during processing")
|
|
await event_manager.emit_event(
|
|
context_id=workflowId,
|
|
event_type="error",
|
|
data={"error": f"Workflow {workflowId} nicht gefunden"},
|
|
event_category="workflow",
|
|
message=f"Workflow {workflowId} nicht gefunden",
|
|
step="error"
|
|
)
|
|
return
|
|
|
|
# Check if workflow was stopped before starting
|
|
if await _check_workflow_stopped(interfaceDbChat, workflowId):
|
|
logger.info(f"Workflow {workflowId} was stopped, aborting processing")
|
|
return
|
|
|
|
# Load configuration from FeatureInstance (database) or fall back to file
|
|
config = await _load_chatbot_config(featureInstanceId)
|
|
|
|
# Replace {{DATE}} placeholder in system prompt
|
|
from datetime import datetime
|
|
system_prompt = config.systemPrompt.replace(
|
|
"{{DATE}}",
|
|
datetime.now().strftime("%d.%m.%Y")
|
|
)
|
|
|
|
# Create AI center model
|
|
operation_type = OperationTypeEnum[config.model.operationType]
|
|
processing_mode = ProcessingModeEnum[config.model.processingMode]
|
|
|
|
model = AICenterChatModel(
|
|
user=currentUser,
|
|
operation_type=operation_type,
|
|
processing_mode=processing_mode
|
|
)
|
|
|
|
# Create memory/checkpointer
|
|
memory = DatabaseCheckpointer(user=currentUser, workflow_id=workflowId)
|
|
|
|
# Create chatbot instance with config for dynamic tool configuration
|
|
chatbot = await Chatbot.create(
|
|
model=model,
|
|
memory=memory,
|
|
system_prompt=system_prompt,
|
|
workflow_id=workflowId,
|
|
config=config
|
|
)
|
|
|
|
# Stream events using chatbot
|
|
event_stream = chatbot.stream_events(
|
|
message=userInput.prompt,
|
|
chat_id=workflowId
|
|
)
|
|
|
|
# Bridge chatbot events to event manager
|
|
await _bridge_chatbot_events(
|
|
event_stream=event_stream,
|
|
event_manager=event_manager,
|
|
workflow_id=workflowId,
|
|
interface_db_chat=interfaceDbChat
|
|
)
|
|
|
|
# Schedule cleanup
|
|
await event_manager.cleanup(workflowId, delay=300.0) # 5 minutes delay
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing chatbot message with LangGraph: {str(e)}", exc_info=True)
|
|
|
|
# Check if workflow was stopped - if so, don't store error message
|
|
if await _check_workflow_stopped(interfaceDbChat, workflowId):
|
|
logger.info(f"Workflow {workflowId} was stopped, not storing error message")
|
|
return
|
|
|
|
# Store error message
|
|
try:
|
|
workflow = interfaceDbChat.getWorkflow(workflowId)
|
|
|
|
if workflow and workflow.status == "stopped":
|
|
logger.info(f"Workflow {workflowId} was stopped, not storing error message")
|
|
return
|
|
|
|
errorMessageData = {
|
|
"id": f"msg_{uuid.uuid4()}",
|
|
"workflowId": workflowId,
|
|
"parentMessageId": userMessageId,
|
|
"message": f"Sorry, I encountered an error: {str(e)}",
|
|
"role": "assistant",
|
|
"status": "last",
|
|
"sequenceNr": len(workflow.messages) + 1 if workflow else 1,
|
|
"publishedAt": getUtcTimestamp(),
|
|
"success": False,
|
|
"roundNumber": workflow.currentRound if workflow else 1,
|
|
"taskNumber": 0,
|
|
"actionNumber": 0
|
|
}
|
|
errorMessage = interfaceDbChat.createMessage(errorMessageData)
|
|
|
|
# Emit message event
|
|
message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp())
|
|
await event_manager.emit_event(
|
|
context_id=workflowId,
|
|
event_type="chatdata",
|
|
data={
|
|
"type": "message",
|
|
"createdAt": message_timestamp,
|
|
"item": errorMessage.dict()
|
|
},
|
|
event_category="chat"
|
|
)
|
|
|
|
# Update workflow status
|
|
if not await _check_workflow_stopped(interfaceDbChat, workflowId):
|
|
interfaceDbChat.updateWorkflow(workflowId, {
|
|
"status": "error",
|
|
"lastActivity": getUtcTimestamp()
|
|
})
|
|
|
|
# Schedule cleanup
|
|
await event_manager.cleanup(workflowId)
|
|
except Exception as storeError:
|
|
logger.error(f"Error storing error message: {storeError}")
|
|
|
|
|
|
async def _processChatbotMessage(
|
|
services,
|
|
workflowId: str,
|
|
userInput: UserInputRequest,
|
|
userMessageId: str
|
|
):
|
|
"""
|
|
DEPRECATED: Old chatbot processing implementation.
|
|
Kept for backward compatibility but redirects to LangGraph implementation.
|
|
"""
|
|
logger.warning("_processChatbotMessage is deprecated, using LangGraph implementation")
|
|
# Note: currentUser should be passed, but this function signature doesn't have it
|
|
# This is a deprecated function, so we'll need to get user from workflow or services
|
|
# For now, raise an error to indicate this needs to be fixed
|
|
raise NotImplementedError("_processChatbotMessage is deprecated and requires currentUser parameter")
|