# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Simple chatbot feature - basic implementation.
|
|
User input is processed by AI to create list of needed queries.
|
|
Those queries get streamed back.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import uuid
|
|
import asyncio
|
|
import re
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
from modules.datamodels.datamodelChat import UserInputRequest
|
|
from modules.features.chatbot.interfaceFeatureChatbot import ChatbotConversation, ChatbotDocument
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
|
|
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
|
|
from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp
|
|
from modules.services import getInterface as getServices
|
|
from modules.features.chatbot.streaming.events import get_event_manager
|
|
from modules.features.chatbot.chatbot import Chatbot
|
|
from modules.features.chatbot.bridges.ai import AICenterChatModel, clear_workflow_allowed_providers
|
|
from modules.features.chatbot.bridges.memory import DatabaseCheckpointer
|
|
from modules.features.chatbot.config import (
|
|
load_chatbot_config_from_instance,
|
|
ChatbotConfig
|
|
)
|
|
from modules.datamodels.datamodelAi import OperationTypeEnum, ProcessingModeEnum
|
|
from modules.workflows.methods.methodAi.methodAi import MethodAi
|
|
from modules.connectors.connectorPreprocessor import PreprocessorConnector
|
|
from modules.features.chatbot.chatbotConstants import generate_conversation_name, generate_name_from_prompt
|
|
import base64
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _extractJsonFromResponse(content: str) -> Optional[dict]:
|
|
"""Extract JSON from AI response, handling markdown code blocks."""
|
|
# Try direct JSON parse first
|
|
try:
|
|
return json.loads(content.strip())
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Try to extract JSON from markdown code blocks
|
|
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
|
|
if json_match:
|
|
try:
|
|
return json.loads(json_match.group(1))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Try to find JSON object in the text
|
|
json_match = re.search(r'\{.*\}', content, re.DOTALL)
|
|
if json_match:
|
|
try:
|
|
return json.loads(json_match.group(0))
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
async def chatProcess(
    currentUser: User,
    mandateId: Optional[str],
    userInput: UserInputRequest,
    workflowId: Optional[str] = None,
    featureInstanceId: Optional[str] = None
) -> ChatbotConversation:
    """
    Simple chatbot processing - analyze user input and generate queries.

    Flow:
    1. Create or load workflow
    2. Store user message
    3. AI analyzes user input to create list of needed queries
    4. Stream queries back

    The heavy AI work runs in a fire-and-forget asyncio task
    (_processChatbotMessageLangGraph); this coroutine returns as soon as
    the user message has been persisted and the streaming queue exists.

    Args:
        currentUser: Current user
        mandateId: Mandate ID for the workflow
        userInput: User input request
        workflowId: Optional workflow ID to continue existing conversation
        featureInstanceId: Feature instance ID for loading instance-specific config

    Returns:
        ChatbotConversation instance (reloaded so it includes the new message)

    Raises:
        ValueError: If workflowId is given but no such workflow exists.
    """
    try:
        # Get services with mandate and feature instance context
        services = getServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
        services.featureCode = 'chatbot'

        # Load instance config and apply allowedProviders for AI calls (conversation name + main chat)
        # NOTE(review): _load_chatbot_config is defined elsewhere in this module.
        chatbot_config = await _load_chatbot_config(featureInstanceId)
        if chatbot_config.model.allowedProviders:
            services.allowedProviders = chatbot_config.model.allowedProviders
            logger.info(f"Chatbot instance {featureInstanceId}: restricting to providers {chatbot_config.model.allowedProviders}")

        # Function-local import - presumably avoids a circular import at module load; verify.
        from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
        interfaceDbChat = getChatbotInterface(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)

        # Get event manager and create queue if needed
        event_manager = get_event_manager()

        # Create or load workflow
        if workflowId:
            workflow = interfaceDbChat.getWorkflow(workflowId)
            if not workflow:
                raise ValueError(f"Workflow {workflowId} not found")

            # Resume workflow: increment round number
            new_round = workflow.currentRound + 1
            interfaceDbChat.updateWorkflow(workflowId, {
                "status": "running",
                "currentRound": new_round,
                "lastActivity": getUtcTimestamp()
            })
            # Reload so `workflow` reflects the updated round/status
            workflow = interfaceDbChat.getWorkflow(workflowId)
            logger.info(f"Resumed workflow {workflowId}, round incremented to {new_round}")

            # Create event queue if it doesn't exist (for streaming)
            if not event_manager.has_queue(workflowId):
                event_manager.create_queue(workflowId)
        else:
            # Use placeholder name immediately - don't block on AI call
            prompt_stripped = (userInput.prompt or "").strip()
            trivial_prompts = {"test", "hi", "hallo", "hello", "hey"}
            is_short = len(prompt_stripped) < 30 or prompt_stripped.lower() in trivial_prompts

            if is_short:
                # Short/trivial prompts: derive a name locally, skip AI naming entirely
                conversation_name = generate_name_from_prompt(userInput.prompt)
                run_name_task = False
            else:
                # Placeholder title; replaced asynchronously by the AI-generated name below
                conversation_name = "Neue Unterhaltung"
                run_name_task = True

            # Create new conversation (per feature instance)
            conversationData = {
                "id": str(uuid.uuid4()),
                "featureInstanceId": featureInstanceId,
                "status": "running",
                "name": conversation_name,
                "currentRound": 1,
                "workflowMode": "Chatbot",
                "startedAt": getUtcTimestamp(),
                "lastActivity": getUtcTimestamp()
            }
            workflow = interfaceDbChat.createConversation(conversationData)
            logger.info(f"Created new chatbot workflow: {workflow.id} with name: {conversation_name}")

            # Run AI name generation in background (for longer prompts only)
            # NOTE(review): task handle is not retained, so failures surface only via the task's own logging.
            if run_name_task:
                asyncio.create_task(_update_conversation_name_async(
                    services=services,
                    currentUser=currentUser,
                    mandateId=mandateId,
                    featureInstanceId=featureInstanceId,
                    workflowId=workflow.id,
                    prompt=userInput.prompt,
                    userLanguage=userInput.userLanguage,
                    interfaceDbChat=interfaceDbChat,
                    event_manager=event_manager
                ))

            # Create event queue for new workflow (for streaming)
            event_manager.create_queue(workflow.id)

        # Reload workflow to get current message count
        workflow = interfaceDbChat.getWorkflow(workflow.id)

        # Process uploaded files and create ChatbotDocuments
        user_documents = []
        if userInput.listFileId and len(userInput.listFileId) > 0:
            logger.info(f"Processing {len(userInput.listFileId)} uploaded file(s) for user message")
            for fileId in userInput.listFileId:
                try:
                    # Get file info from chat service
                    fileInfo = services.chat.getFileInfo(fileId)
                    if not fileInfo:
                        logger.warning(f"No file info found for file ID {fileId}")
                        continue

                    originalFileName = fileInfo.get("fileName", "unknown")
                    originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
                    fileSizeToUse = fileInfo.get("size", 0)

                    # Create ChatbotDocument for the file
                    document = ChatbotDocument(
                        id=str(uuid.uuid4()),
                        messageId="",  # Will be set when message is created
                        fileId=fileId,
                        fileName=originalFileName,
                        fileSize=fileSizeToUse,
                        mimeType=originalMimeType,
                        roundNumber=workflow.currentRound,
                        taskNumber=0,
                        actionNumber=0
                    )
                    user_documents.append(document)
                    logger.info(f"Created ChatbotDocument for file {fileId} -> {originalFileName}")
                except Exception as e:
                    # Best-effort: a broken upload must not abort the whole message
                    logger.error(f"Error processing file ID {fileId}: {e}", exc_info=True)

        # Store user message
        userMessageData: Dict[str, Any] = {
            "id": f"msg_{uuid.uuid4()}",
            "conversationId": workflow.id,
            "message": userInput.prompt,
            "role": "user",
            # "first" marks the opening message of a brand-new conversation
            "status": "first" if workflowId is None else "step",
            "sequenceNr": len(workflow.messages) + 1,
            "publishedAt": getUtcTimestamp(),
            "roundNumber": workflow.currentRound,
            "taskNumber": 0,
            "actionNumber": 0
        }
        if user_documents:
            userMessageData["documents"] = [d.model_dump() for d in user_documents]
        userMessage = interfaceDbChat.createMessage(userMessageData)
        logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)")

        # Emit message event for streaming (exact chatData format)
        message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp())
        await event_manager.emit_event(
            context_id=workflow.id,
            event_type="chatdata",
            data={
                "type": "message",
                "createdAt": message_timestamp,
                "item": userMessage.model_dump()
            },
            event_category="chat"
        )

        # Update workflow status
        interfaceDbChat.updateWorkflow(workflow.id, {
            "status": "running",
            "lastActivity": getUtcTimestamp()
        })

        # Pre-flight billing check before starting LangGraph (if mandateId present)
        # NOTE(review): _preflight_billing_check is defined elsewhere in this module;
        # presumably raises on insufficient quota - confirm.
        if mandateId:
            _preflight_billing_check(services, mandateId, featureInstanceId)

        # Process in background using LangGraph (async, fire-and-forget)
        asyncio.create_task(_processChatbotMessageLangGraph(
            services,
            currentUser,
            workflow.id,
            userInput,
            userMessage.id,
            featureInstanceId=featureInstanceId,
            config=chatbot_config
        ))

        # Reload workflow to include new message
        workflow = interfaceDbChat.getWorkflow(workflow.id)
        return workflow

    except Exception as e:
        logger.error(f"Error in chatProcess: {str(e)}", exc_info=True)
        raise
|
|
|
|
|
|
async def _update_conversation_name_async(
    services,
    currentUser: User,
    mandateId: Optional[str],
    featureInstanceId: Optional[str],
    workflowId: str,
    prompt: str,
    userLanguage: str,
    interfaceDbChat,
    event_manager,
) -> None:
    """
    Background task: generate a conversation name via AI and update the workflow.

    Runs in parallel to LangGraph so it never blocks the first response.
    Failures are logged and swallowed - naming is purely cosmetic.
    """
    try:
        generated = await generate_conversation_name(services, prompt, userLanguage)
        if not generated:
            # AI produced nothing usable - keep the placeholder name
            return

        interfaceDbChat.updateWorkflow(workflowId, {"name": generated, "lastActivity": getUtcTimestamp()})
        logger.info(f"Updated workflow {workflowId} name to: {generated}")

        # Emit stat event so frontend can refresh thread list/title
        refreshed = interfaceDbChat.getWorkflow(workflowId)
        if not refreshed:
            return
        payload = refreshed.model_dump() if hasattr(refreshed, "model_dump") else refreshed.dict()
        await event_manager.emit_event(
            context_id=workflowId,
            event_type="chatdata",
            data={
                "type": "stat",
                "createdAt": getUtcTimestamp(),
                "item": payload
            },
            event_category="chat",
            message="Workflow name updated",
            step="workflow_update"
        )
    except Exception as e:
        logger.warning(f"Background conversation name update failed for workflow {workflowId}: {e}")
|
|
|
|
|
|
async def _execute_queries_parallel(queries: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Execute multiple SQL queries in parallel on a single shared connector.

    Args:
        queries: List of query dictionaries, each containing:
            - "query": SQL query string
            - "purpose": Description of what the query retrieves
            - "table": Primary table name

    Returns:
        Dictionary keyed by 1-based query position:
            - "query_1", "query_2", ...: Success result text
            - "query_1_data", ...: Raw data arrays
            - "query_1_error", ...: Error message when the query failed
    """
    # One connector for the whole batch; closed exactly once in `finally`.
    connector = PreprocessorConnector()

    async def _run_one(position: int, spec: Dict[str, Any]):
        # Run a single query; never raise - errors are returned as data.
        try:
            sql_text = spec.get("query", "")
            payload = await connector.executeQuery(sql_text, return_json=True)
            return position, payload, None
        except Exception as exc:
            return position, None, str(exc)

    try:
        outcomes = await asyncio.gather(
            *(_run_one(i, spec) for i, spec in enumerate(queries)),
            return_exceptions=True
        )
    finally:
        await connector.close()

    collected: Dict[str, Any] = {}
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            # Defensive: _run_one already catches, but gather may surface cancellations
            logger.error(f"Exception in parallel query execution: {outcome}")
            continue

        position, payload, failure = outcome
        label = f"query_{position + 1}"

        if failure:
            collected[f"{label}_error"] = failure
            logger.error(f"Query {position+1} failed: {failure}")
            continue

        text_part = payload.get("text", "") if payload else ""
        if payload and not text_part.startswith(("Error:", "Query failed:")):
            collected[label] = text_part
            collected[f"{label}_data"] = payload.get("data", [])
            row_count = len(payload.get('data', []))
            logger.info(f"Query {position+1} executed successfully, returned {row_count} rows")
        else:
            fallback = payload.get("text", "Query failed") if payload else "Query failed: No response"
            collected[f"{label}_error"] = fallback
            logger.error(f"Query {position+1} failed: {fallback}")

    return collected
|
|
|
|
|
|
async def _emit_log_and_event(
    interfaceDbChat,
    workflowId: str,
    event_manager,
    message: str,
    log_type: str = "info",
    status: str = "running",
    round_number: Optional[int] = None
) -> None:
    """
    Persist a workflow log entry and stream it to listeners.

    Args:
        interfaceDbChat: Database interface
        workflowId: Workflow ID
        event_manager: Event manager for streaming
        message: Log message
        log_type: Log type (info, warning, error)
        status: Status string
        round_number: Optional round number (fetched from the workflow when omitted)
    """
    try:
        # Default the round number from the workflow record
        if round_number is None:
            owning_workflow = interfaceDbChat.getWorkflow(workflowId)
            if owning_workflow:
                round_number = owning_workflow.currentRound

        stamp = getUtcTimestamp()
        record = {
            "id": f"log_{uuid.uuid4()}",
            "workflowId": workflowId,
            "message": message,
            "type": log_type,
            "timestamp": stamp,
            "status": status,
            "roundNumber": round_number
        }
        # Store log in database
        stored = interfaceDbChat.createLog(record)

        # Stream only when persisting succeeded and a manager is available
        if not (stored and event_manager):
            return
        try:
            # createLog may return a Pydantic model (ChatbotLog) - normalize to a dict
            if hasattr(stored, "model_dump"):
                item = stored.model_dump()
            elif hasattr(stored, "dict"):
                item = stored.dict()
            else:
                item = record

            await event_manager.emit_event(
                context_id=workflowId,
                event_type="chatdata",
                data={
                    "type": "log",
                    "createdAt": stamp,
                    "item": item
                },
                event_category="chat",
                message="New log",
                step="log"
            )
        except Exception as emit_error:
            # Streaming is best-effort; the log row is already saved
            logger.warning(f"Error emitting log event: {emit_error}")
    except Exception as e:
        logger.error(f"Error storing log: {e}", exc_info=True)
|
|
|
|
|
|
async def _check_workflow_stopped(interfaceDbChat, workflowId: str) -> bool:
|
|
"""
|
|
Check if workflow was stopped.
|
|
|
|
Args:
|
|
interfaceDbChat: Database interface
|
|
workflowId: Workflow ID
|
|
|
|
Returns:
|
|
True if workflow is stopped, False otherwise
|
|
"""
|
|
try:
|
|
workflow = interfaceDbChat.getWorkflow(workflowId)
|
|
return workflow and workflow.status == "stopped"
|
|
except Exception as e:
|
|
logger.warning(f"Error checking workflow status: {e}")
|
|
return False
|
|
|
|
|
|
def _buildWebResearchQuery(userPrompt: str, workflowMessages: List, queryResults: Optional[Dict[str, Any]] = None) -> str:
    """
    Build enriched web research query by extracting product context from
    conversation history and current prompt.

    Extracts product information from, in order:
    1. Current user prompt (article numbers, product mentions)
    2. Database query results (if available)
    3. Previous assistant messages (conversation history)

    Fixes over the previous revision: a duplicated 'Artikelbezeichnung'
    description pattern was removed, and the literal phrases stripped from
    the prompt are now re.escape()d before being handed to re.sub.

    Args:
        userPrompt: Current user prompt
        workflowMessages: List of workflow messages (conversation history);
            each entry is expected to expose .role and .message - TODO confirm
        queryResults: Optional database query results to extract product info from

    Returns:
        Enriched search query string (original prompt when no context found)
    """
    # Normalize user prompt for detection
    prompt_lower = userPrompt.lower().strip()

    # Patterns that indicate a search request
    search_patterns = [
        "ja", "yes", "oui", "si",
        "such", "suche", "search", "recherche", "recherchier",
        "internet", "web", "online",
        "datenblatt", "datasheet", "fiche technique",
        "mehr informationen", "more information", "plus d'information",
        "weitere informationen", "further information", "additional information"
    ]

    # Certification patterns that require web research
    certification_patterns = [
        "ul", "ce", "tüv", "vde", "iec", "en", "iso",
        "zertifiziert", "certified", "certification", "zertifizierung",
        "geprüft", "approved", "compliance"
    ]

    # NOTE: plain substring checks - short tokens like "en"/"ja"/"si" can
    # match inside unrelated words; kept as-is for backward compatibility.
    has_search_intent = any(pattern in prompt_lower for pattern in search_patterns)
    has_certification_intent = any(pattern in prompt_lower for pattern in certification_patterns)

    # Extract product information - try multiple sources
    article_number = None
    article_description = None
    supplier = None

    # Patterns for article numbers like "6AV2 181-8XP00-0AX0" or "6AV2181-8XP00-0AX0"
    article_patterns = [
        r'\b[A-Z0-9]{2,}\s+[0-9]{3,}-[A-Z0-9-]+\b',  # With space: "6AV2 181-8XP00-0AX0"
        r'\b[A-Z0-9]{4,}[\s-][A-Z0-9-]{6,}\b',  # General pattern
        r'\b[A-Z]{2,}[0-9]+\s+[0-9]+-[A-Z0-9-]+\b',  # Specific Siemens pattern
    ]

    # 1. First, try to extract from current user prompt
    for pattern in article_patterns:
        matches = re.findall(pattern, userPrompt)
        if matches:
            article_number = matches[0]
            logger.info(f"Extracted article number from user prompt: {article_number}")
            break

    # 2. Try to extract from database query results if available.
    # Always check queryResults to enrich with product description and
    # supplier, even if article_number was already found.
    if queryResults:
        # Look for article numbers in query result text (if not already found)
        if not article_number:
            for key in queryResults.keys():
                if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
                    result_text = queryResults.get(key, "")
                    if isinstance(result_text, str):
                        for pattern in article_patterns:
                            matches = re.findall(pattern, result_text)
                            if matches:
                                article_number = matches[0]
                                logger.info(f"Extracted article number from query results: {article_number}")
                                break
                if article_number:
                    break

        # Always check data arrays for product description and supplier
        for key in queryResults.keys():
            if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
                data_key = f"{key}_data"
                if data_key in queryResults:
                    data_array = queryResults[data_key]
                    if isinstance(data_array, list) and len(data_array) > 0:
                        # Only the first row is inspected
                        first_row = data_array[0]
                        if isinstance(first_row, dict):
                            if not article_number:
                                for field in ["Artikelnummer", "Artikelkürzel", "article_number", "articleNumber"]:
                                    if field in first_row and first_row[field]:
                                        article_number = str(first_row[field])
                                        logger.info(f"Extracted article number from query data: {article_number}")
                                        break

                            if not article_description:
                                for field in ["Artikelbezeichnung", "Bezeichnung", "article_description", "description"]:
                                    if field in first_row and first_row[field]:
                                        article_description = str(first_row[field])
                                        logger.info(f"Extracted article description from query data: {article_description}")
                                        break

                            if not supplier:
                                for field in ["Lieferant", "Supplier", "supplier"]:
                                    if field in first_row and first_row[field]:
                                        supplier = str(first_row[field])
                                        logger.info(f"Extracted supplier from query data: {supplier}")
                                        break

            # If we found all needed info, we can stop
            if article_number and article_description and supplier:
                break

    # Check if current prompt is an explicit search request that should NOT
    # use context: if the user explicitly asks to search for something,
    # prioritize that over previous messages.
    explicit_search_patterns = [
        r"recherchier\s+(?:im\s+internet\s+)?nach\s+(.+)",
        r"suche\s+(?:im\s+internet\s+)?nach\s+(.+)",
        r"search\s+(?:the\s+internet\s+)?for\s+(.+)",
        r"find\s+(?:information\s+)?(?:about\s+)?(.+)",
        r"recherche\s+(?:sur\s+internet\s+)?(.+)"
    ]

    explicit_search_term = None
    for pattern in explicit_search_patterns:
        match = re.search(pattern, userPrompt, re.IGNORECASE)
        if match:
            explicit_search_term = match.group(1).strip()
            logger.info(f"Found explicit search term in prompt: '{explicit_search_term}'")
            break

    # 3. Extract from previous assistant messages (conversation history),
    # ONLY if there's no explicit search term (avoid stale context).
    if not explicit_search_term and (not article_number or not article_description):
        for msg in reversed(workflowMessages[-10:]):
            if msg.role == "assistant":
                message_text = msg.message

                # Extract article number if not found yet
                if not article_number:
                    for pattern in article_patterns:
                        matches = re.findall(pattern, message_text)
                        if matches:
                            article_number = matches[0]
                            break

                # Extract article description if not found yet
                if not article_description:
                    # FIX: the 'Artikelbezeichnung' pattern appeared twice here
                    description_patterns = [
                        r'Es handelt sich um\s+([^\.]+)',
                        r'It is a\s+([^\.]+)',
                        r'C\'est\s+([^\.]+)',
                        r'Bezeichnung:\s*([^\n]+)',
                        r'Description:\s*([^\n]+)',
                        r'Artikelbezeichnung:\s*([^\n]+)'
                    ]
                    for pattern in description_patterns:
                        match = re.search(pattern, message_text, re.IGNORECASE)
                        if match:
                            article_description = match.group(1).strip()
                            break

                # Extract supplier if not found yet
                if not supplier:
                    supplier_patterns = [
                        r'von\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
                        r'from\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
                        r'Lieferant:\s*([^\n]+)',
                        r'Supplier:\s*([^\n]+)'
                    ]
                    for pattern in supplier_patterns:
                        match = re.search(pattern, message_text, re.IGNORECASE)
                        if match:
                            supplier = match.group(1).strip()
                            break

            # Stop if we found everything
            if article_number and article_description and supplier:
                break

    # Build enriched search query
    query_parts = []

    # If we have an explicit search term, use it as the primary query
    if explicit_search_term:
        query_parts.append(explicit_search_term)
        logger.info(f"Using explicit search term as primary query: '{explicit_search_term}'")
    # If we have search intent but no product info, strip the common search
    # phrases and keep the product-related remainder of the prompt.
    elif has_search_intent and not article_number and not article_description:
        cleaned_prompt = userPrompt
        for phrase in ["recherchier", "recherche", "suche nach", "search for", "find", "informationen zu", "information about", "weitere informationen", "further information", "im internet", "the internet", "sur internet"]:
            # FIX: phrases are literal text - escape them so regex
            # metacharacters in a future phrase cannot change the match.
            cleaned_prompt = re.sub(re.escape(phrase), "", cleaned_prompt, flags=re.IGNORECASE)
        cleaned_prompt = cleaned_prompt.strip()

        # Use cleaned prompt if it has meaningful content
        if cleaned_prompt and len(cleaned_prompt) > 2:
            query_parts.append(cleaned_prompt)

    # Add product context - but never on top of an explicit search term
    if article_description and not explicit_search_term:
        query_parts.append(article_description)
    if article_number and not explicit_search_term:
        query_parts.append(article_number)
    if supplier and not explicit_search_term:
        query_parts.append(supplier)

    # Extract certification information from prompt if present
    certification_terms = []
    if has_certification_intent:
        # Map specific certification mentions to search phrases
        cert_keywords = {
            "ul": "UL certification",
            "ce": "CE certification",
            "tüv": "TÜV certification",
            "vde": "VDE certification",
            "iec": "IEC certification",
            "iso": "ISO certification"
        }
        for cert_key, cert_term in cert_keywords.items():
            if cert_key in prompt_lower:
                certification_terms.append(cert_term)

        # If no specific certification found but certification intent
        # detected, add a generic term
        if not certification_terms:
            certification_terms.append("certification")

    if certification_terms:
        query_parts.extend(certification_terms)

    # Append "Datenblatt" when the user asked for a datasheet, or when we
    # have product info anyway - but NOT for explicit search terms.
    if not explicit_search_term:
        if "datenblatt" in prompt_lower or "datasheet" in prompt_lower or "fiche technique" in prompt_lower:
            query_parts.append("Datenblatt")
        elif query_parts and (article_number or article_description):
            query_parts.append("Datenblatt")

    # If we found product information or built a meaningful query, use it
    if query_parts:
        enriched_query = " ".join(query_parts)
        logger.info(f"Built enriched search query: '{enriched_query}' from context (original: '{userPrompt}')")
        return enriched_query
    else:
        logger.info(f"No product context found, using original prompt: '{userPrompt}'")
        return userPrompt
|
|
|
|
|
|
async def _convert_file_ids_to_document_references(
    services,
    file_ids: List[str]
) -> DocumentReferenceList:
    """
    Convert file IDs to a DocumentReferenceList for use with ai.process.

    For each file ID the matching ChatbotDocument is resolved - first from
    the in-memory workflow messages, then via an RBAC-scoped database
    lookup - and referenced by its document ID; when no document exists the
    raw file ID is used as a fallback reference.

    Args:
        services: Services instance
        file_ids: List of file IDs to convert

    Returns:
        DocumentReferenceList with docItem references
    """
    workflow = services.workflow
    if not workflow:
        logger.error("Cannot convert file IDs to document references: workflow not set in services")
        return DocumentReferenceList(references=[])

    resolved_refs: List[DocumentItemReference] = []

    for file_id in file_ids:
        try:
            # Verify the file actually exists before referencing it
            if not services.chat.getFileInfo(file_id):
                logger.warning(f"File {file_id} not found, skipping")
                continue

            document_id = None

            # 1) Scan the already-loaded workflow messages for a matching document
            if workflow.messages:
                for message in workflow.messages:
                    attached = getattr(message, 'documents', None)
                    for doc in attached or []:
                        if getattr(doc, 'fileId', None) == file_id:
                            document_id = getattr(doc, 'id', None)
                            break
                    if document_id:
                        break

            # 2) Fall back to an RBAC-scoped chatbot-database lookup
            if not document_id:
                try:
                    from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
                    from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
                    chatbotInterface = getChatbotInterface(services.user, mandateId=services.mandateId, featureInstanceId=services.featureInstanceId)
                    matches = getRecordsetWithRBAC(
                        chatbotInterface.db,
                        ChatbotDocument,
                        services.user,
                        recordFilter={"fileId": file_id}
                    )
                    if matches:
                        # Only accept documents attached to this workflow's messages
                        known_message_ids = {msg.id for msg in workflow.messages} if workflow.messages else set()
                        for candidate in matches:
                            if candidate.get("messageId") in known_message_ids:
                                document_id = candidate.get("id")
                                break
                except Exception:
                    pass  # Fallback to fileId

            # Reference the ChatbotDocument when found, the raw file ID otherwise
            resolved_refs.append(DocumentItemReference(documentId=document_id if document_id else file_id))
        except Exception as e:
            logger.error(f"Error converting fileId {file_id}: {e}", exc_info=True)

    logger.info(f"Converted {len(resolved_refs)} file IDs to document references")
    return DocumentReferenceList(references=resolved_refs)
|
|
|
|
|
|
def _format_query_results_as_lookup(query_data: Dict[str, List[Dict]]) -> str:
    """
    Format database query results as a JSON lookup table for Excel matching.

    Produces structured JSON of the form {Artikelnummer: {columns...}},
    mapping each article number found in the rows to its full row dict.

    Args:
        query_data: Dict with query_key -> list of row dicts (from connector
            with return_json=True)

    Returns:
        JSON string formatted as lookup table
    """
    # Accepted spellings of the article-number column (compared lowercase)
    id_aliases = ('artikelnummer', 'artikel_nummer', 'art_nr', 'part_number')
    table: Dict[str, Dict] = {}

    for source_key, row_list in query_data.items():
        if source_key == "error" or not row_list:
            logger.warning(f"Skipping query key '{source_key}' - no rows or error")
            continue

        logger.info(f"Processing {len(row_list)} rows from query '{source_key}'")

        for entry in row_list:
            if not isinstance(entry, dict):
                logger.warning(f"Skipping non-dict row: {type(entry)}")
                continue

            # Locate the article-number column regardless of its exact casing
            id_field = next((k for k in entry.keys() if k.lower() in id_aliases), None)
            if id_field is not None:
                table[str(entry[id_field])] = entry
            else:
                logger.warning(f"No Artikelnummer found in row with keys: {list(entry.keys())}")

    logger.info(f"Generated lookup table with {len(table)} entries")
    if table:
        preview = list(table.keys())[:3]
        logger.info(f"Sample Artikelnummern: {preview}")
        if preview:
            logger.info(f"Sample entry keys: {list(table[preview[0]].keys())}")

    return json.dumps(table, ensure_ascii=False, indent=2)
|
|
|
|
|
|
async def _create_chat_document_from_action_document(
    services,
    action_document,
    message_id: str,
    workflow_id: str,
    round_number: int
) -> ChatbotDocument:
    """
    Create a ChatbotDocument from an ActionDocument by storing the file data.

    Normalizes the document payload to bytes (base64 string, plain string,
    raw bytes, or bytes-convertible object), forces an .xlsx file extension,
    persists the file via the component interface, and returns the metadata
    record to attach to the chat message.

    Args:
        services: Services instance (provides interfaceDbComponent)
        action_document: ActionDocument from ai.process result
        message_id: ID of the message to attach to
        workflow_id: Workflow ID
        round_number: Round number

    Returns:
        ChatbotDocument instance describing the stored file

    Raises:
        Exception: Any storage failure is logged and re-raised.
    """
    try:
        # Payload may arrive as bytes, a (possibly base64-encoded) string,
        # or some other bytes-convertible object.
        document_data = action_document.documentData

        if isinstance(document_data, str):
            try:
                # FIX: validate=True so arbitrary plain text is not silently
                # "decoded" as base64 into garbage bytes (the default discards
                # non-alphabet characters); invalid input now falls through to
                # the UTF-8 branch instead.
                file_bytes = base64.b64decode(document_data, validate=True)
            except Exception:
                # Not base64 -> treat as plain text
                file_bytes = document_data.encode('utf-8')
        elif isinstance(document_data, bytes):
            file_bytes = document_data
        else:
            try:
                file_bytes = bytes(document_data)
            except Exception:
                # Last resort: stringify, then encode
                file_bytes = str(document_data).encode('utf-8')

        # Default to the Excel MIME type when none is provided.
        mime_type = action_document.mimeType or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

        file_name = action_document.documentName or "data_export.xlsx"
        # Force an .xlsx extension (drop any existing extension first).
        if not file_name.lower().endswith('.xlsx'):
            file_name = file_name.rsplit('.', 1)[0] + '.xlsx'

        # Register the file record, then persist its payload.
        file_item = services.interfaceDbComponent.createFile(
            name=file_name,
            mimeType=mime_type,
            content=file_bytes
        )

        success = services.interfaceDbComponent.createFileData(file_item.id, file_bytes)
        if not success:
            # Best-effort: the metadata record exists even if the payload
            # write failed, so we keep going rather than abort.
            logger.warning(f"Failed to store file data for {file_item.id}, but continuing...")

        chat_document = ChatbotDocument(
            id=str(uuid.uuid4()),
            messageId=message_id,
            fileId=file_item.id,
            fileName=file_name,
            fileSize=len(file_bytes),
            mimeType=mime_type,
            roundNumber=round_number,
            taskNumber=0,
            actionNumber=0
        )

        logger.info(f"Created ChatbotDocument {chat_document.id} from ActionDocument {file_name} (size: {len(file_bytes)} bytes)")
        return chat_document

    except Exception as e:
        logger.error(f"Error creating ChatbotDocument from ActionDocument: {e}", exc_info=True)
        raise
|
|
|
|
|
|
async def _bridge_chatbot_events(
    event_stream,
    event_manager,
    workflow_id: str,
    interface_db_chat
):
    """
    Bridge legacy chatbot events to current event manager format.

    Consumes the chatbot's event stream and translates each event into
    event-manager emissions:
      - "status" events are forwarded immediately (transient, not persisted)
      - "final" events emit the stored assistant message, a "complete" event,
        and mark the workflow "completed", then stop consuming
      - "error" events emit an error, mark the workflow "error", then stop
    If the stream ends without a "final" event, a fallback apology message is
    stored/emitted and the workflow is still marked "completed". Provider
    restrictions registered for the workflow are cleared on every exit path.

    Args:
        event_stream: Async iterator from chatbot.stream_events()
        event_manager: Event manager instance
        workflow_id: Workflow ID
        interface_db_chat: Database interface for storing messages
    """
    try:
        # Tracks whether an assistant reply reached the client; drives the
        # fallback message after the loop.
        final_message_stored = False

        async for event in event_stream:
            event_type = event.get("type")

            # Handle status updates - emit immediately for real-time UI feedback
            # Note: Status updates are transient UI feedback, no need to persist them
            if event_type == "status":
                label = event.get("label", "")
                if label:
                    # Emit the status event directly for real-time UI feedback
                    await event_manager.emit_event(
                        context_id=workflow_id,
                        event_type="chatdata",
                        data={
                            "type": "status",
                            "label": label.strip()
                        },
                        event_category="chat",
                        message="Status update",
                        step="status"
                    )
                continue

            # Handle final response
            if event_type == "final":
                response_data = event.get("response", {})
                chat_history = response_data.get("chat_history", [])

                # The final message should already be stored by the memory/checkpointer
                # We just need to emit the event, not store it again
                # Check if the message was already stored by checking the workflow
                workflow = interface_db_chat.getWorkflow(workflow_id)
                if workflow and workflow.messages:
                    # Find the last assistant message in the workflow (already stored by memory)
                    last_message = workflow.messages[-1]
                    if last_message.role == "assistant":
                        final_message_stored = True

                        # Emit message event for the already-stored message
                        message_timestamp = parseTimestamp(last_message.publishedAt, default=getUtcTimestamp())
                        await event_manager.emit_event(
                            context_id=workflow_id,
                            event_type="chatdata",
                            data={
                                "type": "message",
                                "createdAt": message_timestamp,
                                "item": last_message.dict()
                            },
                            event_category="chat"
                        )
                    else:
                        # If no assistant message found, try to store from chat_history
                        # (most recent assistant entry with non-empty content wins).
                        assistant_message = None
                        for msg in reversed(chat_history):
                            if msg.get("role") == "assistant" and msg.get("content"):
                                assistant_message = msg
                                break

                        if assistant_message:
                            message_data = {
                                "id": f"msg_{workflow_id}_{getUtcTimestamp()}",
                                "workflowId": workflow_id,
                                "message": assistant_message.get("content", ""),
                                "role": "assistant",
                                "status": "last",
                                "sequenceNr": len(workflow.messages) + 1 if workflow.messages else 1,
                                "publishedAt": getUtcTimestamp(),
                                "roundNumber": workflow.currentRound if workflow else 1,
                                "taskNumber": 0,
                                "actionNumber": 0,
                                "success": True
                            }

                            try:
                                assistant_msg = interface_db_chat.createMessage(message_data)
                                final_message_stored = True

                                # Emit message event
                                message_timestamp = parseTimestamp(assistant_msg.publishedAt, default=getUtcTimestamp())
                                await event_manager.emit_event(
                                    context_id=workflow_id,
                                    event_type="chatdata",
                                    data={
                                        "type": "message",
                                        "createdAt": message_timestamp,
                                        "item": assistant_msg.dict()
                                    },
                                    event_category="chat"
                                )
                            except Exception as e:
                                # Best-effort persistence; completion is still emitted below.
                                logger.error(f"Error storing assistant message: {e}", exc_info=True)

                # Emit completion event
                await event_manager.emit_event(
                    context_id=workflow_id,
                    event_type="complete",
                    data={"workflowId": workflow_id},
                    event_category="workflow",
                    message="Chatbot-Verarbeitung abgeschlossen",
                    step="complete"
                )
                # Release any per-workflow provider restriction.
                clear_workflow_allowed_providers(workflow_id)

                # Update workflow status
                try:
                    interface_db_chat.updateWorkflow(workflow_id, {
                        "status": "completed",
                        "lastActivity": getUtcTimestamp()
                    })
                except Exception as e:
                    logger.error(f"Error updating workflow status: {e}", exc_info=True)

                # "final" terminates the bridge; remaining stream events are ignored.
                return

            # Handle errors
            if event_type == "error":
                error_msg = event.get("message", "Unknown error")

                await event_manager.emit_event(
                    context_id=workflow_id,
                    event_type="error",
                    data={"error": error_msg},
                    event_category="workflow",
                    message=f"Fehler beim Verarbeiten: {error_msg}",
                    step="error"
                )
                clear_workflow_allowed_providers(workflow_id)

                # Update workflow status
                try:
                    interface_db_chat.updateWorkflow(workflow_id, {
                        "status": "error",
                        "lastActivity": getUtcTimestamp()
                    })
                except Exception as e:
                    logger.error(f"Error updating workflow status: {e}", exc_info=True)

                # "error" also terminates the bridge.
                return

        # If stream ended without final message, store error message
        # (apology shown to the user so the conversation does not end silently).
        if not final_message_stored:
            logger.warning(f"Stream ended for workflow {workflow_id} without a final message")
            try:
                workflow = interface_db_chat.getWorkflow(workflow_id)
                if workflow:
                    error_message_data = {
                        "id": f"msg_{workflow_id}_{getUtcTimestamp()}",
                        "workflowId": workflow_id,
                        "message": "Entschuldigung, ich konnte keine vollständige Antwort generieren. Bitte versuchen Sie es erneut.",
                        "role": "assistant",
                        "status": "last",
                        "sequenceNr": len(workflow.messages) + 1 if workflow.messages else 1,
                        "publishedAt": getUtcTimestamp(),
                        "roundNumber": workflow.currentRound if workflow else 1,
                        "taskNumber": 0,
                        "actionNumber": 0,
                        "success": False
                    }

                    error_msg = interface_db_chat.createMessage(error_message_data)

                    # Emit message event
                    message_timestamp = parseTimestamp(error_msg.publishedAt, default=getUtcTimestamp())
                    await event_manager.emit_event(
                        context_id=workflow_id,
                        event_type="chatdata",
                        data={
                            "type": "message",
                            "createdAt": message_timestamp,
                            "item": error_msg.dict()
                        },
                        event_category="chat"
                    )
            except Exception as e:
                logger.error(f"Error storing error message: {e}", exc_info=True)

        # Emit completion event
        # NOTE(review): the stream-end path marks the workflow "completed"
        # even when no final message arrived - presumably intentional so the
        # UI stops waiting; confirm against the frontend contract.
        await event_manager.emit_event(
            context_id=workflow_id,
            event_type="complete",
            data={"workflowId": workflow_id},
            event_category="workflow",
            message="Chatbot-Verarbeitung abgeschlossen",
            step="complete"
        )
        clear_workflow_allowed_providers(workflow_id)

        # Update workflow status
        try:
            interface_db_chat.updateWorkflow(workflow_id, {
                "status": "completed",
                "lastActivity": getUtcTimestamp()
            })
        except Exception as e:
            logger.error(f"Error updating workflow status: {e}", exc_info=True)

    except Exception as e:
        # Unexpected failure in the bridge itself (not a chatbot "error" event).
        logger.error(f"Error in bridge_chatbot_events: {e}", exc_info=True)

        # Emit error event
        await event_manager.emit_event(
            context_id=workflow_id,
            event_type="error",
            data={"error": str(e)},
            event_category="workflow",
            message=f"Fehler beim Verarbeiten: {str(e)}",
            step="error"
        )
        clear_workflow_allowed_providers(workflow_id)
|
|
|
|
|
|
async def _load_chatbot_config(featureInstanceId: Optional[str]) -> ChatbotConfig:
    """
    Load the chatbot configuration stored on a FeatureInstance (database).

    Args:
        featureInstanceId: ID of the feature instance holding the config.

    Returns:
        The parsed ChatbotConfig.

    Raises:
        ValueError: When no featureInstanceId is given or the instance
            does not exist.
    """
    if not featureInstanceId:
        raise ValueError("featureInstanceId is required to load chatbot config")

    try:
        # Local imports break a circular dependency with the interface modules.
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.interfaces.interfaceFeatures import getFeatureInterface

        # Resolve the feature instance from the database.
        feature_interface = getFeatureInterface(getRootInterface().db)
        feature_instance = feature_interface.getFeatureInstance(featureInstanceId)

        if not feature_instance:
            raise ValueError(f"FeatureInstance {featureInstanceId} not found")

        logger.info(f"Loading chatbot config from FeatureInstance {featureInstanceId}")
        return load_chatbot_config_from_instance(feature_instance)
    except ValueError:
        # Validation errors propagate untouched.
        raise
    except Exception as e:
        logger.error(f"Error loading config from FeatureInstance {featureInstanceId}: {e}")
        raise
|
|
|
|
|
|
def _preflight_billing_check(services, mandateId: str, featureInstanceId: Optional[str]) -> None:
    """
    Pre-flight billing check before starting chatbot AI processing.

    Verifies the mandate has at least a minimal balance (CHF 0.01) and that
    the user's role allows at least one AI provider.

    Args:
        services: Service container providing the current user.
        mandateId: Mandate whose balance is checked.
        featureInstanceId: Optional feature instance for billing scoping.

    Raises:
        InsufficientBalanceException: Mandate balance is insufficient.
        ProviderNotAllowedException: No AI providers enabled for the role.
        BillingContextError: Any other failure during the billing check.
    """
    from modules.services.serviceBilling.mainServiceBilling import (
        getService as getBillingService,
        InsufficientBalanceException,
        ProviderNotAllowedException,
        BillingContextError,
    )
    user = services.user
    featureCode = "chatbot"
    try:
        billingService = getBillingService(user, mandateId, featureInstanceId, featureCode)
        balanceCheck = billingService.checkBalance(0.01)
        if not balanceCheck.allowed:
            # FIX: currentBalance may be None (the original already used
            # `or 0.0` for the exception field); formatting None with :.2f
            # raised TypeError and masked the real billing error.
            currentBalance = balanceCheck.currentBalance or 0.0
            raise InsufficientBalanceException(
                currentBalance=currentBalance,
                requiredAmount=0.01,
                message=f"Ungenuegendes Guthaben. Aktuell: CHF {currentBalance:.2f}"
            )
        rbacAllowedProviders = billingService.getallowedProviders()
        if not rbacAllowedProviders:
            raise ProviderNotAllowedException(
                provider="any",
                message="Keine AI-Provider fuer Ihre Rolle freigegeben. Kontaktieren Sie Ihren Administrator."
            )
    except (InsufficientBalanceException, ProviderNotAllowedException):
        # Domain exceptions pass through to the caller unchanged.
        raise
    except Exception as e:
        logger.error(f"Billing pre-flight failed: {e}")
        # Chain the cause so the original failure stays visible in tracebacks.
        raise BillingContextError(f"Billing check failed: {e}") from e
|
|
|
|
|
|
def _create_chatbot_billing_callback(services, workflow_id: str):
    """
    Build the per-workflow billing callback handed to AICenterChatModel.

    Each successful, non-free AI call reported through the returned callback
    is recorded against the mandate (poweron_billing) via the billing service.
    """
    from modules.services.serviceBilling.mainServiceBilling import getService as getBillingService
    from modules.datamodels.datamodelAi import AiCallResponse

    billing = getBillingService(
        services.user,
        services.mandateId,
        getattr(services, "featureInstanceId", None),
        "chatbot"
    )

    def record_ai_call(response: AiCallResponse) -> None:
        # Skip missing or failed responses.
        if not response or getattr(response, "errorCount", 0) > 0:
            return
        # Skip calls without a positive price - nothing to bill.
        priceCHF = getattr(response, "priceCHF", 0.0)
        if not priceCHF or priceCHF <= 0:
            return
        provider = getattr(response, "provider", None) or "unknown"
        modelName = getattr(response, "modelName", None) or "unknown"
        try:
            billing.recordUsage(
                priceCHF=priceCHF,
                workflowId=workflow_id,
                aicoreProvider=provider,
                aicoreModel=modelName,
                description=f"AI: {modelName}"
            )
            logger.debug(f"Chatbot billed: {priceCHF:.4f} CHF, provider={provider}, model={modelName}")
        except Exception as e:
            # Billing failures must never break the chat round itself.
            logger.error(f"Chatbot billing failed: {e}")

    return record_ai_call
|
|
|
|
|
|
async def _processChatbotMessageLangGraph(
    services,
    currentUser: User,
    workflowId: str,
    userInput: UserInputRequest,
    userMessageId: str,
    featureInstanceId: Optional[str] = None,
    config: Optional[ChatbotConfig] = None
):
    """
    Process chatbot message using LangGraph.

    Uses the LangGraph workflow with AI center models and tools: loads the
    instance configuration, builds the AI-center chat model (with optional
    billing callback and provider restrictions), creates the chatbot with a
    database-backed checkpointer, and bridges its event stream to the event
    manager. On failure an error message is stored and emitted, unless the
    workflow was stopped.

    Args:
        services: Service container
        currentUser: Current user
        workflowId: Workflow ID
        userInput: User input request
        userMessageId: User message ID
        featureInstanceId: Optional feature instance ID for loading instance-specific config
        config: Optional pre-loaded config; loaded from the instance when None
            (e.g. when resuming)
    """
    event_manager = get_event_manager()
    # FIX: bind before the try block - the original assigned this inside the
    # try, so a failure during import/interface creation made the except
    # handler raise NameError on `interfaceDbChat`, masking the real error.
    interfaceDbChat = None

    try:
        from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
        interfaceDbChat = getChatbotInterface(currentUser, mandateId=services.mandateId, featureInstanceId=featureInstanceId)

        # Reload workflow to get current messages
        workflow = interfaceDbChat.getWorkflow(workflowId)
        if not workflow:
            logger.error(f"Workflow {workflowId} not found during processing")
            await event_manager.emit_event(
                context_id=workflowId,
                event_type="error",
                data={"error": f"Workflow {workflowId} nicht gefunden"},
                event_category="workflow",
                message=f"Workflow {workflowId} nicht gefunden",
                step="error"
            )
            return

        # Check if workflow was stopped before starting
        if await _check_workflow_stopped(interfaceDbChat, workflowId):
            logger.info(f"Workflow {workflowId} was stopped, aborting processing")
            return

        # Emit synthetic status for real-time UI feedback
        await event_manager.emit_event(
            context_id=workflowId,
            event_type="chatdata",
            data={"type": "status", "label": "Lade Konfiguration..."},
            event_category="chat",
            message="Status update",
            step="status"
        )

        # Load configuration if not passed (e.g. when resuming)
        if config is None:
            config = await _load_chatbot_config(featureInstanceId)

        # Replace {{DATE}} placeholder in system prompt
        from datetime import datetime
        system_prompt = config.systemPrompt.replace(
            "{{DATE}}",
            datetime.now().strftime("%d.%m.%Y")
        )

        # Create AI center model
        operation_type = OperationTypeEnum[config.model.operationType]
        processing_mode = ProcessingModeEnum[config.model.processingMode]

        # Billing only applies when a mandate context exists.
        billing_callback = None
        if services.mandateId:
            billing_callback = _create_chatbot_billing_callback(services, workflowId)

        allowed_providers = config.model.allowedProviders or None
        if allowed_providers:
            logger.info(f"Chatbot AICenterChatModel: restricting to providers {allowed_providers}")
        model = AICenterChatModel(
            user=currentUser,
            operation_type=operation_type,
            processing_mode=processing_mode,
            billing_callback=billing_callback,
            workflow_id=workflowId,
            allowed_providers=allowed_providers
        )

        # Emit synthetic status for real-time UI feedback
        await event_manager.emit_event(
            context_id=workflowId,
            event_type="chatdata",
            data={"type": "status", "label": "Bereite Chat vor..."},
            event_category="chat",
            message="Status update",
            step="status"
        )

        # Create memory/checkpointer (uses chatbot's own DB via interfaceFeatureChatbot)
        memory = DatabaseCheckpointer(
            user=currentUser,
            workflow_id=workflowId,
            mandateId=services.mandateId,
            featureInstanceId=featureInstanceId
        )

        # Create chatbot instance with config for dynamic tool configuration
        chatbot = await Chatbot.create(
            model=model,
            memory=memory,
            system_prompt=system_prompt,
            workflow_id=workflowId,
            config=config
        )

        # Emit synthetic status for real-time UI feedback
        await event_manager.emit_event(
            context_id=workflowId,
            event_type="chatdata",
            data={"type": "status", "label": "Analysiere Anfrage..."},
            event_category="chat",
            message="Status update",
            step="status"
        )

        # Stream events using chatbot
        event_stream = chatbot.stream_events(
            message=userInput.prompt,
            chat_id=workflowId
        )

        # Bridge chatbot events to event manager
        await _bridge_chatbot_events(
            event_stream=event_stream,
            event_manager=event_manager,
            workflow_id=workflowId,
            interface_db_chat=interfaceDbChat
        )

        # Schedule cleanup
        await event_manager.cleanup(workflowId, delay=300.0)  # 5 minutes delay

    except Exception as e:
        logger.error(f"Error processing chatbot message with LangGraph: {str(e)}", exc_info=True)

        # FIX: if the failure happened before the chat interface was created,
        # there is no way to store an error message - bail out after logging.
        if interfaceDbChat is None:
            return

        # Check if workflow was stopped - if so, don't store error message
        if await _check_workflow_stopped(interfaceDbChat, workflowId):
            logger.info(f"Workflow {workflowId} was stopped, not storing error message")
            return

        # Store error message
        try:
            workflow = interfaceDbChat.getWorkflow(workflowId)

            if workflow and workflow.status == "stopped":
                logger.info(f"Workflow {workflowId} was stopped, not storing error message")
                return

            errorMessageData = {
                "id": f"msg_{uuid.uuid4()}",
                "workflowId": workflowId,
                "parentMessageId": userMessageId,
                "message": f"Sorry, I encountered an error: {str(e)}",
                "role": "assistant",
                "status": "last",
                "sequenceNr": len(workflow.messages) + 1 if workflow else 1,
                "publishedAt": getUtcTimestamp(),
                "success": False,
                "roundNumber": workflow.currentRound if workflow else 1,
                "taskNumber": 0,
                "actionNumber": 0
            }
            errorMessage = interfaceDbChat.createMessage(errorMessageData)

            # Emit message event
            message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp())
            await event_manager.emit_event(
                context_id=workflowId,
                event_type="chatdata",
                data={
                    "type": "message",
                    "createdAt": message_timestamp,
                    "item": errorMessage.dict()
                },
                event_category="chat"
            )

            # Update workflow status
            if not await _check_workflow_stopped(interfaceDbChat, workflowId):
                interfaceDbChat.updateWorkflow(workflowId, {
                    "status": "error",
                    "lastActivity": getUtcTimestamp()
                })

            # Schedule cleanup
            await event_manager.cleanup(workflowId)
        except Exception as storeError:
            logger.error(f"Error storing error message: {storeError}")
|
|
|
|
|
|
async def _processChatbotMessage(
    services,
    workflowId: str,
    userInput: UserInputRequest,
    userMessageId: str
):
    """
    DEPRECATED: Old chatbot processing implementation.

    Kept for backward compatibility but redirects to LangGraph implementation.
    Always raises, because the LangGraph path additionally requires the
    current user, which this legacy signature never carried.
    """
    logger.warning("_processChatbotMessage is deprecated, using LangGraph implementation")
    # The replacement (_processChatbotMessageLangGraph) needs currentUser,
    # which is not part of this legacy signature. Rather than guessing the
    # user from the workflow or the services container, fail loudly so the
    # caller gets migrated.
    raise NotImplementedError("_processChatbotMessage is deprecated and requires currentUser parameter")
|