diff --git a/modules/connectors/connectorPreprocessor.py b/modules/connectors/connectorPreprocessor.py index a1aa8368..189b6e2b 100644 --- a/modules/connectors/connectorPreprocessor.py +++ b/modules/connectors/connectorPreprocessor.py @@ -39,15 +39,17 @@ class PreprocessorConnector: logger.info("PreprocessorConnector initialized") - async def executeQuery(self, sql_query: str) -> str: + async def executeQuery(self, sql_query: str, return_json: bool = False): """ Execute a SQL query via the preprocessing API. Args: sql_query: SQL SELECT query to execute + return_json: If True, returns dict with 'text' and 'data' keys. If False, returns formatted string. Returns: - Formatted result string with query results + If return_json=False: Formatted result string with query results + If return_json=True: Dict with 'text' (formatted string) and 'data' (raw JSON data list) Raises: ValueError: If query is invalid or contains forbidden keywords @@ -57,16 +59,22 @@ class PreprocessorConnector: # Validate query validation_error = self._validateQuery(sql_query) if validation_error: + if return_json: + return {"text": validation_error, "data": []} return validation_error # Check configuration if not self.api_key: error_msg = "Error: PP_QUERY_API_KEY not configured" logger.error(error_msg) + if return_json: + return {"text": error_msg, "data": []} return error_msg if not self.base_url: error_msg = "Error: PP_QUERY_BASE_URL not configured" logger.error(error_msg) + if return_json: + return {"text": error_msg, "data": []} return error_msg # Make HTTP POST request to preprocessing API @@ -86,7 +94,10 @@ class PreprocessorConnector: # Parse response if not result.get("success"): error_message = result.get("message", "Unknown error") - return f"Query failed: {error_message}" + error_text = f"Query failed: {error_message}" + if return_json: + return {"text": error_text, "data": []} + return error_text # Format results data = result.get("data", []) @@ -97,7 +108,10 @@ class PreprocessorConnector: # Format results as string if not display_data: - return f"Query executed successfully. Returned {row_count} rows (no data)." + result_text = f"Query executed successfully. Returned {row_count} rows (no data)." + if return_json: + return {"text": result_text, "data": data} + return result_text # Format each row results = [] @@ -110,6 +124,8 @@ class PreprocessorConnector: + "\n".join(results) ) + if return_json: + return {"text": result_text, "data": data} return result_text except httpx.HTTPStatusError as e: diff --git a/modules/features/chatbot/mainChatbot.py b/modules/features/chatbot/mainChatbot.py index 8bcc73ce..1428ef41 100644 --- a/modules/features/chatbot/mainChatbot.py +++ b/modules/features/chatbot/mainChatbot.py @@ -11,16 +11,18 @@ import uuid import asyncio import re import datetime -from typing import Optional, Dict, Any +import base64 +from typing import Optional, Dict, Any, List -from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, ChatMessage, WorkflowModeEnum +from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, ChatDocument, WorkflowModeEnum from modules.datamodels.datamodelUam import User from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp from modules.services import getInterface as getServices from modules.connectors.connectorPreprocessor import PreprocessorConnector from modules.features.chatbot.eventManager import get_event_manager -from modules.features.chatbot.eventManager import get_event_manager +from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference +from modules.workflows.methods.methodAi.methodAi import MethodAi logger = logging.getLogger(__name__) @@ -52,107 +54,6 @@ def _extractJsonFromResponse(content: str) -> Optional[dict]: return None -async def _generateSqlFromQuestion(services, userQuestion: str, context: str = "") -> str: - """Generate SQL query directly from user question when JSON parsing fails.""" - current_date = datetime.datetime.now().strftime("%d.%m.%Y") - sqlPrompt = f"""Heute ist der {current_date}. - -Du bist ein Chatbot der Althaus AG. -Du hast Zugriff auf ein SQL query tool, dass es dir ermöglicht, SQL SELECT Abfragen auf der Althaus AG Datenbank auszuführen. - -DATENBANK-INFORMATIONEN: -- Datenbankdatei: /data/database.db (SQLite) -- Tabellen: Artikel, Einkaufspreis, Lagerplatz_Artikel - -Die Datenbank besteht aus drei Tabellen, die über Beziehungen verbunden sind: -- **Artikel**: Enthält alle Produktinformationen (I_ID, Artikelbezeichnung, Artikelnummer, etc.) -- **Einkaufspreis**: Enthält Preisdaten (m_Artikel, EP_CHF) -- **Lagerplatz_Artikel**: Enthält Lagerbestands- und Lagerplatzinformationen (R_ARTIKEL, R_LAGERPLATZ, S_IST_BESTAND, etc.) -- **Beziehungen**: - - Artikel.I_ID = Einkaufspreis.m_Artikel - - Artikel.I_ID = Lagerplatz_Artikel.R_ARTIKEL - -TABELLEN-SCHEMA (WICHTIG - Spalten mit Leerzeichen/Sonderzeichen IMMER in doppelte Anführungszeichen setzen): - -Tabelle 1: Artikel -CREATE TABLE Artikel ( - "I_ID" INTEGER PRIMARY KEY, - "Artikelbeschrieb" TEXT, - "Artikelbezeichnung" TEXT, - "Artikelgruppe" TEXT, - "Artikelkategorie" TEXT, - "Artikelkürzel" TEXT, - "Artikelnummer" TEXT, - "Einheit" TEXT, - "Gesperrt" TEXT, - "Keywords" TEXT, - "Lieferant" TEXT, - "Warengruppe" TEXT -) - -Tabelle 2: Einkaufspreis -CREATE TABLE Einkaufspreis ( - "m_Artikel" INTEGER, - "EP_CHF" FLOAT -) - -Tabelle 3: Lagerplatz_Artikel -CREATE TABLE Lagerplatz_Artikel ( - "R_ARTIKEL" INTEGER, - "R_LAGERPLATZ" TEXT, - "S_BESTELLTER_BESTAND" INTEGER, - "S_IST_BESTAND" TEXT, - "S_MAXIMALBESTAND" INTEGER, - "S_MINDESTBESTAND" INTEGER, - "S_RESERVIERTER_BESTAND" INTEGER, - "S_SOLL_BESTAND" INTEGER -) - -SQL-HINWEISE: -- Verwende IMMER doppelte Anführungszeichen für Spaltennamen: "Artikelkürzel", "Artikelnummer", etc. -- Für Textsuche verwende LIKE mit Wildcards: WHERE a."Artikelbezeichnung" LIKE '%suchbegriff%' -- Für Preisabfragen: Nutze JOINs um auf e."EP_CHF" zuzugreifen -- Für Lagerbestände: Nutze JOINs um auf l."S_IST_BESTAND", l."S_SOLL_BESTAND", etc. zuzugreifen -- WICHTIG bei S_IST_BESTAND: Dieser Wert kann "Unbekannt" sein (TEXT), nicht nur Zahlen! Prüfe mit WHERE l."S_IST_BESTAND" != 'Unbekannt' wenn du nur numerische Werte willst -- Verwende Tabellenaliase (a für Artikel, e für Einkaufspreis, l für Lagerplatz_Artikel) für bessere Lesbarkeit - -SQL-AGGREGATIONEN: -Du kannst SQL-Aggregationsfunktionen verwenden, um statistische Auswertungen und Zusammenfassungen zu erstellen: -- COUNT() - Anzahl zählen -- SUM() - Summe berechnen (z.B. SUM(CASE WHEN l."S_IST_BESTAND" != 'Unbekannt' THEN CAST(l."S_IST_BESTAND" AS INTEGER) ELSE 0 END)) -- AVG() - Durchschnitt -- MIN() / MAX() - Minimum/Maximum -- GROUP BY - Gruppierung - -Beispiel für Lagerbestand-Aggregation: -SELECT SUM(CASE WHEN l."S_IST_BESTAND" != 'Unbekannt' THEN CAST(l."S_IST_BESTAND" AS INTEGER) ELSE 0 END) as "Gesamtbestand" -FROM Artikel a -LEFT JOIN Lagerplatz_Artikel l ON a."I_ID" = l."R_ARTIKEL" -WHERE a."Artikelbezeichnung" LIKE '%LED%' - -Generate a SQL SELECT query to answer this question: {userQuestion}{context} - -Return ONLY the SQL query, nothing else. Use SELECT queries only. Use double quotes for all column names.""" - - sqlRequest = AiCallRequest( - prompt=sqlPrompt, - options=AiCallOptions( - resultFormat="txt", - operationType=OperationTypeEnum.DATA_ANALYSE, - processingMode=ProcessingModeEnum.BASIC - ) - ) - - sqlResponse = await services.ai.callAi(sqlRequest) - # Extract SQL from response (might be wrapped in markdown) - sql = sqlResponse.content.strip() - # Remove markdown code blocks if present - if sql.startswith("```"): - sql = re.sub(r'^```(?:sql)?\s*', '', sql) - sql = re.sub(r'\s*```$', '', sql) - return sql.strip() - - async def _generate_conversation_name( services, userPrompt: str, @@ -170,39 +71,18 @@ async def _generate_conversation_name( Short conversation name (max 60 characters) """ try: - # Truncate prompt if too long to avoid excessive token usage truncated_prompt = userPrompt[:200] if len(userPrompt) > 200 else userPrompt - # Create a prompt that detects the input language and generates title in the same language - name_prompt = f"""You are an expert in professional business communication. Your task is to create a professional conversation title. + name_prompt = f"""Create a professional conversation title in THE SAME LANGUAGE as the user's question. -CRITICAL: The title MUST be in the SAME LANGUAGE as the user's question. If the question is in German, respond in German. If in French, respond in French. If in English, respond in English. +Question: "{truncated_prompt}" -USER QUESTION: "{truncated_prompt}" - -TASK: -1. Identify the language of the user's question -2. Identify the main topic of the question -3. Create an elegant, professional title in THE SAME LANGUAGE as the question (max 60 characters, no punctuation) - -RULES: -- Title MUST match the language of the input question exactly -- DO NOT translate to another language -- DO NOT just copy words from the question -- Use professional language appropriate to the detected language -- No punctuation (?, !, .) - -EXAMPLES: -- Input (German): "wie viele leds haben wir auf lager?" → Title (German): "LED Lagerbestand" -- Input (German): "wie viel kosten die leds ungefähr? was ist die preisrange?" → Title (German): "LED Preisübersicht" -- Input (French): "combien de leds avons-nous en stock?" → Title (French): "Stock de LED" -- Input (French): "combien coûtent les leds environ?" → Title (French): "Aperçu des Prix LED" -- Input (English): "how many leds do we have in stock?" → Title (English): "LED Inventory Inquiry" -- Input (English): "how much do the leds cost approximately?" → Title (English): "LED Price Overview" - -Respond ONLY with the title in the same language as the question, nothing else:""" +Rules: +- Title MUST be in the same language as the question (German→German, French→French, English→English) +- Max 60 characters, no punctuation (?, !, .) +- Professional and concise +- Respond ONLY with the title, nothing else""" - # Ensure AI services are initialized before calling await services.ai.ensureAiObjectsInitialized() nameRequest = AiCallRequest( @@ -211,131 +91,32 @@ Respond ONLY with the title in the same language as the question, nothing else:" resultFormat="txt", operationType=OperationTypeEnum.DATA_GENERATE, processingMode=ProcessingModeEnum.DETAILED, - temperature=0.7 # Balanced temperature for creativity and consistency + temperature=0.7 ) ) nameResponse = await services.ai.callAi(nameRequest) - raw_response = nameResponse.content.strip() + generated_name = nameResponse.content.strip() - logger.info(f"AI name generation raw response (full): {raw_response}") - - # Extract title from response - look for "TITEL:" or "TITLE:" marker first - generated_name = raw_response - - # Try to extract title after "TITEL:" or "TITLE:" marker (case insensitive) - title_markers = ["TITEL:", "TITLE:", "Titre:", "Titel:", "TITRE:", "titel:", "title:"] - for marker in title_markers: - if marker.lower() in raw_response.lower(): - # Find the marker (case insensitive) - marker_pos = raw_response.lower().find(marker.lower()) - if marker_pos >= 0: - generated_name = raw_response[marker_pos + len(marker):].strip() - # Take only the first line after the marker - generated_name = generated_name.split('\n')[0].strip() - logger.info(f"Extracted title after marker '{marker}': {generated_name}") - break - - # If no marker found, try to find the last meaningful line - if generated_name == raw_response or len(generated_name) > 60: - lines = [line.strip() for line in raw_response.split("\n") if line.strip()] - # Look for a line that looks like a title (not too long, no colons except at start) - for line in reversed(lines): - # Skip lines that are clearly explanations or steps - if any(skip in line.lower() for skip in ["schritt", "step", "étape", "beispiel", "example", "wichtig", "important"]): - continue - # Take lines that are reasonable title length and don't have colons in the middle - if 3 <= len(line) <= 60 and (":" not in line or line.startswith(":")): - generated_name = line - logger.info(f"Extracted title from last meaningful line: {generated_name}") - break - - # Remove common prefixes/suffixes and clean up - generated_name = re.sub(r'^(Title|Titel|Titre|Name|Name:):\s*', '', generated_name, flags=re.IGNORECASE) - generated_name = re.sub(r'^["\']|["\']$', '', generated_name) # Remove surrounding quotes - generated_name = re.sub(r'^#+\s*', '', generated_name) # Remove markdown headers - generated_name = re.sub(r'^```.*?\n', '', generated_name, flags=re.DOTALL) # Remove code blocks start - generated_name = re.sub(r'\n.*?```$', '', generated_name, flags=re.DOTALL) # Remove code blocks end - - # Take only the first line (in case AI added explanations) + # Extract first line and clean up generated_name = generated_name.split('\n')[0].strip() + generated_name = re.sub(r'^(Title|Titel|Titre|Name|Name:):\s*', '', generated_name, flags=re.IGNORECASE) + generated_name = re.sub(r'^["\']|["\']$', '', generated_name) + generated_name = re.sub(r'[?!.]+$', '', generated_name) # Remove trailing punctuation - # Remove ALL question marks, exclamation marks, and trailing periods - generated_name = re.sub(r'[?!]+', '', generated_name) # Remove all ? and ! - generated_name = re.sub(r'\.+$', '', generated_name) # Remove trailing periods - - generated_name = generated_name.strip() - - # Ensure proper title case (capitalize first letter of each word, but handle acronyms) + # Apply title case if generated_name: - # Split into words and capitalize properly words = generated_name.split() capitalized_words = [] for word in words: - # Keep acronyms (all caps) as-is, otherwise capitalize first letter if word.isupper() and len(word) > 1: - capitalized_words.append(word) + capitalized_words.append(word) # Keep acronyms else: - # Capitalize first letter, lowercase the rest capitalized_words.append(word.capitalize()) - generated_name = " ".join(capitalized_words) + generated_name = " ".join(capitalized_words).strip() - # Validate: should be a proper title, not just copied words - # Check if it's too similar to the original (just first words) - prompt_words = set(userPrompt.lower().split()[:6]) - name_words = set(generated_name.lower().split()) - similarity = len(prompt_words.intersection(name_words)) / max(len(prompt_words), 1) - - # If too similar (more than 80% same words), it's probably just copied - if similarity > 0.8 and len(generated_name.split()) <= len(userPrompt.split()[:5]): - logger.warning(f"Generated name too similar to input, regenerating: {generated_name}") - # Try once more with a more explicit prompt - retry_prompt = f"""CRITICAL: Create a professional title in THE SAME LANGUAGE as the user's question. - -Question: {truncated_prompt} - -The title MUST be in the exact same language as the question above. -- If the question is in German, respond in German -- If the question is in French, respond in French -- If the question is in English, respond in English - -Create a professional title (max 60 characters, no punctuation) that summarizes the topic. - -Title (in the same language as the question):""" - # Ensure AI services are initialized (should already be, but be safe) - await services.ai.ensureAiObjectsInitialized() - - retryRequest = AiCallRequest( - prompt=retry_prompt, - options=AiCallOptions( - resultFormat="txt", - operationType=OperationTypeEnum.DATA_GENERATE, - processingMode=ProcessingModeEnum.DETAILED, - temperature=0.8 - ) - ) - retryResponse = await services.ai.callAi(retryRequest) - generated_name = retryResponse.content.strip() - generated_name = re.sub(r'^(Title|Titel|Titre|Name|Name:):\s*', '', generated_name, flags=re.IGNORECASE) - generated_name = re.sub(r'^["\']|["\']$', '', generated_name) - generated_name = generated_name.split('\n')[0].strip() - generated_name = re.sub(r'[?!]+', '', generated_name) # Remove all ? and ! - generated_name = re.sub(r'\.+$', '', generated_name) # Remove trailing periods - # Apply title case - if generated_name: - words = generated_name.split() - capitalized_words = [] - for word in words: - if word.isupper() and len(word) > 1: - capitalized_words.append(word) - else: - capitalized_words.append(word.capitalize()) - generated_name = " ".join(capitalized_words).strip() - - # Final validation and length check + # Validate and truncate if needed if not generated_name or len(generated_name) < 3: - logger.error(f"Generated name too short or empty: '{generated_name}'") - # Use default title based on language if userLanguage == "de": generated_name = "Chatbot Konversation" elif userLanguage == "fr": @@ -343,26 +124,16 @@ Title (in the same language as the question):""" else: generated_name = "Chatbot Conversation" - # Final cleanup: ensure no question marks or trailing punctuation - generated_name = re.sub(r'[?!]+', '', generated_name) - generated_name = re.sub(r'\.+$', '', generated_name).strip() - - # Final length check and truncation if len(generated_name) > 60: - # Try to truncate at word boundary truncated = generated_name[:57] last_space = truncated.rfind(' ') - if last_space > 30: # Only truncate at word boundary if reasonable - generated_name = truncated[:last_space] + "..." - else: - generated_name = truncated + "..." + generated_name = truncated[:last_space] + "..." if last_space > 30 else truncated + "..." - logger.info(f"Generated conversation name: '{generated_name}' from prompt: '{userPrompt[:50]}...'") + logger.info(f"Generated conversation name: '{generated_name}'") return generated_name except Exception as e: logger.error(f"Error generating conversation name: {e}", exc_info=True) - # Use default title based on language if userLanguage == "de": return "Chatbot Konversation" elif userLanguage == "fr": @@ -456,6 +227,39 @@ async def chatProcess( # Reload workflow to get current message count workflow = interfaceDbChat.getWorkflow(workflow.id) + # Process uploaded files and create ChatDocuments + user_documents = [] + if userInput.listFileId and len(userInput.listFileId) > 0: + logger.info(f"Processing {len(userInput.listFileId)} uploaded file(s) for user message") + for fileId in userInput.listFileId: + try: + # Get file info from chat service + fileInfo = services.chat.getFileInfo(fileId) + if not fileInfo: + logger.warning(f"No file info found for file ID {fileId}") + continue + + originalFileName = fileInfo.get("fileName", "unknown") + originalMimeType = fileInfo.get("mimeType", "application/octet-stream") + fileSizeToUse = fileInfo.get("size", 0) + + # Create ChatDocument for the file + document = ChatDocument( + id=str(uuid.uuid4()), + messageId="", # Will be set when message is created + fileId=fileId, + fileName=originalFileName, + fileSize=fileSizeToUse, + mimeType=originalMimeType, + roundNumber=workflow.currentRound, + taskNumber=0, + actionNumber=0 + ) + user_documents.append(document) + logger.info(f"Created ChatDocument for file {fileId} -> {originalFileName}") + except Exception as e: + logger.error(f"Error processing file ID {fileId}: {e}", exc_info=True) + # Store user message userMessageData = { "id": f"msg_{uuid.uuid4()}", @@ -469,8 +273,18 @@ async def chatProcess( "taskNumber": 0, "actionNumber": 0 } + + # Add documents to message if any + if user_documents: + # Update messageId in all documents + for doc in user_documents: + doc.messageId = userMessageData["id"] + userMessageData["documents"] = [doc.dict() for doc in user_documents] + userMessageData["documentsLabel"] = "Uploaded Files" + logger.info(f"Attaching {len(user_documents)} document(s) to user message") + userMessage = interfaceDbChat.createMessage(userMessageData) - logger.info(f"Stored user message: {userMessage.id}") + logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)") # Emit message event for streaming (exact chatData format) event_manager = get_event_manager() @@ -530,6 +344,212 @@ async def _check_workflow_status(interfaceDbChat, workflowId: str, event_manager return False +async def _convert_file_ids_to_document_references( + services, + file_ids: List[str] +) -> DocumentReferenceList: + """ + Convert file IDs to DocumentReferenceList for use with ai.process. + + Args: + services: Services instance + file_ids: List of file IDs to convert + + Returns: + DocumentReferenceList with docItem references + """ + references = [] + + # Get workflow to search for ChatDocuments + workflow = services.workflow + if not workflow: + logger.error("Cannot convert file IDs to document references: workflow not set in services") + return DocumentReferenceList(references=[]) + + for file_id in file_ids: + try: + # Get file info to verify it exists + file_info = services.chat.getFileInfo(file_id) + if not file_info: + logger.warning(f"File {file_id} not found, skipping") + continue + + # Find ChatDocument that has this fileId + document_id = None + if workflow.messages: + for message in workflow.messages: + if hasattr(message, 'documents') and message.documents: + for doc in message.documents: + if getattr(doc, 'fileId', None) == file_id: + document_id = getattr(doc, 'id', None) + break + if document_id: + break + + # Search database if not found in messages + if not document_id: + try: + from modules.datamodels.datamodelChat import ChatDocument + from modules.shared.databaseUtils import getRecordsetWithRBAC + documents = getRecordsetWithRBAC( + services.interfaceDbChat.db, + ChatDocument, + services.currentUser, + recordFilter={"fileId": file_id} + ) + if documents: + workflow_message_ids = {msg.id for msg in workflow.messages} if workflow.messages else set() + for doc in documents: + if doc.get("messageId") in workflow_message_ids: + document_id = doc.get("id") + break + except Exception: + pass # Fallback to fileId + + # Use ChatDocument ID if found, otherwise use fileId as fallback + ref = DocumentItemReference(documentId=document_id if document_id else file_id) + references.append(ref) + except Exception as e: + logger.error(f"Error converting fileId {file_id}: {e}", exc_info=True) + + logger.info(f"Converted {len(references)} file IDs to document references") + return DocumentReferenceList(references=references) + + +def _format_query_results_as_lookup(query_data: Dict[str, List[Dict]]) -> str: + """ + Format database query results as JSON lookup table for Excel matching. + Converts query result data into structured JSON format: {Artikelnummer: {columns...}} + + Args: + query_data: Dict with query_key -> list of row dicts (from connector with return_json=True) + + Returns: + JSON string formatted as lookup table + """ + lookup_table = {} + + for query_key, rows in query_data.items(): + if query_key == "error" or not rows: + logger.warning(f"Skipping query key '{query_key}' - no rows or error") + continue + + logger.info(f"Processing {len(rows)} rows from query '{query_key}'") + + for row in rows: + if not isinstance(row, dict): + logger.warning(f"Skipping non-dict row: {type(row)}") + continue + + # Find Artikelnummer field (case-insensitive) + artikelnummer = None + for key in row.keys(): + if key.lower() in ['artikelnummer', 'artikel_nummer', 'art_nr', 'part_number']: + artikelnummer = str(row[key]) + break + + if artikelnummer: + lookup_table[artikelnummer] = row + else: + logger.warning(f"No Artikelnummer found in row with keys: {list(row.keys())}") + + logger.info(f"Generated lookup table with {len(lookup_table)} entries") + if lookup_table: + sample_keys = list(lookup_table.keys())[:3] + logger.info(f"Sample Artikelnummern: {sample_keys}") + if sample_keys: + sample_entry = lookup_table[sample_keys[0]] + logger.info(f"Sample entry keys: {list(sample_entry.keys())}") + + return json.dumps(lookup_table, ensure_ascii=False, indent=2) + + +async def _create_chat_document_from_action_document( + services, + action_document, + message_id: str, + workflow_id: str, + round_number: int +) -> ChatDocument: + """ + Create a ChatDocument from an ActionDocument by storing the file data. + + Args: + services: Services instance + action_document: ActionDocument from ai.process result + message_id: ID of the message to attach to + workflow_id: Workflow ID + round_number: Round number + + Returns: + ChatDocument instance + """ + try: + # Get file data (could be bytes or string) + document_data = action_document.documentData + + # Convert to bytes if needed + if isinstance(document_data, str): + # Check if it's base64 encoded + try: + # Try to decode as base64 first + file_bytes = base64.b64decode(document_data) + except Exception: + # Not base64, encode as UTF-8 + file_bytes = document_data.encode('utf-8') + elif isinstance(document_data, bytes): + file_bytes = document_data + else: + # Try to convert to bytes + try: + file_bytes = bytes(document_data) + except Exception: + # Last resort: convert to string then encode + file_bytes = str(document_data).encode('utf-8') + + # Get MIME type (default to Excel) + mime_type = action_document.mimeType or "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + + # Get file name + file_name = action_document.documentName or "data_export.xlsx" + # Ensure it has .xlsx extension + if not file_name.lower().endswith('.xlsx'): + # Remove any existing extension and add .xlsx + file_name = file_name.rsplit('.', 1)[0] + '.xlsx' + + # Store file using component interface + file_item = services.interfaceDbComponent.createFile( + name=file_name, + mimeType=mime_type, + content=file_bytes + ) + + # Store file data + success = services.interfaceDbComponent.createFileData(file_item.id, file_bytes) + if not success: + logger.warning(f"Failed to store file data for {file_item.id}, but continuing...") + + # Create ChatDocument + chat_document = ChatDocument( + id=str(uuid.uuid4()), + messageId=message_id, + fileId=file_item.id, + fileName=file_name, + fileSize=len(file_bytes), + mimeType=mime_type, + roundNumber=round_number, + taskNumber=0, + actionNumber=0 + ) + + logger.info(f"Created ChatDocument {chat_document.id} from ActionDocument {file_name} (size: {len(file_bytes)} bytes)") + return chat_document + + except Exception as e: + logger.error(f"Error creating ChatDocument from ActionDocument: {e}", exc_info=True) + raise + + async def _processChatbotMessage( services, workflowId: str, @@ -538,7 +558,7 @@ async def _processChatbotMessage( ): """ Process chatbot message in background. - Executes the actual chatbot logic. + Simplified 4-step flow: Analysis → Query → Excel → Answer """ event_manager = get_event_manager() @@ -549,44 +569,16 @@ async def _processChatbotMessage( workflow = interfaceDbChat.getWorkflow(workflowId) if not workflow: logger.error(f"Workflow {workflowId} not found during processing") - await event_manager.emit_event( - workflowId, - "error", - f"Workflow {workflowId} nicht gefunden", - "error" - ) + await event_manager.emit_event(workflowId, "error", f"Workflow {workflowId} nicht gefunden", "error") return - # Check if workflow was stopped if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): return - # Helper function to process streaming messages from AI responses and create logs - def process_streaming_messages(ai_response_data: Dict[str, Any], progress: Optional[float] = None): - """Process streaming messages from AI response and create logs.""" - if not ai_response_data: - return - - streaming_messages = ai_response_data.get("streamingMessages", []) - if isinstance(streaming_messages, list): - workflow = interfaceDbChat.getWorkflow(workflowId) - if workflow: - for msg in streaming_messages: - if isinstance(msg, str) and msg.strip(): - try: - services.chat.storeLog(workflow, { - "message": msg.strip(), - "type": "info", - "status": "running", - "progress": progress if progress is not None else 0.5 - }) - except Exception as e: - logger.warning(f"Error creating log from streaming message: {e}") - # Build conversation context from history context = "" if workflow.messages: - recent_messages = workflow.messages[-5:] # Last 5 messages + recent_messages = workflow.messages[-5:] context = "\n\nPrevious conversation:\n" for msg in recent_messages: if msg.role == "user": @@ -594,403 +586,138 @@ async def _processChatbotMessage( elif msg.role == "assistant": context += f"Assistant: {msg.message}\n" - # Ensure AI service is initialized await services.ai.ensureAiObjectsInitialized() - - # Get current date for prompts current_date = datetime.datetime.now().strftime("%d.%m.%Y") - # Check if workflow was stopped before starting analysis if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): return - # Step 1: AI Analysis - create detailed plan for all database queries needed - logger.info("Step 1: Analyzing user input and creating query plan...") + # Step 1: Unified Analysis + logger.info("Step 1: Analyzing user input and attached files...") + await event_manager.emit_event(workflowId, "status", "Analysiere Benutzeranfrage und angehängte Dateien...", "analysis") - await event_manager.emit_event( - workflowId, - "status", - "Analysiere Benutzeranfrage und erstelle Abfrageplan...", - "analysis" - ) + # Prepare document references if files are attached + has_uploaded_files = bool(userInput.listFileId and len(userInput.listFileId) > 0) + document_references = None + + if has_uploaded_files: + workflow = interfaceDbChat.getWorkflow(workflowId) + if workflow: + services.workflow = workflow + document_references = await _convert_file_ids_to_document_references(services, userInput.listFileId) + logger.info(f"Prepared {len(document_references.references)} document references for analysis") + + # Build analysis prompt + file_context = "\n\nIMPORTANT - ATTACHED FILES:\nIf files are attached, read them completely. For Excel files, extract ALL article identifiers (Artikelnummer) from the file and include them directly in the SQL query using WHERE Artikelnummer IN ('ART1', 'ART2', ...). DO NOT reference ExcelFile table - it doesn't exist!\n" if has_uploaded_files else "" analysisPrompt = f"""Heute ist der {current_date}. -Du bist ein Chatbot der Althaus AG. -Du hast Zugriff auf ein SQL query tool, dass es dir ermöglicht, SQL SELECT Abfragen auf der Althaus AG Datenbank auszuführen. +Du bist ein Chatbot der Althaus AG mit Zugriff auf eine SQL-Datenbank. -Du kannst dem Nutzer bei allen Aufgaben helfen, die du mit SQL Abfragen erledigen kannst. - -DATENBANK-INFORMATIONEN: -- Datenbankdatei: /data/database.db (SQLite) -- Tabellen: Artikel, Einkaufspreis, Lagerplatz_Artikel - -Die Datenbank besteht aus drei Tabellen, die über Beziehungen verbunden sind: -- **Artikel**: Enthält alle Produktinformationen (I_ID, Artikelbezeichnung, Artikelnummer, etc.) -- **Einkaufspreis**: Enthält Preisdaten (m_Artikel, EP_CHF) -- **Lagerplatz_Artikel**: Enthält Lagerbestands- und Lagerplatzinformationen (R_ARTIKEL, R_LAGERPLATZ, S_IST_BESTAND, etc.) -- **Beziehungen**: - - Artikel.I_ID = Einkaufspreis.m_Artikel - - Artikel.I_ID = Lagerplatz_Artikel.R_ARTIKEL - -TABELLEN-SCHEMA (WICHTIG - Spalten mit Leerzeichen/Sonderzeichen IMMER in doppelte Anführungszeichen setzen): - -Tabelle 1: Artikel -CREATE TABLE Artikel ( - "I_ID" INTEGER PRIMARY KEY, - "Artikelbeschrieb" TEXT, - "Artikelbezeichnung" TEXT, - "Artikelgruppe" TEXT, - "Artikelkategorie" TEXT, - "Artikelkürzel" TEXT, - "Artikelnummer" TEXT, - "Einheit" TEXT, - "Gesperrt" TEXT, - "Keywords" TEXT, - "Lieferant" TEXT, - "Warengruppe" TEXT -) - -Tabelle 2: Einkaufspreis -CREATE TABLE Einkaufspreis ( - "m_Artikel" INTEGER, - "EP_CHF" FLOAT -) - -Tabelle 3: Lagerplatz_Artikel -CREATE TABLE Lagerplatz_Artikel ( - "R_ARTIKEL" INTEGER, - "R_LAGERPLATZ" TEXT, - "S_BESTELLTER_BESTAND" INTEGER, - "S_IST_BESTAND" TEXT, - "S_MAXIMALBESTAND" INTEGER, - "S_MINDESTBESTAND" INTEGER, - "S_RESERVIERTER_BESTAND" INTEGER, - "S_SOLL_BESTAND" INTEGER -) +DATENBANK-SCHEMA: +- Artikel: I_ID, Artikelnummer, Artikelbezeichnung, Lieferant, etc. +- Einkaufspreis: m_Artikel, EP_CHF +- Lagerplatz_Artikel: R_ARTIKEL, S_IST_BESTAND, etc. +- Beziehungen: Artikel.I_ID = Einkaufspreis.m_Artikel = Lagerplatz_Artikel.R_ARTIKEL SQL-HINWEISE: -- Verwende IMMER doppelte Anführungszeichen für Spaltennamen: "Artikelkürzel", "Artikelnummer", etc. -- Für Textsuche verwende LIKE mit Wildcards: WHERE a."Artikelbezeichnung" LIKE '%suchbegriff%' -- Für Preisabfragen: Nutze JOINs um auf e."EP_CHF" zuzugreifen -- Für Lagerbestände: Nutze JOINs um auf l."S_IST_BESTAND", l."S_SOLL_BESTAND", etc. zuzugreifen -- WICHTIG bei S_IST_BESTAND: Dieser Wert kann "Unbekannt" sein (TEXT), nicht nur Zahlen! Prüfe mit WHERE l."S_IST_BESTAND" != 'Unbekannt' wenn du nur numerische Werte willst -- Verwende Tabellenaliase (a für Artikel, e für Einkaufspreis, l für Lagerplatz_Artikel) für bessere Lesbarkeit +- IMMER doppelte Anführungszeichen für Spaltennamen: "Artikelnummer", "S_IST_BESTAND" +- S_IST_BESTAND kann "Unbekannt" sein (TEXT), prüfe mit WHERE l."S_IST_BESTAND" != 'Unbekannt' +- Verwende Tabellenaliase: a für Artikel, e für Einkaufspreis, l für Lagerplatz_Artikel +- WICHTIG: Wenn Excel-Dateien angehängt sind, extrahiere Artikelnummern aus der Datei und verwende sie direkt im SQL: WHERE a."Artikelnummer" IN ('ART1', 'ART2', 'ART3') +- VERBOTEN: Verwende NICHT "SELECT ... FROM ExcelFile" - diese Tabelle existiert nicht! -SQL-AGGREGATIONEN: -Du kannst SQL-Aggregationsfunktionen verwenden, um statistische Auswertungen und Zusammenfassungen zu erstellen: -- COUNT() - Anzahl zählen -- SUM() - Summe berechnen (z.B. SUM(CASE WHEN l."S_IST_BESTAND" != 'Unbekannt' THEN CAST(l."S_IST_BESTAND" AS INTEGER) ELSE 0 END)) -- AVG() - Durchschnitt -- MIN() / MAX() - Minimum/Maximum -- GROUP BY - Gruppierung +{file_context}User question: {userInput.prompt}{context} -STREAMING-UPDATES: -WICHTIG: Du kannst mehrere Tools parallel aufrufen! Wenn es sinnvoll ist, kannst du: -- Mehrere SQL-Abfragen gleichzeitig ausführen (z.B. verschiedene Suchkriterien parallel abfragen) -- SQL-Abfragen und Tavily-Suchen kombinieren (z.B. Artikel in der DB finden UND gleichzeitig im Internet nach Produktinformationen suchen) -- Verschiedene Analysen parallel durchführen - -Nutze diese Parallelisierung, um effizienter zu arbeiten und dem Nutzer schneller umfassende Antworten zu geben. - -Du hast Zugriff auf das Tool "send_streaming_message", mit dem du dem Nutzer kurze Status-Updates senden kannst, während du an seiner Anfrage arbeitest. Nutze dieses Tool, um den Nutzer über deine aktuellen Aktivitäten zu informieren. Du kannst es parallel zu anderen Tools aufrufen. - -Beispiele für Status-Updates: -- "Analysiere Benutzeranfrage..." -- "Bestimme benötigte Datenbankabfragen..." -- "Identifiziere relevante Suchbegriffe..." -- "Erstelle Abfrageplan mit allen benötigten Queries..." -- "Durchsuche Datenbank nach Lampen, LED, Leuchten, und Ähnlichem.." -- "Suche im Internet nach Produktinformationen zu [Produktname].." -- "Analysiere Suchergebnisse und bereite Antwort vor.." -- "Führe erweiterte Datenbankabfrage durch.." - -Sende diese Updates sehr sehr häufig, damit der Nutzer weiss, was du gerade machst. Es ist ganz wichtig, dass du den Nutzer so oft es geht auf dem Laufenden hältst. -Die Beispiele oben sind nur Beispiele. Wenn möglich, sei spezifischer und kreativer, damit der Nutzer genau weiss, was du gerade tust. -Falls es möglich ist, gib in den Status-Updates auch schon Zwischenergebnisse an, z.B. "Habe 20 Artikel gefunden, suche weiter nach ähnlichen Begriffen". -Du kannst auch gerne deinen Denkenprozess in den Status-Updates beschreiben, z.B. "Überlege, welche Suchbegriffe ich noch verwenden könnte". -Es ist super wichtig, dass wir dem Nutzer laufend Updates geben, damit er nicht das Gefühl hat, dass er zu lange warten muss. -Wichtig: Sende auch eine Status-Update, wenn du die Zusammenfassende Antwort an den Nutzer schreibst, z.B. "Formuliere finale Antwort mit übersichtlicher Tabelle..". - -Analyze the user's question: {userInput.prompt}{context} - -Determine what actions are needed to answer this question. Return ONLY a valid JSON object: +Return ONLY valid JSON: {{ "needsDatabaseQuery": boolean, "needsWebResearch": boolean, - "sqlQuery": string (if needsDatabaseQuery is true, generate the SQL query with correct column names using double quotes), - "reasoning": string, - "streamingMessages": array of strings (status updates to send to user via send_streaming_message tool, e.g. ["Analysiere Benutzeranfrage...", "Bestimme benötigte Datenbankabfragen..."]) -}} - -WICHTIG für SQL-Queries: -- Verwende IMMER doppelte Anführungszeichen für Spaltennamen -- Für Lagerbestand: Verwende l."S_IST_BESTAND" (NICHT "Bestände"!) -- Bei Aggregationen mit S_IST_BESTAND: SUM(CASE WHEN l."S_IST_BESTAND" != 'Unbekannt' THEN CAST(l."S_IST_BESTAND" AS INTEGER) ELSE 0 END) - -Only use SELECT queries for database. Return valid JSON only.""" - - analysisRequest = AiCallRequest( - prompt=analysisPrompt, - options=AiCallOptions( - resultFormat="json", - operationType=OperationTypeEnum.DATA_ANALYSE, - processingMode=ProcessingModeEnum.BASIC - ) - ) + "needsExcelFile": boolean, + "excelAction": "create" | "update" | null, + "excelFileName": string | null, + "sqlQuery": string (ready-to-execute SQL with double quotes for column names. If Excel file attached, extract Artikelnummern and use WHERE a."Artikelnummer" IN ('ART1', 'ART2', ...)), + "requestedColumns": array of strings (columns to add for Excel updates, e.g. ["Lieferant", "Lagerbestand"]), + "reasoning": string +}}""" - analysisResponse = await services.ai.callAi(analysisRequest) + # Single AI call for analysis + method_ai = MethodAi(services) + analysis_result = await method_ai.process({ + "aiPrompt": analysisPrompt, + "documentList": document_references if has_uploaded_files else None, + "resultType": "json", + "simpleMode": True + }) - # Log raw response for debugging - logger.debug(f"Raw AI analysis response: {analysisResponse.content[:500]}") + # Extract content from ActionResult (in simple mode, content is in first document's documentData) + analysis_content = None + if analysis_result.success and analysis_result.documents: + analysis_content = analysis_result.documents[0].documentData + if isinstance(analysis_content, bytes): + analysis_content = analysis_content.decode('utf-8') - # Parse analysis result with improved JSON extraction - analysis = _extractJsonFromResponse(analysisResponse.content) - - # Process streaming messages from AI (create logs) - process_streaming_messages(analysis, progress=0.1) - - if analysis: - needsDatabaseQuery = analysis.get("needsDatabaseQuery", False) - needsWebResearch = analysis.get("needsWebResearch", False) - queryPlan = analysis.get("queryPlan", {}) - reasoning = analysis.get("reasoning", "") + if not analysis_content: + logger.warning("Analysis failed, using fallback") + analysis = {} else: - # JSON parsing failed - fallback - logger.warning("Failed to parse analysis JSON, using fallback") - question_lower = userInput.prompt.lower() - db_keywords = ["stock", "lager", "bestand", "artikel", "preis", "price", "wie viele", "how many"] - needsDatabaseQuery = any(keyword in question_lower for keyword in db_keywords) - needsWebResearch = False - queryPlan = {} - reasoning = "Failed to parse analysis" + analysis = _extractJsonFromResponse(analysis_content) - logger.info(f"Analysis result: DB={needsDatabaseQuery}, Web={needsWebResearch}, Plan keys={list(queryPlan.keys()) if queryPlan else 'None'}...") + # Extract analysis results + needsDatabaseQuery = analysis.get("needsDatabaseQuery", False) if analysis else False + needsWebResearch = analysis.get("needsWebResearch", False) if analysis else False + needsExcelFile = analysis.get("needsExcelFile", False) if analysis else False + excelAction = analysis.get("excelAction") + excelFileName = analysis.get("excelFileName") + sql_query = analysis.get("sqlQuery", "") + requested_columns = analysis.get("requestedColumns", []) - await event_manager.emit_event( - workflowId, - "progress", - f"Analyse abgeschlossen: {len(queryPlan.keys()) if queryPlan else 0} Abfrage-Kategorien identifiziert", - "analysis", - {"needsDatabaseQuery": needsDatabaseQuery, "needsWebResearch": needsWebResearch} - ) + # Ensure database query if Excel update is needed + if needsExcelFile and excelAction == "update" and not needsDatabaseQuery: + needsDatabaseQuery = True + + logger.info(f"Analysis: DB={needsDatabaseQuery}, Excel={needsExcelFile}, SQL={'present' if sql_query else 'missing'}") - # Check if workflow was stopped after analysis if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): return - # Step 2: Generate all SQL queries based on the plan - allQueries = {} # Store all queries to execute - queryResults = {} # Store results from all queries + # Step 2: Execute SQL Query + queryResults = {} + queryData = {} + excel_documents = [] - if needsDatabaseQuery and queryPlan: - logger.info("Step 2: Generating all SQL queries from plan...") + if needsDatabaseQuery and sql_query: + logger.info("Step 2: Executing SQL query...") + await event_manager.emit_event(workflowId, "status", "Führe Datenbankabfrage aus...", "query_execution") - # Store log: Query generation started - services.chat.storeLog(workflow, { - "message": "Generiere alle SQL-Abfragen basierend auf dem Plan...", - "type": "info", - "status": "running", - "progress": 0.3 - }) - - await event_manager.emit_event( - workflowId, - "status", - "Generiere alle SQL-Abfragen basierend auf dem Plan...", - "query_generation" - ) - - # Build query generation prompt - query_generation_prompt = f"""Based on this query plan: {json.dumps(queryPlan, indent=2, ensure_ascii=False)} - -And the user question: {userInput.prompt}{context} - -Generate ALL SQL queries needed. Return ONLY a valid JSON object with all queries: -{{ - "mainQuery": "SQL query for main query", - "statisticsQueries": {{ - "query1": "SQL query description and SQL", - "query2": "SQL query description and SQL" - }}, - "supplierQueries": {{ - "query1": "SQL query description and SQL" - }}, - "articleQueries": {{ - "query1": "SQL query description and SQL" - }}, - "additionalQueries": {{ - "query1": "SQL query description and SQL" - }}, - "streamingMessages": ["array of status updates via send_streaming_message tool"] -}} - -STREAMING-UPDATES: -Während du die Queries generierst, denke an hilfreiche Status-Updates: -- "Generiere Hauptabfrage..." -- "Erstelle Statistik-Abfragen..." -- "Vorbereite Lieferanten-Analyse-Abfrage..." -- "Erstelle Top-20-Artikel-Abfrage..." -- "Generiere zusätzliche relevante Abfragen..." - -IMPORTANT: -- Use double quotes for all column names -- For stock calculations: Use SUM(CASE WHEN l."S_IST_BESTAND" != 'Unbekannt' THEN CAST(l."S_IST_BESTAND" AS INTEGER) ELSE 0 END) -- For top articles: ORDER BY CAST(l."S_IST_BESTAND" AS INTEGER) DESC LIMIT 20 -- Extract search terms from user question (e.g., "LED", "Lampe", etc.) and use in WHERE clauses -- All queries should use consistent search criteria - -Return ONLY valid JSON, no other text.""" - - query_generation_request = AiCallRequest( - prompt=query_generation_prompt, - options=AiCallOptions( - resultFormat="json", - operationType=OperationTypeEnum.DATA_ANALYSE, - processingMode=ProcessingModeEnum.BASIC - ) - ) - - query_generation_response = await services.ai.callAi(query_generation_request) - queries_data = _extractJsonFromResponse(query_generation_response.content) - - # Process streaming messages from AI (create logs) - process_streaming_messages(queries_data, progress=0.3) - - if queries_data: - # Collect all queries - if queries_data.get("mainQuery"): - allQueries["main"] = queries_data["mainQuery"] - - # Collect statistics queries - if queries_data.get("statisticsQueries"): - for key, value in queries_data["statisticsQueries"].items(): - if isinstance(value, str): - allQueries[f"stat_{key}"] = value - elif isinstance(value, dict) and "sql" in value: - allQueries[f"stat_{key}"] = value["sql"] - - # Collect supplier queries - if queries_data.get("supplierQueries"): - for key, value in queries_data["supplierQueries"].items(): - if isinstance(value, str): - allQueries[f"supplier_{key}"] = value - elif isinstance(value, dict) and "sql" in value: - allQueries[f"supplier_{key}"] = value["sql"] - - # Collect article queries - if queries_data.get("articleQueries"): - for key, value in queries_data["articleQueries"].items(): - if isinstance(value, str): - allQueries[f"article_{key}"] = value - elif isinstance(value, dict) and "sql" in value: - allQueries[f"article_{key}"] = value["sql"] - - # Collect additional queries - if queries_data.get("additionalQueries"): - for key, value in queries_data["additionalQueries"].items(): - if isinstance(value, str): - allQueries[f"additional_{key}"] = value - elif isinstance(value, dict) and "sql" in value: - allQueries[f"additional_{key}"] = value["sql"] - - logger.info(f"Generated {len(allQueries)} queries from plan") - - await event_manager.emit_event( - workflowId, - "progress", - f"{len(allQueries)} SQL-Abfragen erfolgreich generiert", - "query_generation", - {"queryCount": len(allQueries), "queries": list(allQueries.keys())} - ) - else: - logger.warning("Failed to generate queries from plan") - - await event_manager.emit_event( - workflowId, - "error", - "Fehler beim Generieren der SQL-Abfragen", - "query_generation" - ) - - # Check if workflow was stopped before query execution - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): - return - - # Step 3: Execute all queries in parallel - if allQueries: - logger.info(f"Step 3: Executing {len(allQueries)} queries in parallel...") - - await event_manager.emit_event( - workflowId, - "status", - f"Führe {len(allQueries)} Datenbankabfragen parallel aus...", - "query_execution" - ) try: connector = PreprocessorConnector() - - async def execute_query(query_key, query_sql): - """Execute a single query and return result.""" - try: - result = await connector.executeQuery(query_sql) - if not result.startswith(("Error:", "Query failed:", "Network error:", "API error:")): - return (query_key, result) - else: - logger.warning(f"{query_key} query returned error: {result[:100]}") - return (query_key, None) - except Exception as e: - logger.warning(f"{query_key} query failed: {e}") - return (query_key, None) - - # Execute all queries in parallel - tasks = [execute_query(key, sql) for key, sql in allQueries.items()] - results = await asyncio.gather(*tasks) - - # Process results - successful_queries = 0 - for query_key, result in results: - if result is not None: - queryResults[query_key] = result - successful_queries += 1 - logger.info(f"{query_key} query executed successfully") - await event_manager.emit_event( - workflowId, - "progress", - f"Abfrage '{query_key}' erfolgreich ausgeführt", - "query_execution", - {"queryKey": query_key} - ) - - await event_manager.emit_event( - workflowId, - "progress", - f"{successful_queries} von {len(allQueries)} Abfragen erfolgreich ausgeführt", - "query_execution", - {"successful": successful_queries, "total": len(allQueries)} - ) - + result_dict = await connector.executeQuery(sql_query, return_json=True) await connector.close() - except Exception as e: - logger.error(f"Error executing queries: {e}") - queryResults["error"] = f"Error executing queries: {str(e)}" - await event_manager.emit_event( - workflowId, - "error", - f"Fehler beim Ausführen der Abfragen: {str(e)}", - "query_execution" - ) + if result_dict and not result_dict.get("text", "").startswith(("Error:", "Query failed:")): + queryResults["main"] = result_dict.get("text", "") + queryData["main"] = result_dict.get("data", []) + logger.info(f"Query executed successfully, returned {len(queryData.get('main', []))} rows") + else: + error_text = result_dict.get("text", "Query failed") if result_dict else "Query failed: No response" + queryResults["error"] = error_text + logger.error(f"Query failed: {error_text}") + + # If query failed and we need Excel update, try to extract article numbers from Excel and retry + if needsExcelFile and excelAction == "update" and has_uploaded_files and "ExcelFile" in error_text: + logger.warning("Query failed due to ExcelFile table reference. Attempting to extract article numbers from Excel file and retry...") + # The AI should have extracted article numbers, but if query failed, we can't proceed + # Log the issue for debugging + logger.error("Cannot proceed with Excel update: SQL query references non-existent ExcelFile table. AI should extract article numbers directly.") + except Exception as e: + logger.error(f"Error executing query: {e}") + queryResults["error"] = f"Error executing query: {str(e)}" - # Check if workflow was stopped after query execution - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): - return - - # Step 3: Execute web research if needed + # Step 2.5: Web Research (simplified) webResearchResults = "" if needsWebResearch: - logger.info("Step 3: Performing web research...") - + logger.info("Performing web research...") try: researchResult = await services.web.performWebResearch( prompt=userInput.prompt, @@ -1000,218 +727,215 @@ Return ONLY valid JSON, no other text.""" researchDepth="general", operationId=None ) - # Extract text from research result - if isinstance(researchResult, dict): - webResearchResults = json.dumps(researchResult, ensure_ascii=False, indent=2) - else: - webResearchResults = str(researchResult) - logger.info("Web research completed successfully") - - await event_manager.emit_event( - workflowId, - "progress", - "Internet-Recherche abgeschlossen", - "web_research" - ) + webResearchResults = json.dumps(researchResult, ensure_ascii=False, indent=2) if isinstance(researchResult, dict) else str(researchResult) except Exception as e: logger.error(f"Web research failed: {e}") webResearchResults = f"Web research error: {str(e)}" - - await event_manager.emit_event( - workflowId, - "error", - f"Fehler bei Internet-Recherche: {str(e)}", - "web_research" - ) - # Check if workflow was stopped before answer generation if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): return - # Step 4: Generate final answer - logger.info("Step 4: Generating final answer from all query results...") - - await event_manager.emit_event( - workflowId, - "status", - "Formuliere finale Antwort mit allen Ergebnissen...", - "answer_generation" - ) - - # Build context for final answer with all query results + # Build answer context answerContext = f"User question: {userInput.prompt}{context}\n\n" - if queryResults: - answerContext += "Database query results (all queries executed in parallel):\n\n" - for query_key, result in queryResults.items(): - if query_key != "error": - answerContext += f"{query_key} results:\n{result}\n\n" - if "error" in queryResults: - answerContext += f"Errors: {queryResults['error']}\n\n" - + for key, result in queryResults.items(): + if key != "error": + answerContext += f"Database results:\n{result}\n\n" if webResearchResults: - answerContext += f"Web research results:\n{webResearchResults}\n\n" + answerContext += f"Web research:\n{webResearchResults}\n\n" + + # Step 3: Process Excel File + excel_error = None + if needsExcelFile: + logger.info(f"Step 3: Processing Excel file (action: {excelAction})...") + await event_manager.emit_event( + workflowId, + "status", + f"Erstelle Excel-Datei..." if excelAction == "create" else "Aktualisiere Excel-Datei...", + "excel_processing" + ) + + try: + # Check if we have query data for updates + if excelAction == "update": + if not queryData or "main" not in queryData or not queryData["main"]: + error_msg = f"No query data available for Excel update. queryData keys: {list(queryData.keys()) if queryData else 'None'}" + if queryResults.get("error"): + error_msg += f" Query error: {queryResults['error']}" + logger.error(error_msg) + excel_error = f"Die Datenbankabfrage ist fehlgeschlagen: {queryResults.get('error', 'Unbekannter Fehler')}. Die Excel-Datei konnte nicht aktualisiert werden." + raise ValueError(excel_error) + + workflow = interfaceDbChat.getWorkflow(workflowId) + if workflow: + services.workflow = workflow + + method_ai = MethodAi(services) + + # Prepare document references for updates + if excelAction == "update" and userInput.listFileId: + if not document_references or not document_references.references: + document_references = await _convert_file_ids_to_document_references(services, userInput.listFileId) + else: + from modules.datamodels.datamodelDocref import DocumentReferenceList + document_references = DocumentReferenceList(references=[]) + + # Build Excel prompt + if excelAction == "update": + # Format query data as JSON lookup table + lookup_table_json = _format_query_results_as_lookup(queryData) + lookup_table_dict = json.loads(lookup_table_json) + logger.info(f"Generated lookup table with {len(lookup_table_dict)} entries") + + # Log sample entries for debugging + if lookup_table_dict: + sample_keys = list(lookup_table_dict.keys())[:3] + logger.info(f"Sample Artikelnummern in lookup table: {sample_keys}") + if sample_keys: + sample_entry = lookup_table_dict[sample_keys[0]] + logger.info(f"Sample entry for '{sample_keys[0]}': {list(sample_entry.keys())}") + logger.info(f"Sample entry values: {sample_entry}") + + # Determine columns to add + column_names = requested_columns if requested_columns else [] + if not column_names: + prompt_lower = userInput.prompt.lower() + if "einkaufspreis" in prompt_lower or "purchase price" in prompt_lower or "kaufpreis" in prompt_lower: + column_names = ["Einkaufspreis"] + elif "lieferant" in prompt_lower or "supplier" in prompt_lower: + column_names = ["Lieferant"] + elif "lagerbestand" in prompt_lower or "stock" in prompt_lower: + column_names = ["Lagerbestand"] + else: + # Default: try to infer from available columns in lookup table + if lookup_table_dict: + sample_entry = next(iter(lookup_table_dict.values())) + available_columns = list(sample_entry.keys()) + logger.info(f"Available columns in lookup table: {available_columns}") + # Prefer common columns + if "Einkaufspreis" in available_columns: + column_names = ["Einkaufspreis"] + elif "Lieferant" in available_columns: + column_names = ["Lieferant"] + else: + column_names = available_columns[:1] if available_columns else ["Lieferant"] + else: + column_names = ["Lieferant"] + + logger.info(f"Adding columns to Excel: {column_names}") + column_name_str = ", ".join(column_names) + + # Build a more explicit prompt with clear instructions and the lookup table + # Include the lookup table data directly in the prompt so it's always available + excel_prompt = f"""You are updating an Excel file. CRITICAL TASK: + +Add new column(s) "{column_name_str}" to the attached Excel file by matching Artikelnummer values. + +STEP-BY-STEP PROCESS (MUST FOLLOW EXACTLY): +1. Read the attached Excel file completely - all sheets, all rows, all columns. +2. Find the column containing "Artikelnummer" (may be named "Artikelnummer", "Artikel Nummer", "Art-Nr", "Artikelnummer", etc.) +3. For EACH ROW in the Excel file: + a. Read the Artikelnummer value from that row (e.g., "LED001", "12345", etc.) + b. Look up that EXACT Artikelnummer in the DATABASE LOOKUP TABLE below (case-sensitive match) + c. If the Artikelnummer is found in the lookup table: + - Extract the value(s) for column(s) "{column_name_str}" from the matching entry + - Write the value(s) to the new column(s) in that row + d. If the Artikelnummer is NOT found in the lookup table: + - Leave the new column(s) empty for that row +4. Preserve ALL existing data, formatting, formulas, and structure exactly as-is. +5. Return ONLY the complete modified Excel file (.xlsx format). Do NOT include any text or explanations. + +=== DATABASE LOOKUP TABLE (JSON) === +Use this table to match Artikelnummer and extract values. Format: {{"Artikelnummer": {{"column": "value", ...}}}} + +{lookup_table_json} + +=== EXAMPLE MATCHING === +- Excel row has Artikelnummer "LED001" in column "Artikelnummer" +- Search for "LED001" in the lookup table above +- Find entry: {{"LED001": {{"Lieferant": "Phoenix Contact", "Einkaufspreis": 12.50, ...}}}} +- Extract "{column_name_str}" value from that entry +- Write the extracted value to the new "{column_name_str}" column in that Excel row + +CRITICAL REQUIREMENTS: +- Match EXACTLY by Artikelnummer (case-sensitive, exact string match) +- Preserve ALL existing columns, rows, formatting, and formulas +- Only add the new column(s), do NOT modify existing data +- Return ONLY the Excel file binary data, no explanations""" + else: + excel_prompt = f"""Create Excel file (.xlsx) from database results: + +{answerContext if queryResults else "No database results available."} + +User request: {userInput.prompt} + +Create well-structured spreadsheet with appropriate headers and formatting.""" + + # Use simpleMode=True for Excel updates to ensure lookup table is directly accessible + # simpleMode=False goes through complex document generation pipeline which may lose context + excel_result = await method_ai.process({ + "aiPrompt": excel_prompt, + "documentList": document_references if excelAction == "update" else None, + "resultType": "xlsx", + "simpleMode": True # Use simple mode for direct Excel updates with lookup table + }) + + if excel_result.success and excel_result.documents: + for action_doc in excel_result.documents: + try: + chat_doc = await _create_chat_document_from_action_document( + services, action_doc, "", workflowId, workflow.currentRound + ) + if chat_doc: + excel_documents.append(chat_doc) + except Exception as e: + logger.error(f"Error creating ChatDocument: {e}", exc_info=True) + logger.info(f"Excel processing complete: {len(excel_documents)} documents") + else: + excel_error = f"Excel-Verarbeitung fehlgeschlagen: {excel_result.error if excel_result else 'Unbekannter Fehler'}" + logger.warning(excel_error) + except Exception as e: + excel_error = str(e) if not excel_error else excel_error + logger.error(f"Error processing Excel: {e}", exc_info=True) + + if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): + return + + # Step 4: Generate Final Answer + logger.info("Step 4: Generating final answer...") + await event_manager.emit_event(workflowId, "status", "Formuliere finale Antwort...", "answer_generation") + + excel_context = "" + if needsExcelFile: + if excel_documents: + file_names = [doc.fileName for doc in excel_documents] + action_text = "erstellt" if excelAction == "create" else "aktualisiert" + file_name_str = file_names[0] if len(file_names) == 1 else f"{len(file_names)} Dateien" + excel_context = f"\n\nWICHTIG - EXCEL-DATEI {action_text.upper()}:\nDie Excel-Datei '{file_name_str}' wurde erfolgreich {action_text} und ist als Anhang verfügbar. Erwähne dies am Ende deiner Antwort." + elif excel_error: + excel_context = f"\n\nFEHLER - EXCEL-VERARBEITUNG:\n{excel_error}\nErkläre dem Nutzer, warum die Excel-Datei nicht aktualisiert werden konnte." + + # Build answer context parts (avoid backslashes in f-string expressions) + db_results_part = f"\nDATENBANK-ERGEBNISSE:\n{answerContext}" if queryResults and not queryResults.get("error") else "" + if queryResults.get("error"): + db_results_part = f"\nDATENBANK-FEHLER:\n{queryResults['error']}" + web_results_part = f"\nINTERNET-RECHERCHE:\n{webResearchResults}" if webResearchResults else "" answerPrompt = f"""Heute ist der {current_date}. -Du bist ein Chatbot der Althaus AG. -Du hast Zugriff auf ein SQL query tool, dass es dir ermöglicht, SQL SELECT Abfragen auf der Althaus AG Datenbank auszuführen. +Du bist ein Chatbot der Althaus AG. Antworte auf Deutsch (kein ß, immer ss). -KRITISCH - AUSFÜHRLICHE ANTWORTEN ERFORDERLICH: -Du MUSST immer sehr ausführliche, detaillierte und strukturierte Antworten geben. Einfache, kurze Antworten sind NICHT ausreichend! - -ANTWORT-STRUKTUR FÜR DATENBANK-ABFRAGEN: -Wenn du Datenbank-Ergebnisse präsentierst, MUSS deine Antwort folgende Struktur haben: - -1. EINLEITUNG MIT QUELLENANGABE: - - Beginne IMMER mit: "Aus der Datenbank habe ich eine umfassende Analyse [des Themas] durchgeführt:" - - Oder: "Aus der Datenbank habe ich folgende Informationen gefunden:" - -2. ÜBERSCHRIFT MIT THEMA: - - Gib eine klare Überschrift, z.B. "LED-Lagerbestand Übersicht" oder "Artikel-Übersicht: [Suchbegriff]" - -3. ZUSAMMENFASSUNG MIT STATISTIKEN: - - Gib IMMER eine detaillierte Zusammenfassung mit wichtigen Kennzahlen - - Beispiele: - * "Gesamtbestand LEDs: 7'411 Stück verteilt auf 157 verschiedene Artikel" - * "1'027 LED-Artikel insgesamt in der Datenbank" - * "157 Artikel haben aktuell Lagerbestand > 0" - * "273 Artikel ohne Lagerinformationen" - - Formatiere Zahlen mit Tausender-Trennzeichen (z.B. 7'411 statt 7411) - - Verwende Aufzählungen für bessere Lesbarkeit - -4. TOP-LIEFERANTEN TABELLE (wenn relevant): - - Wenn mehrere Lieferanten vorhanden sind, erstelle IMMER eine Tabelle mit Top-Lieferanten - - Spalten: Lieferant, Anzahl Artikel, Gesamtbestand (oder andere relevante Metriken) - - Sortiere nach Gesamtbestand oder Anzahl Artikel (absteigend) - - Zeige mindestens Top 10 Lieferanten - -5. TOP-ARTIKEL TABELLE: - - Erstelle IMMER eine Tabelle mit den Top 20 Artikeln - - Spalten sollten enthalten: Artikelnummer (als Link), Artikelbezeichnung, Lieferant, Lagerplatz, Ist-Bestand, Soll-Bestand (wenn verfügbar) - - Sortiere nach Ist-Bestand (absteigend) oder nach Relevanz - - Formatiere Zahlen mit Tausender-Trennzeichen - -6. ZUSÄTZLICHE ANALYSEN: - - Füge immer eine kurze Analyse hinzu, z.B.: - * "Phoenix Contact AG dominiert klar mit über 68% des gesamten LED-Lagerbestands" - * "Die meisten Artikel sind Beschriftungsmarker und -schilder" - - Erkläre interessante Muster oder Auffälligkeiten - -7. HINWEIS BEI MEHR ERGEBNISSEN: - - Wenn mehr als 20 Artikel existieren, füge IMMER hinzu: - * "_Es existieren weitere X Artikel. Die Tabelle zeigt die 20 Artikel mit dem höchsten Bestand._" - - Oder: "_Es wurden insgesamt X Artikel gefunden. Die Tabelle zeigt die ersten 20._" - -8. NÄCHSTE SCHRITTE - MEHRERE VORSCHLÄGE: - - Gib IMMER mindestens 5 verschiedene Vorschläge für nächste Schritte - - Formatiere als Aufzählung mit Bullet Points - - Beispiele: - * "Details zu einem bestimmten LED-Artikel erfahren?" - * "LED-Artikel mit niedrigem Lagerbestand oder unter Mindestbestand anzeigen?" - * "Nach spezifischen LED-Typen suchen (z.B. Signalleuchten, Anzeige-LEDs, LED-Strips)?" - * "Preisinformationen zu den LED-Artikeln abrufen?" - * "LED-Artikel eines bestimmten Lieferanten detailliert anzeigen?" - - Passe die Vorschläge an den Kontext an - -TABELLEN-FORMATIERUNG: -- Verwende IMMER Markdown-Tabellen für strukturierte Daten -- Spalten sollten klar getrennt sein mit | -- Header-Zeile sollte deutlich hervorgehoben sein -- Zahlen sollten rechtsbündig sein (in Markdown mit Leerzeichen) -- Verwende Tausender-Trennzeichen (z.B. 1'090 statt 1090) - -QUELLENANGABE - DATENBANK: -WICHTIG: Wenn du Informationen aus der Datenbank präsentierst, kennzeichne dies IMMER klar für den Nutzer. -- Beginne deine Antwort mit einer klaren Kennzeichnung, z.B.: "Aus der Datenbank habe ich eine umfassende Analyse durchgeführt:" -- Bei kombinierten Informationen (Datenbank + Internet): Trenne klar zwischen beiden Quellen - -QUELLENANGABE - INTERNET: -WICHTIG: Wenn du Informationen aus dem Internet präsentierst, kennzeichne dies IMMER klar für den Nutzer. -- Beginne Internet-Recherchen mit: "Aus meiner Internet-Recherche:" oder "Laut Online-Quellen:" -- Gib IMMER die konkreten Quellen an (Website-Namen und Links) -- Bei mehreren Quellen: Liste die Quellen auf und verweise darauf -- Trenne klar zwischen Datenbank-Informationen und Internet-Recherchen - -TABELLENLÄNGE UND ARTIKELANZAHL - KRITISCH: -WICHTIG: Zeige MAXIMAL 20 Artikel in Tabellen. Du darfst und sollst aber ausführliche Erklärungen liefern! - -ZAHLEN-PRÜFUNG - ABSOLUT KRITISCH: -BEVOR du deine finale Antwort zurückgibst, MUSST du diese Schritte befolgen: -1. ZÄHLE die TATSÄCHLICHEN Zeilen in deiner finalen Tabelle -2. Diese Zahl ist die EINZIGE korrekte Anzahl für deine Antwort -3. Verwende diese Zahl KONSISTENT überall in deiner Antwort -4. Formatiere Zahlen mit Tausender-Trennzeichen (z.B. 7'411 statt 7411) - -Wenn immer du eine Artikelnummer innerhalb einer Tabelle zurückgibst bitte markiere diese als Markdownlink: -[ARTIKELNUMMER](/details/ARTIKELNUMMER). ARTIKELNUMMER ist hierbei der Platzhalter, den du ersetzen musst. -WICHTIG! Du musst im Link die ARTIKELNUMMER sicher URL-encodieren. Encodiere aber NICHT die Artikelnummer in eckigen Klammern. Also encodiere den Ankertext nicht! -Ausserhalb einer Tabelle musst du keine Links auf Artikelnummern setzen. - -Du antwortest ausschliesslich auf Deutsch. Nutze kein sz(ß) sondern immer ss. - -STREAMING-UPDATES: -Während du die Antwort erstellst, denke daran, dass der Nutzer Updates erhalten sollte: -- "Kompliliere Statistiken..." -- "Erstelle Lieferanten-Tabelle..." -- "Formatiere Top-20-Artikel..." -- "Formuliere finale Antwort mit übersichtlicher Tabelle..." -- "Füge zusätzliche Analysen hinzu..." - -Sende diese Updates sehr häufig, damit der Nutzer weiss, was du gerade machst. Es ist ganz wichtig, dass du den Nutzer so oft es geht auf dem Laufenden hältst. -Die Beispiele oben sind nur Beispiele. Wenn möglich, sei spezifischer und kreativer, damit der Nutzer genau weiss, was du gerade tust. -Falls es möglich ist, gib in den Status-Updates auch schon Zwischenergebnisse an, z.B. "Habe 20 Artikel gefunden, suche weiter nach ähnlichen Begriffen". -Du kannst auch gerne deinen Denkenprozess in den Status-Updates beschreiben, z.B. "Überlege, welche Suchbegriffe ich noch verwenden könnte". -Es ist super wichtig, dass wir dem Nutzer laufend Updates geben, damit er nicht das Gefühl hat, dass er zu lange warten muss. -Wichtig: Sende auch eine Status-Update, wenn du die Zusammenfassende Antwort an den Nutzer schreibst, z.B. "Formuliere finale Antwort mit übersichtlicher Tabelle..". - -Answer the user's question in German: {userInput.prompt}{context} - -{answerContext} - -KRITISCH: Du hast jetzt ZUSÄTZLICHE Datenbankabfragen erhalten, die dir detaillierte Informationen liefern: -- "Total count query results" - Gesamtanzahl der Artikel -- "Top suppliers query results" - Top-Lieferanten mit Artikelanzahl und Gesamtbestand -- "Top 20 articles query results" - Top 20 Artikel mit allen Details - -VERWENDE ALLE DIESE DATEN für deine Antwort! Erstelle eine umfassende Antwort mit: - -1. EINLEITUNG: "Aus der Datenbank habe ich eine umfassende Analyse [des Themas] durchgeführt:" - -2. ÜBERSCHRIFT: z.B. "LED-Lagerbestand Übersicht" - -3. ZUSAMMENFASSUNG MIT STATISTIKEN: - - Verwende die Daten aus "Total count query results" für Gesamtanzahl - - Verwende die Daten aus "Initial database query results" für Gesamtbestand - - Berechne weitere Kennzahlen basierend auf den Daten - -4. TOP-LIEFERANTEN TABELLE: - - Verwende IMMER die Daten aus "Top suppliers query results" - - Erstelle eine Tabelle mit: Lieferant | Anzahl Artikel | Gesamtbestand - - Sortiere nach Gesamtbestand (absteigend) - - Zeige mindestens Top 10 - -5. TOP 20 ARTIKEL TABELLE: - - Verwende IMMER die Daten aus "Top 20 articles query results" - - Erstelle eine Tabelle mit: Artikelnummer (als Link) | Artikelbezeichnung | Lieferant | Lagerplatz | Ist-Bestand | Soll-Bestand - - Formatiere Zahlen mit Tausender-Trennzeichen - -6. ZUSÄTZLICHE ANALYSEN: - - Analysiere die Daten und gib interessante Erkenntnisse - - z.B. "Phoenix Contact AG dominiert klar mit über 68% des gesamten Lagerbestands" - -7. HINWEIS BEI MEHR ERGEBNISSEN: - - Wenn mehr als 20 Artikel existieren: "_Es existieren weitere X Artikel. Die Tabelle zeigt die 20 Artikel mit dem höchsten Bestand._" - -8. NÄCHSTE SCHRITTE: - - Gib mindestens 5 verschiedene Vorschläge als Aufzählung - -WICHTIG: -- Verwende ALLE verfügbaren Daten aus den zusätzlichen Abfragen -- Erstelle Tabellen aus den Daten - nicht nur Text! -- Formatiere Zahlen mit Tausender-Trennzeichen (z.B. 7'411 statt 7411) -- Sei sehr ausführlich und detailliert - kurze Antworten sind NICHT ausreichend!""" +Antworte auf: {userInput.prompt}{context} +{db_results_part}{web_results_part}{excel_context} +WICHTIG: +- Klare, strukturierte Antwort +- Zahlen mit Tausender-Trennzeichen (7'411) +- Markdown-Tabellen für Daten (max 20 Zeilen) +- Artikelnummern als Link: [ARTIKELNUMMER](/details/ARTIKELNUMMER) +- Wenn Excel-Datei erstellt wurde, bestätige dies explizit am Ende +- Wenn Excel-Verarbeitung fehlgeschlagen ist, erkläre dem Nutzer den Fehler klar""" + answerRequest = AiCallRequest( prompt=answerPrompt, context=answerContext if (queryResults or webResearchResults) else None, @@ -1225,24 +949,8 @@ WICHTIG: answerResponse = await services.ai.callAi(answerRequest) finalAnswer = answerResponse.content - logger.info("Final answer generated successfully") + logger.info("Final answer generated") - # Store log: Answer generation completed - services.chat.storeLog(workflow, { - "message": "Antwort erfolgreich generiert", - "type": "info", - "status": "running", - "progress": 0.9 - }) - - await event_manager.emit_event( - workflowId, - "progress", - "Antwort erfolgreich generiert", - "answer_generation" - ) - - # Check if workflow was stopped after answer generation if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): return @@ -1250,8 +958,10 @@ WICHTIG: # Reload workflow to get updated message count workflow = interfaceDbChat.getWorkflow(workflowId) + # Prepare message data with Excel documents if any + message_id = f"msg_{uuid.uuid4()}" assistantMessageData = { - "id": f"msg_{uuid.uuid4()}", + "id": message_id, "workflowId": workflowId, "parentMessageId": userMessageId, "message": finalAnswer, @@ -1264,6 +974,22 @@ WICHTIG: "taskNumber": 0, "actionNumber": 0 } + + # Add Excel documents if any were created + if excel_documents: + # Update messageId in all Excel documents + for excel_doc in excel_documents: + excel_doc.messageId = message_id + + # Convert ChatDocuments to dict format + try: + documents_dict = [doc.model_dump() for doc in excel_documents] + except Exception: + documents_dict = [doc.dict() for doc in excel_documents] + + assistantMessageData["documents"] = documents_dict + assistantMessageData["documentsLabel"] = "Excel Files" + assistantMessage = interfaceDbChat.createMessage(assistantMessageData) logger.info(f"Stored assistant message: {assistantMessage.id}") diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 04bde515..40e30eaa 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -1004,24 +1004,36 @@ class ChatObjects: # Create documents in normalized documents table created_documents = [] - for doc_data in documents_to_create: - # Normalize to plain dict before assignment - if isinstance(doc_data, ChatDocument): - doc_dict = doc_data.model_dump() - elif isinstance(doc_data, dict): - doc_dict = dict(doc_data) - else: - # Attempt to coerce to ChatDocument then dump - try: - doc_dict = ChatDocument(**doc_data).model_dump() - except Exception: - logger.error("Invalid document data type for message creation") - continue - - doc_dict["messageId"] = createdMessage["id"] - created_doc = self.createDocument(doc_dict) - if created_doc: - created_documents.append(created_doc) + logger.debug(f"Creating {len(documents_to_create)} document(s) for message {createdMessage['id']}") + for idx, doc_data in enumerate(documents_to_create): + try: + # Normalize to plain dict before assignment + if isinstance(doc_data, ChatDocument): + doc_dict = doc_data.model_dump() + elif isinstance(doc_data, dict): + doc_dict = dict(doc_data) + else: + # Attempt to coerce to ChatDocument then dump + try: + doc_dict = ChatDocument(**doc_data).model_dump() + except Exception as e: + logger.error(f"Invalid document data type for message creation (document {idx + 1}/{len(documents_to_create)}): {e}") + continue + + # Ensure messageId is set + doc_dict["messageId"] = createdMessage["id"] + logger.debug(f"Creating document {idx + 1}/{len(documents_to_create)}: fileName={doc_dict.get('fileName', 'unknown')}, fileId={doc_dict.get('fileId', 'unknown')}, messageId={doc_dict.get('messageId', 'unknown')}") + + created_doc = self.createDocument(doc_dict) + if created_doc: + created_documents.append(created_doc) + logger.debug(f"Successfully created document {idx + 1}/{len(documents_to_create)}: {created_doc.fileName} (id: {created_doc.id})") + else: + logger.error(f"Failed to create document {idx + 1}/{len(documents_to_create)}: createDocument returned None for fileName={doc_dict.get('fileName', 'unknown')}") + except Exception as e: + logger.error(f"Error processing document {idx + 1}/{len(documents_to_create)}: {e}", exc_info=True) + + logger.info(f"Created {len(created_documents)}/{len(documents_to_create)} document(s) for message {createdMessage['id']}") # Convert to ChatMessage model chat_message = ChatMessage( @@ -1256,12 +1268,18 @@ class ChatObjects: try: # Validate and normalize document data to dict document = ChatDocument(**documentData) + logger.debug(f"Creating document in database: fileName={document.fileName}, fileId={document.fileId}, messageId={document.messageId}") created = self.db.recordCreate(ChatDocument, document.model_dump()) - - return ChatDocument(**created) + if created: + created_doc = ChatDocument(**created) + logger.debug(f"Successfully created document in database: {created_doc.fileName} (id: {created_doc.id})") + return created_doc + else: + logger.error(f"Failed to create document in database: recordCreate returned None for fileName={document.fileName}") + return None except Exception as e: - logger.error(f"Error creating message document: {str(e)}") + logger.error(f"Error creating message document: {str(e)}", exc_info=True) return None diff --git a/modules/workflows/methods/methodAi/actions/process.py b/modules/workflows/methods/methodAi/actions/process.py index c76ce35e..ee1079fb 100644 --- a/modules/workflows/methods/methodAi/actions/process.py +++ b/modules/workflows/methods/methodAi/actions/process.py @@ -44,12 +44,18 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult: # Convert to DocumentReferenceList if needed if documentListParam is None: documentList = DocumentReferenceList(references=[]) + logger.debug(f"ai.process: documentList is None, using empty DocumentReferenceList") elif isinstance(documentListParam, DocumentReferenceList): documentList = documentListParam + logger.info(f"ai.process: Received DocumentReferenceList with {len(documentList.references)} references") + for idx, ref in enumerate(documentList.references): + logger.info(f" Reference {idx + 1}: documentId={ref.documentId}, type={type(ref).__name__}") elif isinstance(documentListParam, str): documentList = DocumentReferenceList.from_string_list([documentListParam]) + logger.info(f"ai.process: Converted string to DocumentReferenceList with {len(documentList.references)} references") elif isinstance(documentListParam, list): documentList = DocumentReferenceList.from_string_list(documentListParam) + logger.info(f"ai.process: Converted list to DocumentReferenceList with {len(documentList.references)} references") else: logger.error(f"Invalid documentList type: {type(documentListParam)}") documentList = DocumentReferenceList(references=[]) @@ -152,9 +158,16 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult: ) else: # Full mode: use unified callAiContent method + # For document generation (xlsx, docx, pdf, etc.), use DATA_GENERATE with document intent + from modules.datamodels.datamodelAi import OperationTypeEnum + + # Always use DATA_GENERATE with document intent for ai.process + # This ensures proper document generation pipeline is used options = AiCallOptions( - resultFormat=output_format + resultFormat=output_format, + operationType=OperationTypeEnum.DATA_GENERATE ) + generation_intent = "document" # Update progress - calling AI self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI") @@ -171,17 +184,23 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult: options=options, contentParts=contentParts, # Pre-extracted ContentParts outputFormat=output_format, - parentOperationId=operationId + parentOperationId=operationId, + generationIntent=generation_intent ) else: # Pass documentList - callAiContent handles Phases 5A-5E internally # This includes automatic detection of ContentExtracted documents + logger.info(f"ai.process: Calling callAiContent with {len(documentList.references)} document references") + if documentList.references: + for idx, ref in enumerate(documentList.references): + logger.info(f" Passing reference {idx + 1}: documentId={ref.documentId}") aiResponse = await self.services.ai.callAiContent( prompt=aiPrompt, options=options, documentList=documentList, # callAiContent macht Phasen 5A-5E outputFormat=output_format, - parentOperationId=operationId + parentOperationId=operationId, + generationIntent=generation_intent ) # Update progress - processing result