# gateway/modules/features/chatbot/service.py
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Simple chatbot feature - basic implementation.
User input is processed by AI to create a list of needed queries.
Those queries are streamed back.
"""
import logging
import json
import uuid
import asyncio
import re
from typing import Optional, Dict, Any, List
from modules.datamodels.datamodelChat import UserInputRequest
from modules.features.chatbot.interfaceFeatureChatbot import ChatbotConversation, ChatbotDocument
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference
from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp
from modules.features.chatbot.mainChatbot import getChatbotServices, getChatStreamingHelper
from modules.features.chatbot.chatbot import Chatbot
from modules.features.chatbot.bridges.ai import AICenterChatModel, clear_workflow_allowed_providers
from modules.features.chatbot.bridges.memory import DatabaseCheckpointer
from modules.features.chatbot.config import (
load_chatbot_config_from_instance,
ChatbotConfig
)
from modules.workflows.methods.methodAi.methodAi import MethodAi
from modules.connectors.connectorPreprocessor import PreprocessorConnector
from modules.features.chatbot.chatbotConstants import generate_conversation_name, generate_name_from_prompt
import base64
logger = logging.getLogger(__name__)
def _extractJsonFromResponse(content: str) -> Optional[dict]:
"""Extract JSON from AI response, handling markdown code blocks."""
# Try direct JSON parse first
try:
return json.loads(content.strip())
except json.JSONDecodeError:
pass
# Try to extract JSON from markdown code blocks
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
# Try to find JSON object in the text
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(0))
except json.JSONDecodeError:
pass
return None
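# Illustrative fallback behavior (not executed; outputs follow the three parse
# attempts above):
#   _extractJsonFromResponse('{"a": 1}')                      -> {"a": 1}
#   _extractJsonFromResponse('```json\n{"a": 1}\n```')        -> {"a": 1}
#   _extractJsonFromResponse('Result: {"a": 1} as requested') -> {"a": 1}
#   _extractJsonFromResponse('no json here')                  -> None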
async def chatProcess(
currentUser: User,
mandateId: Optional[str],
userInput: UserInputRequest,
workflowId: Optional[str] = None,
featureInstanceId: Optional[str] = None,
event_manager=None  # Must be provided by the streaming route; used unconditionally below
) -> ChatbotConversation:
"""
Simple chatbot processing - analyze user input and generate queries.
Flow:
1. Create or load workflow
2. Store user message
3. AI analyzes user input to create list of needed queries
4. Stream queries back
Args:
currentUser: Current user
mandateId: Mandate ID for the workflow
userInput: User input request
workflowId: Optional workflow ID to continue existing conversation
featureInstanceId: Feature instance ID for loading instance-specific config
Returns:
ChatbotConversation instance
"""
try:
# Get services from service center (only services declared in mainChatbot.REQUIRED_SERVICES)
services = getChatbotServices(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
# Load instance config and apply allowedProviders for AI calls (conversation name + main chat)
chatbot_config = await _load_chatbot_config(featureInstanceId)
if chatbot_config.model.allowedProviders:
services.allowedProviders = chatbot_config.model.allowedProviders
logger.info(f"Chatbot instance {featureInstanceId}: restricting to providers {chatbot_config.model.allowedProviders}")
from modules.features.chatbot.interfaceFeatureChatbot import getInterface as getChatbotInterface
interfaceDbChat = getChatbotInterface(currentUser, mandateId=mandateId, featureInstanceId=featureInstanceId)
# Create or load workflow (event_manager passed from route)
if workflowId:
# Lightweight resume: minimal fetch + minimal update (no logs/messages load)
workflow = interfaceDbChat.getWorkflowMinimal(workflowId)
if not workflow:
raise ValueError(f"Workflow {workflowId} not found")
new_round = workflow.currentRound + 1
workflow = interfaceDbChat.updateWorkflowMinimal(workflowId, {
"status": "running",
"currentRound": new_round,
"lastActivity": getUtcTimestamp()
})
logger.info(f"Resumed workflow {workflowId}, round incremented to {new_round}")
# Create event queue if it doesn't exist (for streaming)
if not event_manager.has_queue(workflowId):
event_manager.create_queue(workflowId)
else:
# Use placeholder name immediately - don't block on AI call
prompt_stripped = (userInput.prompt or "").strip()
trivial_prompts = {"test", "hi", "hallo", "hello", "hey"}
is_short = len(prompt_stripped) < 30 or prompt_stripped.lower() in trivial_prompts
if is_short:
conversation_name = generate_name_from_prompt(userInput.prompt)
run_name_task = False
else:
conversation_name = "Neue Unterhaltung"
run_name_task = True
# Create new conversation (per feature instance)
conversationData = {
"id": str(uuid.uuid4()),
"featureInstanceId": featureInstanceId,
"status": "running",
"name": conversation_name,
"currentRound": 1,
"workflowMode": "Chatbot",
"startedAt": getUtcTimestamp(),
"lastActivity": getUtcTimestamp()
}
workflow = interfaceDbChat.createConversation(conversationData)
logger.info(f"Created new chatbot workflow: {workflow.id} with name: {conversation_name}")
# Run AI name generation in background (for longer prompts only)
if run_name_task:
asyncio.create_task(_update_conversation_name_async(
services=services,
currentUser=currentUser,
mandateId=mandateId,
featureInstanceId=featureInstanceId,
workflowId=workflow.id,
prompt=userInput.prompt,
userLanguage=userInput.userLanguage,
interfaceDbChat=interfaceDbChat,
event_manager=event_manager
))
# Create event queue for new workflow (for streaming)
event_manager.create_queue(workflow.id)
# Reload workflow to get current message count
workflow = interfaceDbChat.getWorkflow(workflow.id)
services.workflow = workflow # Required for chat service document resolution
# Process uploaded files and create ChatbotDocuments
user_documents = []
if userInput.listFileId and len(userInput.listFileId) > 0:
logger.info(f"Processing {len(userInput.listFileId)} uploaded file(s) for user message")
for fileId in userInput.listFileId:
try:
# Get file info from chat service
fileInfo = services.chat.getFileInfo(fileId)
if not fileInfo:
logger.warning(f"No file info found for file ID {fileId}")
continue
originalFileName = fileInfo.get("fileName", "unknown")
originalMimeType = fileInfo.get("mimeType", "application/octet-stream")
fileSizeToUse = fileInfo.get("size", 0)
# Create ChatbotDocument for the file
document = ChatbotDocument(
id=str(uuid.uuid4()),
messageId="", # Will be set when message is created
fileId=fileId,
fileName=originalFileName,
fileSize=fileSizeToUse,
mimeType=originalMimeType,
roundNumber=workflow.currentRound,
taskNumber=0,
actionNumber=0
)
user_documents.append(document)
logger.info(f"Created ChatbotDocument for file {fileId} -> {originalFileName}")
except Exception as e:
logger.error(f"Error processing file ID {fileId}: {e}", exc_info=True)
# Store user message (sequenceNr: resume uses DB message count + 1, new conversation uses len(messages) + 1)
seq_nr = (
interfaceDbChat.getMessageCount(workflow.id) + 1
if workflowId
else len(workflow.messages) + 1
)
userMessageData: Dict[str, Any] = {
"id": f"msg_{uuid.uuid4()}",
"conversationId": workflow.id,
"message": userInput.prompt,
"role": "user",
"status": "first" if workflowId is None else "step",
"sequenceNr": seq_nr,
"publishedAt": getUtcTimestamp(),
"roundNumber": workflow.currentRound,
"taskNumber": 0,
"actionNumber": 0
}
if user_documents:
userMessageData["documents"] = [d.model_dump() for d in user_documents]
# Don't pass event_manager: event_stream sends initial chatData from DB (includes user msg).
# Emitting here would duplicate it (initial chatData + queue event).
userMessage = interfaceDbChat.createMessage(userMessageData, event_manager=None)
logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)")
# Update workflow status (minimal update for lastActivity; resume already did this)
if not workflowId:
interfaceDbChat.updateWorkflowMinimal(workflow.id, {
"status": "running",
"lastActivity": getUtcTimestamp()
})
# Pre-flight billing check before starting LangGraph (if mandateId present)
if mandateId:
_preflight_billing_check(services, mandateId, featureInstanceId)
# Process in background using LangGraph (async)
asyncio.create_task(_processChatbotMessageLangGraph(
services,
currentUser,
workflow.id,
userInput,
userMessage.id,
featureInstanceId=featureInstanceId,
config=chatbot_config,
event_manager=event_manager
))
return workflow
except Exception as e:
logger.error(f"Error in chatProcess: {str(e)}", exc_info=True)
raise
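# Minimal usage sketch from a streaming route. The route name, field values and
# get_event_manager() accessor below are illustrative assumptions, not the
# actual gateway wiring:
#
#   async def chat_stream_route(request: UserInputRequest, user: User):
#       workflow = await chatProcess(
#           currentUser=user,
#           mandateId=None,                      # or the mandate from the session
#           userInput=request,
#           workflowId=None,                     # None -> start a new conversation
#           featureInstanceId="<instance id>",   # hypothetical value
#           event_manager=get_event_manager(),   # hypothetical accessor
#       )
#       # workflow.id identifies the event queue the client then streams from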
async def _update_conversation_name_async(
services,
currentUser: User,
mandateId: Optional[str],
featureInstanceId: Optional[str],
workflowId: str,
prompt: str,
userLanguage: str,
interfaceDbChat,
event_manager,
) -> None:
"""
Background task: generate conversation name via AI and update workflow.
Runs in parallel to LangGraph so it doesn't block the first response.
"""
try:
new_name = await generate_conversation_name(services, prompt, userLanguage)
if new_name:
interfaceDbChat.updateWorkflow(workflowId, {"name": new_name, "lastActivity": getUtcTimestamp()})
logger.info(f"Updated workflow {workflowId} name to: {new_name}")
# Emit stat event so frontend can refresh thread list/title
workflow = interfaceDbChat.getWorkflow(workflowId)
if workflow:
wf_dict = workflow.model_dump()
await event_manager.emit_event(
context_id=workflowId,
event_type="chatdata",
data={
"type": "stat",
"createdAt": getUtcTimestamp(),
"item": wf_dict
},
event_category="chat",
message="Workflow name updated",
step="workflow_update"
)
except Exception as e:
logger.warning(f"Background conversation name update failed for workflow {workflowId}: {e}")
async def _execute_queries_parallel(queries: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Execute multiple SQL queries in parallel with shared connector.
Args:
queries: List of query dictionaries, each containing:
- "query": SQL query string
- "purpose": Description of what the query retrieves
- "table": Primary table name
Returns:
Dictionary mapping query indices to results:
- "query_1", "query_2", etc.: Success result text
- "query_1_data", "query_2_data", etc.: Raw data arrays
- "query_1_error", "query_2_error", etc.: Error messages if query failed
"""
# Create single connector instance to reuse across all queries
connector = PreprocessorConnector()
try:
async def execute_single_query(idx: int, query_info: Dict[str, Any]):
"""Execute a single query using shared connector."""
try:
query_text = query_info.get("query", "")
result = await connector.executeQuery(query_text, return_json=True)
return idx, result, None
except Exception as e:
return idx, None, str(e)
# Execute all queries in parallel with shared connector
tasks = [execute_single_query(i, q) for i, q in enumerate(queries)]
results = await asyncio.gather(*tasks, return_exceptions=True)
finally:
# Close connector once after all queries complete
await connector.close()
# Process results into dictionary
query_results = {}
for result in results:
if isinstance(result, Exception):
# Handle exceptions from gather
logger.error(f"Exception in parallel query execution: {result}")
continue
idx, result_data, error = result
if error:
query_results[f"query_{idx+1}_error"] = error
logger.error(f"Query {idx+1} failed: {error}")
else:
if result_data and not result_data.get("text", "").startswith(("Error:", "Query failed:")):
query_results[f"query_{idx+1}"] = result_data.get("text", "")
query_results[f"query_{idx+1}_data"] = result_data.get("data", [])
row_count = len(result_data.get('data', []))
logger.info(f"Query {idx+1} executed successfully, returned {row_count} rows")
else:
error_text = result_data.get("text", "Query failed") if result_data else "Query failed: No response"
query_results[f"query_{idx+1}_error"] = error_text
logger.error(f"Query {idx+1} failed: {error_text}")
return query_results
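# Example result shape for two queries where the second fails (illustrative
# values; keys follow the indexing scheme above):
#
#   {
#       "query_1": "3 rows returned",
#       "query_1_data": [{"Artikelnummer": "A-100", "Preis": 9.5}, ...],
#       "query_2_error": "relation \"missing_table\" does not exist",
#   }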
async def _emit_log_and_event(
interfaceDbChat,
workflowId: str,
event_manager,
message: str,
log_type: str = "info",
status: str = "running",
round_number: Optional[int] = None
) -> None:
"""
Store log in database and emit event for streaming.
Args:
interfaceDbChat: Database interface
workflowId: Workflow ID
event_manager: Event manager for streaming
message: Log message
log_type: Log type (info, warning, error)
status: Status string
round_number: Optional round number (will be fetched from workflow if not provided)
"""
try:
# Get round number from workflow if not provided
if round_number is None:
workflow = interfaceDbChat.getWorkflow(workflowId)
if workflow:
round_number = workflow.currentRound
log_timestamp = getUtcTimestamp()
log_data = {
"id": f"log_{uuid.uuid4()}",
"workflowId": workflowId,
"message": message,
"type": log_type,
"timestamp": log_timestamp,
"status": status,
"roundNumber": round_number
}
# Store log in database (createLog emits when event_manager is provided)
interfaceDbChat.createLog(log_data, event_manager=event_manager)
except Exception as e:
logger.error(f"Error storing log: {e}", exc_info=True)
async def _check_workflow_stopped(interfaceDbChat, workflowId: str) -> bool:
"""
Check if workflow was stopped.
Args:
interfaceDbChat: Database interface
workflowId: Workflow ID
Returns:
True if workflow is stopped, False otherwise
"""
try:
workflow = interfaceDbChat.getWorkflow(workflowId)
return bool(workflow and workflow.status == "stopped")
except Exception as e:
logger.warning(f"Error checking workflow status: {e}")
return False
def _buildWebResearchQuery(userPrompt: str, workflowMessages: List, queryResults: Optional[Dict[str, Any]] = None) -> str:
"""
Build enriched web research query by extracting product context from conversation history and current prompt.
Extracts product information from:
1. Current user prompt (article numbers, product mentions)
2. Database query results (if available)
3. Previous assistant messages (conversation history)
Args:
userPrompt: Current user prompt
workflowMessages: List of workflow messages (conversation history)
queryResults: Optional database query results to extract product info from
Returns:
Enriched search query string
"""
# Normalize user prompt for detection
prompt_lower = userPrompt.lower().strip()
# Patterns that indicate a search request
search_patterns = [
"ja", "yes", "oui", "si",
"such", "suche", "search", "recherche", "recherchier",
"internet", "web", "online",
"datenblatt", "datasheet", "fiche technique",
"mehr informationen", "more information", "plus d'information",
"weitere informationen", "further information", "additional information"
]
# Certification patterns that require web research
certification_patterns = [
"ul", "ce", "tüv", "vde", "iec", "en", "iso",
"zertifiziert", "certified", "certification", "zertifizierung",
"geprüft", "approved", "compliance"
]
# Check if current prompt contains search-related keywords (match at word
# start so stems like "such" still hit "suche"/"suchen")
has_search_intent = any(re.search(rf'\b{re.escape(p)}', prompt_lower) for p in search_patterns)
# Check if prompt contains certification-related keywords (full-word match:
# substrings like "en" inside "informationen" must not trigger web research)
has_certification_intent = any(re.search(rf'\b{re.escape(p)}\b', prompt_lower) for p in certification_patterns)
# Extract product information - try multiple sources
article_number = None
article_description = None
supplier = None
# Pattern for article numbers like "6AV2 181-8XP00-0AX0" or "6AV2181-8XP00-0AX0"
article_patterns = [
r'\b[A-Z0-9]{2,}\s+[0-9]{3,}-[A-Z0-9-]+\b', # With space: "6AV2 181-8XP00-0AX0"
r'\b[A-Z0-9]{4,}[\s-][A-Z0-9-]{6,}\b', # General pattern
r'\b[A-Z]{2,}[0-9]+\s+[0-9]+-[A-Z0-9-]+\b', # Specific Siemens pattern
]
# 1. First, try to extract from current user prompt
for pattern in article_patterns:
matches = re.findall(pattern, userPrompt)
if matches:
article_number = matches[0]
logger.info(f"Extracted article number from user prompt: {article_number}")
break
# 2. Try to extract from database query results if available
# Always check queryResults to enrich with product description and supplier, even if article_number was already found
if queryResults:
# Look for article numbers in query result text (if not already found)
if not article_number:
for key in queryResults.keys():
if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
result_text = queryResults.get(key, "")
if isinstance(result_text, str):
for pattern in article_patterns:
matches = re.findall(pattern, result_text)
if matches:
article_number = matches[0]
logger.info(f"Extracted article number from query results: {article_number}")
break
if article_number:
break
# Always check data arrays for product description and supplier (even if article_number already found)
for key in queryResults.keys():
if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"):
data_key = f"{key}_data"
if data_key in queryResults:
data_array = queryResults[data_key]
if isinstance(data_array, list) and len(data_array) > 0:
# Look for article number in first row (if not already found)
first_row = data_array[0]
if isinstance(first_row, dict):
# Check common article number fields (if not already found)
if not article_number:
for field in ["Artikelnummer", "Artikelkürzel", "article_number", "articleNumber"]:
if field in first_row and first_row[field]:
article_number = str(first_row[field])
logger.info(f"Extracted article number from query data: {article_number}")
break
# Always check article description (can enrich even if article_number already found)
if not article_description:
for field in ["Artikelbezeichnung", "Bezeichnung", "article_description", "description"]:
if field in first_row and first_row[field]:
article_description = str(first_row[field])
logger.info(f"Extracted article description from query data: {article_description}")
break
# Always check supplier (can enrich even if article_number already found)
if not supplier:
for field in ["Lieferant", "Supplier", "supplier"]:
if field in first_row and first_row[field]:
supplier = str(first_row[field])
logger.info(f"Extracted supplier from query data: {supplier}")
break
# If we found all needed info, we can stop
if article_number and article_description and supplier:
break
# Check if current prompt is an explicit search request that should NOT use context
# If user explicitly asks to search for something, prioritize that over previous messages
explicit_search_patterns = [
r"recherchier\s+(?:im\s+internet\s+)?nach\s+(.+)",
r"suche\s+(?:im\s+internet\s+)?nach\s+(.+)",
r"search\s+(?:the\s+internet\s+)?for\s+(.+)",
r"find\s+(?:information\s+)?(?:about\s+)?(.+)",
r"recherche\s+(?:sur\s+internet\s+)?(.+)"
]
explicit_search_term = None
for pattern in explicit_search_patterns:
match = re.search(pattern, userPrompt, re.IGNORECASE)
if match:
explicit_search_term = match.group(1).strip()
logger.info(f"Found explicit search term in prompt: '{explicit_search_term}'")
break
# 3. Extract from previous assistant messages (conversation history)
# ONLY if there's no explicit search term (to avoid using old context for new searches)
if not explicit_search_term and (not article_number or not article_description):
for msg in reversed(workflowMessages[-10:]):
if msg.role == "assistant":
message_text = msg.message
# Extract article number if not found yet
if not article_number:
for pattern in article_patterns:
matches = re.findall(pattern, message_text)
if matches:
article_number = matches[0]
break
# Extract article description if not found yet
if not article_description:
description_patterns = [
r'Es handelt sich um\s+([^\.]+)',
r'It is a\s+([^\.]+)',
r'C\'est\s+([^\.]+)',
r'Bezeichnung:\s*([^\n]+)',
r'Description:\s*([^\n]+)',
r'Artikelbezeichnung:\s*([^\n]+)'
]
for pattern in description_patterns:
match = re.search(pattern, message_text, re.IGNORECASE)
if match:
article_description = match.group(1).strip()
break
# Extract supplier if not found yet
if not supplier:
supplier_patterns = [
r'von\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
r'from\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)',
r'Lieferant:\s*([^\n]+)',
r'Supplier:\s*([^\n]+)'
]
for pattern in supplier_patterns:
match = re.search(pattern, message_text, re.IGNORECASE)
if match:
supplier = match.group(1).strip()
break
# Stop if we found everything
if article_number and article_description and supplier:
break
# Build enriched search query
query_parts = []
# If we have an explicit search term, use it as the primary query
if explicit_search_term:
query_parts.append(explicit_search_term)
logger.info(f"Using explicit search term as primary query: '{explicit_search_term}'")
# If we have search intent but no product info, try to use the user prompt intelligently
elif has_search_intent and not article_number and not article_description:
# Try to extract meaningful parts from the prompt
# Remove common search phrases and keep the product-related parts
cleaned_prompt = userPrompt
for phrase in ["recherchier", "recherche", "suche nach", "search for", "find", "informationen zu", "information about", "weitere informationen", "further information", "im internet", "the internet", "sur internet"]:
cleaned_prompt = re.sub(phrase, "", cleaned_prompt, flags=re.IGNORECASE)
cleaned_prompt = cleaned_prompt.strip()
# Use cleaned prompt if it has meaningful content
if cleaned_prompt and len(cleaned_prompt) > 2:
query_parts.append(cleaned_prompt)
# Add article description if found (but NOT if we have an explicit search term)
if article_description and not explicit_search_term:
query_parts.append(article_description)
# Add article number if found (but NOT if we have an explicit search term)
if article_number and not explicit_search_term:
query_parts.append(article_number)
# Add supplier if found (but NOT if we have an explicit search term)
if supplier and not explicit_search_term:
query_parts.append(supplier)
# Extract certification information from prompt if present
certification_terms = []
if has_certification_intent:
# Extract specific certification mentions
cert_keywords = {
"ul": "UL certification",
"ce": "CE certification",
"tüv": "TÜV certification",
"vde": "VDE certification",
"iec": "IEC certification",
"iso": "ISO certification"
}
for cert_key, cert_term in cert_keywords.items():
if cert_key in prompt_lower:
certification_terms.append(cert_term)
# If no specific certification found but certification intent detected, add generic term
if not certification_terms:
certification_terms.append("certification")
# Add certification terms to query if found
if certification_terms:
query_parts.extend(certification_terms)
# Add "Datenblatt" or "datasheet" if user requested it or if we have product info
# But NOT if we have an explicit search term (user wants to search for something specific)
if not explicit_search_term:
if "datenblatt" in prompt_lower or "datasheet" in prompt_lower or "fiche technique" in prompt_lower:
query_parts.append("Datenblatt")
elif query_parts and (article_number or article_description):
# If we have product info but no explicit request for datasheet, add it anyway
query_parts.append("Datenblatt")
# If we found product information or built a meaningful query, use it
if query_parts:
enriched_query = " ".join(query_parts)
logger.info(f"Built enriched search query: '{enriched_query}' from context (original: '{userPrompt}')")
return enriched_query
else:
# Fall back to original prompt, but try to clean it up
logger.info(f"No product context found, using original prompt: '{userPrompt}'")
return userPrompt
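# Illustrative enrichment (assumed conversation state): for the follow-up
# prompt "Ja, suche das Datenblatt" after a query returned
#   {"Artikelnummer": "6AV2181-8XP00-0AX0",
#    "Artikelbezeichnung": "SIMATIC HMI", "Lieferant": "Siemens AG"},
# the built query would be roughly:
#   "SIMATIC HMI 6AV2181-8XP00-0AX0 Siemens AG Datenblatt"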
async def _convert_file_ids_to_document_references(
services,
file_ids: List[str]
) -> DocumentReferenceList:
"""
Convert file IDs to DocumentReferenceList for use with ai.process.
Args:
services: Services instance
file_ids: List of file IDs to convert
Returns:
DocumentReferenceList with docItem references
"""
references = []
# Get workflow to search for ChatbotDocuments
workflow = services.workflow
if not workflow:
logger.error("Cannot convert file IDs to document references: workflow not set in services")
return DocumentReferenceList(references=[])
for file_id in file_ids:
try:
# Get file info to verify it exists
file_info = services.chat.getFileInfo(file_id)
if not file_info:
logger.warning(f"File {file_id} not found, skipping")
continue
# Find ChatbotDocument that has this fileId
document_id = None
if workflow.messages:
for message in workflow.messages:
if hasattr(message, 'documents') and message.documents:
for doc in message.documents:
if getattr(doc, 'fileId', None) == file_id:
document_id = getattr(doc, 'id', None)
break
if document_id:
break
# Search chatbot database if not found in messages
if not document_id:
try:
from modules.interfaces.interfaceRbac import getRecordsetWithRBAC
chatbotInterface = services.interfaceDbChat
documents = getRecordsetWithRBAC(
chatbotInterface.db,
ChatbotDocument,
services.user,
recordFilter={"fileId": file_id}
)
if documents:
workflow_message_ids = {msg.id for msg in workflow.messages} if workflow.messages else set()
for doc in documents:
if doc.get("messageId") in workflow_message_ids:
document_id = doc.get("id")
break
except Exception:
pass # Fallback to fileId
# Use ChatbotDocument ID if found, otherwise use fileId as fallback
ref = DocumentItemReference(documentId=document_id if document_id else file_id)
references.append(ref)
except Exception as e:
logger.error(f"Error converting fileId {file_id}: {e}", exc_info=True)
logger.info(f"Converted {len(references)} file IDs to document references")
return DocumentReferenceList(references=references)
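# Resolution order sketch (values illustrative): for each fileId the helper
# prefers a ChatbotDocument attached to a message of this workflow, then an
# RBAC database lookup, and only falls back to the raw fileId:
#
#   fileId "f-123" attached to a workflow message -> DocumentItemReference(documentId="<doc id>")
#   fileId "f-456" with no matching document      -> DocumentItemReference(documentId="f-456")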
def _format_query_results_as_lookup(query_data: Dict[str, List[Dict]]) -> str:
"""
Format database query results as JSON lookup table for Excel matching.
Converts query result data into structured JSON format: {Artikelnummer: {columns...}}
Args:
query_data: Dict with query_key -> list of row dicts (from connector with return_json=True)
Returns:
JSON string formatted as lookup table
"""
lookup_table = {}
for query_key, rows in query_data.items():
if query_key == "error" or not rows:
logger.warning(f"Skipping query key '{query_key}' - no rows or error")
continue
logger.info(f"Processing {len(rows)} rows from query '{query_key}'")
for row in rows:
if not isinstance(row, dict):
logger.warning(f"Skipping non-dict row: {type(row)}")
continue
# Find Artikelnummer field (case-insensitive)
artikelnummer = None
for key in row.keys():
if key.lower() in ['artikelnummer', 'artikel_nummer', 'art_nr', 'part_number']:
artikelnummer = str(row[key])
break
if artikelnummer:
lookup_table[artikelnummer] = row
else:
logger.warning(f"No Artikelnummer found in row with keys: {list(row.keys())}")
logger.info(f"Generated lookup table with {len(lookup_table)} entries")
if lookup_table:
sample_keys = list(lookup_table.keys())[:3]
logger.info(f"Sample Artikelnummern: {sample_keys}")
if sample_keys:
sample_entry = lookup_table[sample_keys[0]]
logger.info(f"Sample entry keys: {list(sample_entry.keys())}")
return json.dumps(lookup_table, ensure_ascii=False, indent=2)
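# Example transformation (illustrative rows):
#
#   input:  {"query_1": [{"Artikelnummer": "A-100", "Preis": 9.5},
#                        {"Artikelnummer": "A-200", "Preis": 12.0}]}
#   output (JSON string):
#   {
#     "A-100": {"Artikelnummer": "A-100", "Preis": 9.5},
#     "A-200": {"Artikelnummer": "A-200", "Preis": 12.0}
#   }
#
# Rows without a recognizable Artikelnummer column are logged and skipped.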
async def _create_chat_document_from_action_document(
services,
action_document,
message_id: str,
workflow_id: str,
round_number: int
) -> ChatbotDocument:
"""
Create a ChatbotDocument from an ActionDocument by storing the file data.
Args:
services: Services instance
action_document: ActionDocument from ai.process result
message_id: ID of the message to attach to
workflow_id: Workflow ID
round_number: Round number
Returns:
ChatbotDocument instance
"""
try:
# Get file data (could be bytes or string)
document_data = action_document.documentData
# Convert to bytes if needed
if isinstance(document_data, str):
# Check if it's base64 encoded
try:
# Try a strict base64 decode first; validate=True makes plain text raise
# instead of silently decoding stray characters into garbage bytes
file_bytes = base64.b64decode(document_data, validate=True)
except Exception:
# Not base64, encode as UTF-8
file_bytes = document_data.encode('utf-8')
elif isinstance(document_data, bytes):
file_bytes = document_data
else:
# Try to convert to bytes
try:
file_bytes = bytes(document_data)
except Exception:
# Last resort: convert to string then encode
file_bytes = str(document_data).encode('utf-8')
# Get MIME type (default to Excel)
mime_type = action_document.mimeType or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
# Get file name
file_name = action_document.documentName or "data_export.xlsx"
# Ensure it has .xlsx extension
if not file_name.lower().endswith('.xlsx'):
# Remove any existing extension and add .xlsx
file_name = file_name.rsplit('.', 1)[0] + '.xlsx'
# Store file using component interface
file_item = services.interfaceDbComponent.createFile(
name=file_name,
mimeType=mime_type,
content=file_bytes
)
# Store file data
success = services.interfaceDbComponent.createFileData(file_item.id, file_bytes)
if not success:
logger.warning(f"Failed to store file data for {file_item.id}, but continuing...")
# Create ChatbotDocument
chat_document = ChatbotDocument(
id=str(uuid.uuid4()),
messageId=message_id,
fileId=file_item.id,
fileName=file_name,
fileSize=len(file_bytes),
mimeType=mime_type,
roundNumber=round_number,
taskNumber=0,
actionNumber=0
)
logger.info(f"Created ChatbotDocument {chat_document.id} from ActionDocument {file_name} (size: {len(file_bytes)} bytes)")
return chat_document
except Exception as e:
logger.error(f"Error creating ChatbotDocument from ActionDocument: {e}", exc_info=True)
raise
async def _bridge_chatbot_events(
event_stream,
event_manager,
workflow_id: str,
interface_db_chat
):
"""
Bridge legacy chatbot events to current event manager format.
Args:
event_stream: Async iterator from chatbot.stream_events()
event_manager: Event manager instance
workflow_id: Workflow ID
interface_db_chat: Database interface for storing messages
"""
try:
final_message_stored = False
async for event in event_stream:
event_type = event.get("type")
# Handle status updates - emit immediately for real-time UI feedback
# Note: Status updates are transient UI feedback, no need to persist them
if event_type == "status":
label = event.get("label", "")
if label:
# Emit the status event directly for real-time UI feedback
await event_manager.emit_event(
context_id=workflow_id,
event_type="chatdata",
data={
"type": "status",
"label": label.strip()
},
event_category="chat",
message="Status update",
step="status"
)
continue
# Handle token chunks for ChatGPT-like streaming (append to message as it's generated)
if event_type == "chunk":
content = event.get("content", "")
if content:
await event_manager.emit_event(
context_id=workflow_id,
event_type="chatdata",
data={"type": "chunk", "content": content},
event_category="chat",
message="Token chunk",
step="chunk"
)
continue
# Handle final response
if event_type == "final":
response_data = event.get("response", {})
chat_history = response_data.get("chat_history", [])
# The final message should already be stored by the memory/checkpointer
# We just need to emit the event, not store it again
# Check if the message was already stored by checking the workflow
workflow = interface_db_chat.getWorkflow(workflow_id)
if workflow and workflow.messages:
# Check whether the last stored message is the assistant reply (already stored by memory)
last_message = workflow.messages[-1]
if last_message.role == "assistant":
final_message_stored = True
# Emit message event for the already-stored message
message_timestamp = parseTimestamp(last_message.publishedAt, default=getUtcTimestamp())
await event_manager.emit_event(
context_id=workflow_id,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": last_message.model_dump()
},
event_category="chat"
)
else:
# If no assistant message found, try to store from chat_history
assistant_message = None
for msg in reversed(chat_history):
if msg.get("role") == "assistant" and msg.get("content"):
assistant_message = msg
break
if assistant_message:
message_data = {
"id": f"msg_{workflow_id}_{getUtcTimestamp()}",
"workflowId": workflow_id,
"message": assistant_message.get("content", ""),
"role": "assistant",
"status": "last",
"sequenceNr": len(workflow.messages) + 1 if workflow.messages else 1,
"publishedAt": getUtcTimestamp(),
"roundNumber": workflow.currentRound if workflow else 1,
"taskNumber": 0,
"actionNumber": 0,
"success": True
}
try:
assistant_msg = interface_db_chat.createMessage(message_data, event_manager=event_manager)
final_message_stored = True
# Emit message event
message_timestamp = parseTimestamp(assistant_msg.publishedAt, default=getUtcTimestamp())
await event_manager.emit_event(
context_id=workflow_id,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": assistant_msg.model_dump()
},
event_category="chat"
)
except Exception as e:
logger.error(f"Error storing assistant message: {e}", exc_info=True)
# Emit completion event
await event_manager.emit_event(
context_id=workflow_id,
event_type="complete",
data={"workflowId": workflow_id},
event_category="workflow",
message="Chatbot-Verarbeitung abgeschlossen",
step="complete"
)
clear_workflow_allowed_providers(workflow_id)
# Update workflow status
try:
interface_db_chat.updateWorkflow(workflow_id, {
"status": "completed",
"lastActivity": getUtcTimestamp()
})
except Exception as e:
logger.error(f"Error updating workflow status: {e}", exc_info=True)
return
# Handle errors
if event_type == "error":
error_msg = event.get("message", "Unknown error")
await event_manager.emit_event(
context_id=workflow_id,
event_type="error",
data={"error": error_msg},
event_category="workflow",
message=f"Fehler beim Verarbeiten: {error_msg}",
step="error"
)
clear_workflow_allowed_providers(workflow_id)
# Update workflow status
try:
interface_db_chat.updateWorkflow(workflow_id, {
"status": "error",
"lastActivity": getUtcTimestamp()
})
except Exception as e:
logger.error(f"Error updating workflow status: {e}", exc_info=True)
return
# If stream ended without final message, store error message
if not final_message_stored:
logger.warning(f"Stream ended for workflow {workflow_id} without a final message")
try:
workflow = interface_db_chat.getWorkflow(workflow_id)
if workflow:
error_message_data = {
"id": f"msg_{workflow_id}_{getUtcTimestamp()}",
"workflowId": workflow_id,
"message": "Entschuldigung, ich konnte keine vollständige Antwort generieren. Bitte versuchen Sie es erneut.",
"role": "assistant",
"status": "last",
"sequenceNr": len(workflow.messages) + 1 if workflow.messages else 1,
"publishedAt": getUtcTimestamp(),
"roundNumber": workflow.currentRound if workflow else 1,
"taskNumber": 0,
"actionNumber": 0,
"success": False
}
error_msg = interface_db_chat.createMessage(error_message_data, event_manager=event_manager)
# Emit message event
message_timestamp = parseTimestamp(error_msg.publishedAt, default=getUtcTimestamp())
await event_manager.emit_event(
context_id=workflow_id,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": error_msg.model_dump()
},
event_category="chat"
)
except Exception as e:
logger.error(f"Error storing error message: {e}", exc_info=True)
# Emit completion event
await event_manager.emit_event(
context_id=workflow_id,
event_type="complete",
data={"workflowId": workflow_id},
event_category="workflow",
message="Chatbot-Verarbeitung abgeschlossen",
step="complete"
)
clear_workflow_allowed_providers(workflow_id)
# Update workflow status
try:
interface_db_chat.updateWorkflow(workflow_id, {
"status": "completed",
"lastActivity": getUtcTimestamp()
})
except Exception as e:
logger.error(f"Error updating workflow status: {e}", exc_info=True)
except Exception as e:
logger.error(f"Error in bridge_chatbot_events: {e}", exc_info=True)
# Emit error event
await event_manager.emit_event(
context_id=workflow_id,
event_type="error",
data={"error": str(e)},
event_category="workflow",
message=f"Fehler beim Verarbeiten: {str(e)}",
step="error"
)
clear_workflow_allowed_providers(workflow_id)
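# Event shapes the bridge consumes from chatbot.stream_events() (field names
# taken from the handling above; label/content values illustrative):
#
#   {"type": "status", "label": "Durchsuche Datenbank..."}  -> transient UI status
#   {"type": "chunk",  "content": "partial token text"}     -> streamed token
#   {"type": "final",  "response": {"chat_history": [...]}} -> stores/emits reply, ends stream
#   {"type": "error",  "message": "what went wrong"}        -> error event, ends stream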
def _load_chatbot_config_sync(featureInstanceId: Optional[str]) -> ChatbotConfig:
"""
Load chatbot configuration from FeatureInstance (database). Sync version for use in executor.
Args:
featureInstanceId: Feature instance ID to load config from
Returns:
ChatbotConfig instance
"""
if not featureInstanceId:
raise ValueError("featureInstanceId is required to load chatbot config")
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
instance = featureInterface.getFeatureInstance(featureInstanceId)
if not instance:
raise ValueError(f"FeatureInstance {featureInstanceId} not found")
logger.info(f"Loading chatbot config from FeatureInstance {featureInstanceId}")
return load_chatbot_config_from_instance(instance)
def _warm_model_registry_cache_sync(
currentUser: "User",
mandateId: Optional[str] = None,
featureInstanceId: Optional[str] = None,
) -> None:
"""
Pre-warm getAvailableModels cache so planner/agent model selection is a cache hit.
Uses mandateId/featureInstanceId for faster RBAC (fewer roles to load).
Runs in executor to avoid blocking event loop.
"""
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.aicore.aicoreModelRegistry import modelRegistry
root = getRootInterface()
modelRegistry.getAvailableModels(
currentUser=currentUser,
rbacInstance=root.rbac,
mandateId=mandateId,
featureInstanceId=featureInstanceId,
)
async def _load_chatbot_config(featureInstanceId: Optional[str]) -> ChatbotConfig:
"""
Load chatbot configuration from FeatureInstance (database).
Runs in thread pool to avoid blocking event loop on DB I/O.
Args:
featureInstanceId: Feature instance ID to load config from
Returns:
ChatbotConfig instance
Raises:
ValueError: If no featureInstanceId provided or instance not found
"""
if not featureInstanceId:
raise ValueError("featureInstanceId is required to load chatbot config")
try:
return await asyncio.to_thread(_load_chatbot_config_sync, featureInstanceId)
except ValueError:
raise
except Exception as e:
logger.error(f"Error loading config from FeatureInstance {featureInstanceId}: {e}")
raise
def _preflight_billing_check(services, mandateId: str, featureInstanceId: Optional[str]) -> None:
"""
Pre-flight billing check before starting chatbot AI processing.
Raises if mandate has insufficient balance or no providers allowed.
Uses services.billing from service center (REQUIRED_SERVICES).
Exception types from BillingService class (service center billing API).
"""
from modules.serviceCenter.services.serviceBilling import BillingService
billingService = services.billing
if not billingService:
raise BillingService.BillingContextError("Billing service not available for chatbot")
try:
balanceCheck = billingService.checkBalance(0.01)
if not balanceCheck.allowed:
mid = str(getattr(services, "mandateId", None) or mandateId or "")
from modules.serviceCenter.services.serviceBilling.billingExhaustedNotify import (
maybeEmailMandatePoolExhausted,
)
u = getattr(services, "user", None)
ulabel = (
(getattr(u, "email", None) or getattr(u, "username", None) or str(getattr(u, "id", "")))
if u is not None else ""
)
maybeEmailMandatePoolExhausted(
mid,
str(getattr(u, "id", "") if u is not None else ""),
ulabel,
float(balanceCheck.currentBalance or 0.0),
0.01,
)
raise BillingService.InsufficientBalanceException.fromBalanceCheck(
balanceCheck,
mid,
0.01,
)
rbacAllowedProviders = billingService.getallowedProviders()
if not rbacAllowedProviders:
raise BillingService.ProviderNotAllowedException(
provider="any",
message="Keine AI-Provider fuer Ihre Rolle freigegeben. Kontaktieren Sie Ihren Administrator."
)
except (BillingService.InsufficientBalanceException, BillingService.ProviderNotAllowedException):
raise
except Exception as e:
logger.error(f"Billing pre-flight failed: {e}")
raise BillingService.BillingContextError(f"Billing check failed: {e}")
def _create_chatbot_billing_callback(services, workflow_id: str):
"""
Create billing callback for AICenterChatModel. Records each AI call to poweron_billing.
Uses services.billing from service center (REQUIRED_SERVICES).
"""
from modules.datamodels.datamodelAi import AiCallResponse
billingService = services.billing
if not billingService:
return lambda _: None # No-op callback if billing unavailable
def _billing_callback(response: AiCallResponse) -> None:
if not response or getattr(response, "errorCount", 0) > 0:
return
priceCHF = getattr(response, "priceCHF", 0.0)
if not priceCHF or priceCHF <= 0:
return
provider = getattr(response, "provider", None) or "unknown"
modelName = getattr(response, "modelName", None) or "unknown"
try:
billingService.recordUsage(
priceCHF=priceCHF,
workflowId=workflow_id,
aicoreProvider=provider,
aicoreModel=modelName,
description=f"AI: {modelName}"
)
logger.debug(f"Chatbot billed: {priceCHF:.4f} CHF, provider={provider}, model={modelName}")
except Exception as e:
logger.error(f"Chatbot billing failed: {e}")
return _billing_callback
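# Usage sketch (mirrors the wiring in _processChatbotMessageLangGraph below):
#
#   billing_callback = _create_chatbot_billing_callback(services, workflow_id)
#   model = AICenterChatModel(
#       user=current_user,
#       operation_type=operation_type,       # values as resolved below
#       processing_mode=processing_mode,
#       billing_callback=billing_callback,   # each successful AI call billed once
#       workflow_id=workflow_id,
#   )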
async def _processChatbotMessageLangGraph(
services,
currentUser: User,
workflowId: str,
userInput: UserInputRequest,
userMessageId: str,
featureInstanceId: Optional[str] = None,
config: Optional[ChatbotConfig] = None,
event_manager=None
):
"""
Process chatbot message using LangGraph.
Uses LangGraph workflow with AI center models and tools.
Args:
services: Service container
currentUser: Current user
workflowId: Workflow ID
userInput: User input request
userMessageId: User message ID
featureInstanceId: Optional feature instance ID for loading instance-specific config
event_manager: Event manager for streaming (passed from chatProcess)
"""
try:
# Start config + model cache warm in parallel (planner/agent need cache hit to avoid 23 s per call)
config_task = asyncio.create_task(_load_chatbot_config(featureInstanceId)) if config is None else None
warm_task = asyncio.create_task(asyncio.to_thread(
_warm_model_registry_cache_sync, currentUser, services.mandateId, featureInstanceId
))
# Emit first status immediately so stream feels responsive
await event_manager.emit_event(
context_id=workflowId,
event_type="chatdata",
data={"type": "status", "label": "Starte..."},
event_category="chat",
message="Status update",
step="status",
)
# Reuse interfaceDbChat from services (ChatObjects) - avoids duplicate DB init
interfaceDbChat = services.interfaceDbChat
# Reload workflow to get current messages
workflow = interfaceDbChat.getWorkflow(workflowId)
if not workflow:
logger.error(f"Workflow {workflowId} not found during processing")
await event_manager.emit_event(
context_id=workflowId,
event_type="error",
data={"error": f"Workflow {workflowId} nicht gefunden"},
event_category="workflow",
message=f"Workflow {workflowId} nicht gefunden",
step="error"
)
return
# Check if workflow was stopped before starting
if await _check_workflow_stopped(interfaceDbChat, workflowId):
logger.info(f"Workflow {workflowId} was stopped, aborting processing")
return
# Await config and model cache warm (planner gets cache hit, saves ~23 s)
if config_task is not None:
config = await config_task
await warm_task
# Replace {{DATE}} placeholder in system prompt
from datetime import datetime
system_prompt = config.systemPrompt.replace(
"{{DATE}}",
datetime.now().strftime("%d.%m.%Y")
)
# Create AI center model
operation_type = OperationTypeEnum[config.model.operationType]
processing_mode = ProcessingModeEnum[config.model.processingMode]
billing_callback = None
if services.mandateId:
billing_callback = _create_chatbot_billing_callback(services, workflowId)
allowed_providers = config.model.allowedProviders or None
if allowed_providers:
logger.info(f"Chatbot AICenterChatModel: restricting to providers {allowed_providers}")
model = AICenterChatModel(
user=currentUser,
operation_type=operation_type,
processing_mode=processing_mode,
billing_callback=billing_callback,
workflow_id=workflowId,
allowed_providers=allowed_providers,
mandate_id=services.mandateId,
feature_instance_id=featureInstanceId,
)
# Fast planner model (gpt-4o-mini etc.) for routing - saves ~1-2 s on first response
planner_model = AICenterChatModel(
user=currentUser,
operation_type=operation_type,
processing_mode=processing_mode,
billing_callback=billing_callback,
workflow_id=workflowId,
allowed_providers=allowed_providers,
prefer_fast_model=True,
mandate_id=services.mandateId,
feature_instance_id=featureInstanceId,
)
# Create memory/checkpointer (reuse interface to avoid extra DB init)
memory = DatabaseCheckpointer(
user=currentUser,
workflow_id=workflowId,
mandateId=services.mandateId,
featureInstanceId=featureInstanceId,
interface=interfaceDbChat,
)
# Create chatbot instance with config for dynamic tool configuration
# Use mainChatbot.getChatStreamingHelper() - resolves from service center (legacy hub.streaming has no getChatStreamingHelper); imported at module level
chat_streaming_helper = getChatStreamingHelper()
if not chat_streaming_helper:
logger.warning("ChatStreamingHelper not available from streaming service; message normalization may fail")
chatbot = await Chatbot.create(
model=model,
memory=memory,
system_prompt=system_prompt,
workflow_id=workflowId,
config=config,
event_manager=event_manager,
planner_model=planner_model,
chat_streaming_helper=chat_streaming_helper,
)
# Emit synthetic status for real-time UI feedback
await event_manager.emit_event(
context_id=workflowId,
event_type="chatdata",
data={"type": "status", "label": "Analysiere Anfrage..."},
event_category="chat",
message="Status update",
step="status"
)
# Stream events using chatbot
event_stream = chatbot.stream_events(
message=userInput.prompt,
chat_id=workflowId
)
# Bridge chatbot events to event manager
await _bridge_chatbot_events(
event_stream=event_stream,
event_manager=event_manager,
workflow_id=workflowId,
interface_db_chat=interfaceDbChat
)
# Schedule cleanup
await event_manager.cleanup(workflowId, delay=300.0) # 5 minutes delay
except Exception as e:
logger.error(f"Error processing chatbot message with LangGraph: {str(e)}", exc_info=True)
# Re-resolve the DB interface: the local binding in the try block may not
# exist if the exception was raised before it was assigned
interfaceDbChat = services.interfaceDbChat
# Check if workflow was stopped - if so, don't store error message
if await _check_workflow_stopped(interfaceDbChat, workflowId):
logger.info(f"Workflow {workflowId} was stopped, not storing error message")
return
# Store error message
try:
workflow = interfaceDbChat.getWorkflow(workflowId)
if workflow and workflow.status == "stopped":
logger.info(f"Workflow {workflowId} was stopped, not storing error message")
return
errorMessageData = {
"id": f"msg_{uuid.uuid4()}",
"workflowId": workflowId,
"parentMessageId": userMessageId,
"message": f"Sorry, I encountered an error: {str(e)}",
"role": "assistant",
"status": "last",
"sequenceNr": len(workflow.messages) + 1 if workflow else 1,
"publishedAt": getUtcTimestamp(),
"success": False,
"roundNumber": workflow.currentRound if workflow else 1,
"taskNumber": 0,
"actionNumber": 0
}
errorMessage = interfaceDbChat.createMessage(errorMessageData, event_manager=event_manager)
# Emit message event
message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp())
await event_manager.emit_event(
context_id=workflowId,
event_type="chatdata",
data={
"type": "message",
"createdAt": message_timestamp,
"item": errorMessage.model_dump()
},
event_category="chat"
)
# Update workflow status
if not await _check_workflow_stopped(interfaceDbChat, workflowId):
interfaceDbChat.updateWorkflow(workflowId, {
"status": "error",
"lastActivity": getUtcTimestamp()
})
# Schedule cleanup
await event_manager.cleanup(workflowId)
except Exception as storeError:
logger.error(f"Error storing error message: {storeError}")
async def _processChatbotMessage(
services,
workflowId: str,
userInput: UserInputRequest,
userMessageId: str
):
"""
DEPRECATED: Old chatbot processing implementation.
The LangGraph implementation requires a currentUser parameter that this
signature lacks, so calling this function now raises NotImplementedError.
"""
logger.warning("_processChatbotMessage is deprecated; use _processChatbotMessageLangGraph")
raise NotImplementedError("_processChatbotMessage is deprecated and requires currentUser parameter")