diff --git a/docs/STREAMING_UTILITY_ARCHITECTURE.md b/docs/STREAMING_UTILITY_ARCHITECTURE.md new file mode 100644 index 00000000..368fb374 --- /dev/null +++ b/docs/STREAMING_UTILITY_ARCHITECTURE.md @@ -0,0 +1,314 @@ +# Streaming Utility Architecture: Transforming eventManager into a Shared Utility + +## Current State Analysis + +### Existing Implementation +The `eventManager.py` in `modules/features/chatbot/` currently provides: +- **Event Queue Management**: Per-workflow asyncio queues for SSE streaming +- **Event Emission**: `emit_event()` method for chatbot-specific events +- **Cleanup**: Automatic queue cleanup after workflow completion +- **SSE Streaming**: Used in `/api/chatbot/start/stream` endpoint + +### Current Limitations + +1. **Chatbot-Specific**: Hardcoded for chatbot workflows only +2. **Polling Still Required**: Frontend still polls `getUnifiedChatData()` every 0.5 seconds even with SSE +3. **Not Reusable**: Other features (workflows, document generation, etc.) can't use it +4. **Mixed Approach**: SSE endpoint still internally polls database instead of pure event-driven streaming + +### Frontend Polling Pattern +Currently, the frontend uses: +- `useWorkflowPolling.ts` - Polls `/api/workflow/{id}/chatData` every few seconds +- `useWorkflowLifecycle.ts` - Manages polling lifecycle and state +- Rate limit handling and backoff logic for failed polls + +## Proposed Architecture: Shared Streaming Utility + +### 1. Generic Event Manager (`modules/shared/streamingManager.py`) + +```python +class StreamingEventManager: + """ + Generic event manager for real-time streaming across all features. + Supports multiple event types and contexts (workflows, documents, tasks, etc.) + """ + + def __init__(self): + self._queues: Dict[str, asyncio.Queue] = {} + self._locks: Dict[str, asyncio.Lock] = {} + self._cleanup_tasks: Dict[str, asyncio.Task] = {} + self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids + + async def emit_event( + self, + context_id: str, # workflow_id, document_id, task_id, etc. + event_type: str, # "message", "log", "status", "progress", "complete", "error" + data: Dict[str, Any], # Flexible data structure + event_category: str = "default" # "chat", "workflow", "document", etc. + ): + """Emit event to all subscribers of a context""" + + def create_stream( + self, + context_id: str, + event_categories: Optional[List[str]] = None # Filter by category + ) -> asyncio.Queue: + """Create a new stream for a context""" + + async def stream_events( + self, + context_id: str, + event_categories: Optional[List[str]] = None + ) -> AsyncIterator[Dict[str, Any]]: + """Async generator for streaming events""" +``` + +### 2. Generic SSE Route Helper (`modules/shared/sseUtils.py`) + +```python +def create_sse_stream( + context_id: str, + event_categories: Optional[List[str]] = None, + initial_data_callback: Optional[Callable] = None, + timeout: float = 300.0 +) -> StreamingResponse: + """ + Create a generic SSE streaming response. 
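+    Sketch only: assumes a module-level get_streaming_manager() accessor and a
+    format_sse_event(event_type, data) helper that renders SSE-formatted frames.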
+ + Args: + context_id: Workflow ID, document ID, or other context identifier + event_categories: Filter events by category (e.g., ["chat", "workflow"]) + initial_data_callback: Optional function to fetch initial state + timeout: Stream timeout in seconds + """ + streaming_manager = get_streaming_manager() + + async def event_stream(): + # Send initial data if callback provided + if initial_data_callback: + initial_data = await initial_data_callback(context_id) + yield format_sse_event("initial", initial_data) + + # Stream events from manager + async for event in streaming_manager.stream_events(context_id, event_categories): + yield format_sse_event(event["type"], event["data"]) + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no" + } + ) +``` + +### 3. Integration Points + +#### A. Workflow Processing +```python +# In workflow processing code +from modules.shared.streamingManager import get_streaming_manager + +streaming_manager = get_streaming_manager() + +# Emit progress updates +await streaming_manager.emit_event( + context_id=workflow_id, + event_type="progress", + data={"step": "analyzing", "message": "Processing documents..."}, + event_category="workflow" +) + +# Emit new messages +await streaming_manager.emit_event( + context_id=workflow_id, + event_type="message", + data={"role": "assistant", "content": "Response text"}, + event_category="chat" +) +``` + +#### B. Route Endpoints +```python +# Generic streaming endpoint for any context +@router.get("/{contextId}/stream") +async def stream_context_updates( + contextId: str, + categories: Optional[str] = Query(None), # Comma-separated categories + currentUser: User = Depends(getCurrentUser) +): + event_categories = categories.split(",") if categories else None + + # Optional: Fetch initial state + async def get_initial_data(ctx_id: str): + interfaceDbChat = getServiceChat(currentUser) + return interfaceDbChat.getUnifiedChatData(ctx_id, None) + + return create_sse_stream( + context_id=contextId, + event_categories=event_categories, + initial_data_callback=get_initial_data + ) +``` + +## Benefits of Streaming vs Polling + +### Performance +- **Reduced Server Load**: No constant database queries every 0.5-3 seconds +- **Lower Latency**: Events delivered immediately (< 100ms) vs polling delay (500-3000ms) +- **Bandwidth Efficiency**: Only send data when it changes, not empty responses + +### User Experience +- **Real-time Updates**: Users see progress instantly +- **Better Responsiveness**: No perceived delay from polling intervals +- **Reduced Battery**: Mobile devices consume less power without constant polling + +### Scalability +- **Horizontal Scaling**: Event queues can be distributed (Redis, RabbitMQ) +- **Connection Management**: Better handling of many concurrent streams +- **Resource Efficiency**: One persistent connection vs many HTTP requests + +## Migration Strategy + +### Phase 1: Create Shared Utility +1. Move `eventManager.py` → `modules/shared/streamingManager.py` +2. Generalize for any context type (not just workflows) +3. Add event categorization and filtering +4. Create `sseUtils.py` helper functions + +### Phase 2: Update Chatbot Feature +1. Update chatbot to use shared streaming manager +2. Replace internal polling in SSE endpoint with pure event-driven streaming +3. Emit events directly when data changes (in database write operations) + +### Phase 3: Migrate Other Features +1. 
**Workflows**: Add streaming to workflow processing
+2. **Document Generation**: Stream document creation progress
+3. **Data Processing**: Stream extraction/transformation progress
+4. **Any Long-Running Task**: Use streaming for status updates
+
+### Phase 4: Frontend Migration
+1. Replace `useWorkflowPolling` with SSE EventSource connections
+2. Create generic `useStreaming` hook
+3. Update all components to use streaming instead of polling
+4. Remove polling logic entirely
+
+## Implementation Details
+
+### Event-Driven Data Emission
+
+Instead of polling `getUnifiedChatData()`, emit events when data changes:
+
+```python
+# In interfaceDbChatObjects.py - when creating a message
+def createMessage(self, workflowId: str, message: ChatMessage):
+    # ... existing database write ...
+
+    # Emit streaming event
+    from modules.shared.streamingManager import get_streaming_manager
+    streaming_manager = get_streaming_manager()
+    asyncio.create_task(streaming_manager.emit_event(
+        context_id=workflowId,
+        event_type="message",
+        data={
+            "type": "message",
+            "createdAt": message.publishedAt,
+            "item": message.dict()
+        },
+        event_category="chat"
+    ))
+```
+
+### Frontend Integration
+
+```typescript
+// Generic streaming hook
+function useStreaming<T>(
+  contextId: string,
+  categories?: string[],
+  onEvent?: (event: T) => void
+) {
+  useEffect(() => {
+    const eventSource = new EventSource(
+      `/api/stream/${contextId}?categories=${categories?.join(',')}`
+    );
+
+    eventSource.onmessage = (e) => {
+      const event = JSON.parse(e.data);
+      onEvent?.(event);
+    };
+
+    return () => eventSource.close();
+  }, [contextId, categories]);
+}
+```
+
+## Key Design Decisions
+
+### 1. Context-Based Streaming
+- Use generic `context_id` instead of `workflow_id`
+- Supports workflows, documents, tasks, user sessions, etc.
+
+### 2. Event Categories
+- Allow filtering by category (chat, workflow, document, etc.)
+- Enables multiple features to stream from same context
+
+### 3. Backward Compatibility
+- Keep existing polling endpoints during migration
+- Gradually migrate features one at a time
+- Frontend can use both during transition
+
+### 4. Error Handling
+- Graceful degradation if streaming unavailable
+- Automatic reconnection logic in frontend
+- Fallback to polling if SSE fails
+
+## Example: Complete Flow
+
+### Backend: Workflow Processing
+```python
+async def process_workflow(workflow_id: str):
+    streaming = get_streaming_manager()
+
+    # Emit status update
+    await streaming.emit_event(workflow_id, "status",
+        {"status": "running"}, "workflow")
+
+    # Process and emit messages
+    result = await ai_call(...)
+    await streaming.emit_event(workflow_id, "message",
+        {"role": "assistant", "content": result}, "chat")
+
+    # Emit completion
+    await streaming.emit_event(workflow_id, "complete",
+        {"status": "completed"}, "workflow")
+```
+
+### Frontend: React Hook
+```typescript
+function ChatComponent({ workflowId }: { workflowId: string }) {
+  const [messages, setMessages] = useState<any[]>([]);
+
+  useStreaming(workflowId, ["chat"], (event) => {
+    if (event.type === "message") {
+      setMessages(prev => [...prev, event.item]);
+    }
+  });
+
+  return <MessageList messages={messages} />; // MessageList: placeholder for the app's message list component
+}
+```
+
+## Conclusion
+
+By transforming `eventManager.py` into a shared streaming utility:
+
+1. **Eliminates Polling**: All features can stream updates in real-time
+2. **Improves Performance**: Reduces server load and latency
+3. **Better UX**: Instant updates instead of polling delays
+4. **Reusable**: Any feature can use streaming with minimal code
+5. **Scalable**: Foundation for future real-time features
+
+The migration can be done incrementally, feature by feature, without breaking existing functionality.
diff --git a/docs/WEBSEARCH_FIXES.md b/docs/WEBSEARCH_FIXES.md
new file mode 100644
index 00000000..420a9845
--- /dev/null
+++ b/docs/WEBSEARCH_FIXES.md
@@ -0,0 +1,134 @@
+# Web Search Content Extraction Fixes
+
+## Problem Summary
+
+The Tavily web search integration was failing to extract content from search results, causing web research to return empty or incomplete data. The main issues were related to handling `None` values and incomplete error recovery.
+
+## Main Issues Fixed
+
+### 1. Incomplete Content Extraction from Search Results
+
+**Problem:**
+- When Tavily API returned search results, some results had `raw_content` set to `None` (not missing, but explicitly `None`)
+- The fallback chain `result.get("raw_content") or result.get("content", "")` still produced `None` when the `content` key was also present with a `None` value, because `.get()` only applies its default for missing keys
+- This caused `None` values to propagate through the system instead of falling back to the `content` field or empty string
+
+**Fix:**
+Changed the content extraction in `aicorePluginTavily.py` to properly handle `None` values:
+```python
+# Before (line 344):
+rawContent=result.get("raw_content") or result.get("content", "")
+
+# After:
+rawContent=result.get("raw_content") or result.get("content") or ""
+```
+
+This ensures that if `raw_content` is `None`, it falls back to `content`, and if that's also `None`, it defaults to an empty string.
+
+**Additional Fix:**
+Added defensive checks in the `webSearch` method to safely extract content even when result objects have unexpected structures:
+```python
+# Safely extract content with multiple fallbacks
+content = ""
+if hasattr(result, 'rawContent'):
+    content = result.rawContent or ""
+if not content and hasattr(result, 'content'):
+    content = result.content or ""
+```
+
+### 2. NoneType Error When Logging Content Length
+
+**Problem:**
+- Code attempted to check `len(first_result.get('raw_content', ''))` for logging
+- When `raw_content` key existed but value was `None`, `.get()` returned `None` instead of the default `''`
+- This caused `len(None)` to fail with `TypeError: object of type 'NoneType' has no len()`
+
+**Fix:**
+Changed the logging code to safely handle `None` values:
+```python
+# Before (line 338):
+logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(first_result.get('raw_content', ''))}")
+
+# After:
+raw_content = first_result.get('raw_content') or ''
+logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(raw_content)}")
+```
+
+### 3. Missing Error Recovery in Content Extraction
+
+**Problem:**
+- When processing search results, if one result failed to extract, the entire extraction could fail
+- No recovery mechanism to extract at least URLs even when content extraction failed
+- Errors were logged but processing stopped, losing potentially useful data
+
+**Fix:**
+Added per-result error handling with recovery:
+```python
+for result in searchResults:
+    try:
+        # Extract URL, content, title safely
+        # ... extraction logic ...
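+        # For illustration only - the defensive extraction added in aicorePluginTavily.py
+        # follows this shape (attribute checks with empty-string fallbacks):
+        url = result.url if hasattr(result, 'url') and result.url else ""
+        content = ""
+        if hasattr(result, 'rawContent'):
+            content = result.rawContent or ""
+        if not content and hasattr(result, 'content'):
+            content = result.content or ""
+        title = result.title if hasattr(result, 'title') and result.title else ""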
+ except Exception as resultError: + logger.warning(f"Error processing individual search result: {resultError}") + # Continue processing other results instead of failing completely + continue +``` + +Also added recovery at the extraction level: +```python +except Exception as extractionError: + logger.error(f"Error extracting URLs and content from search results: {extractionError}") + # Try to recover at least URLs + try: + urls = [result.url for result in searchResults if hasattr(result, 'url') and result.url] + logger.info(f"Recovered {len(urls)} URLs after extraction error") + except Exception: + logger.error("Failed to recover any URLs from search results") +``` + +### 4. Incomplete Crawl Result Processing + +**Problem:** +- When crawl returned results but individual page processing failed, entire crawl was lost +- No fallback to extract at least URLs from failed crawl results +- Missing content fields could cause errors when formatting results + +**Fix:** +Added error handling for individual page processing: +```python +for i, result in enumerate(crawlResults, 1): + try: + # Format page content + # ... formatting logic ... + except Exception as pageError: + logger.warning(f"Error formatting page {i} from crawl: {pageError}") + # Try to add at least the URL + try: + pageUrls.append(result.url if hasattr(result, 'url') and result.url else webCrawlPrompt.url) + except Exception: + pass +``` + +Also ensured all result fields have safe defaults: +```python +results.append(WebCrawlResult( + url=result_url or url, # Fallback to base URL + content=result_content, # Already ensured to be string + title=result_title # Already ensured to be string +)) +``` + +## Impact + +These fixes ensure that: +1. **Content is always extracted** - Even when `raw_content` is `None`, the system falls back to `content` field or empty string +2. **Partial results are preserved** - If some results fail, others are still processed and returned +3. **URLs are recovered** - Even when content extraction fails completely, URLs can still be extracted for crawling +4. 
**No crashes from None values** - All `None` values are properly handled before operations like `len()` are called + +## Testing Recommendations + +- Test with Tavily search results that have `raw_content` set to `None` +- Test with mixed results (some with content, some without) +- Test error recovery when individual results fail +- Verify that URLs are still extracted even when content extraction fails diff --git a/modules/aicore/aicorePluginTavily.py b/modules/aicore/aicorePluginTavily.py index a9237cf2..d66c3005 100644 --- a/modules/aicore/aicorePluginTavily.py +++ b/modules/aicore/aicorePluginTavily.py @@ -321,13 +321,28 @@ class AiTavily(BaseConnectorAi): # Return all results without score filtering # Tavily's scoring is already applied by the API - logger.info(f"Tavily returned {len(response.get('results', []))} results") + results_count = len(response.get('results', [])) + logger.info(f"Tavily returned {results_count} results") + + # Log content availability + results_with_content = 0 + for result in response.get('results', []): + if result.get("raw_content"): + results_with_content += 1 + logger.info(f"Tavily results with raw_content: {results_with_content}/{results_count}") + + # Log first result structure for debugging + if response.get('results') and len(response['results']) > 0: + first_result = response['results'][0] + logger.debug(f"First result keys: {list(first_result.keys())}") + raw_content = first_result.get('raw_content') or '' + logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(raw_content)}") return [ WebSearchResult( - title=result["title"], - url=self._cleanUrl(result["url"]), - rawContent=result.get("raw_content") + title=result.get("title", ""), + url=self._cleanUrl(result.get("url", "")), + rawContent=result.get("raw_content") or result.get("content") or "" ) for result in response["results"] ] @@ -381,24 +396,90 @@ class AiTavily(BaseConnectorAi): logger.debug(f"Tavily response received: {type(response)}") # Parse response - could be dict with results or list - if isinstance(response, dict) and "results" in response: - pageResults = response["results"] + if isinstance(response, dict): + if "results" in response: + pageResults = response["results"] + logger.debug(f"Found 'results' key in response dict with {len(pageResults)} items") + else: + logger.warning(f"Response dict keys: {list(response.keys())}") + # Check for other possible keys + if "pages" in response: + pageResults = response["pages"] + logger.debug(f"Found 'pages' key with {len(pageResults)} items") + elif "content" in response: + # Single page result + pageResults = [response] + logger.debug("Found 'content' key, treating as single page result") + else: + logger.warning(f"Unexpected response dict structure: {list(response.keys())}") + pageResults = [] elif isinstance(response, list): pageResults = response + logger.debug(f"Response is a list with {len(pageResults)} items") else: - logger.warning(f"Unexpected response format: {type(response)}") + logger.warning(f"Unexpected response format: {type(response)}, value: {str(response)[:200]}") pageResults = [] - logger.debug(f"Got {len(pageResults)} pages from crawl") + logger.info(f"Got {len(pageResults)} pages from crawl for URL: {url}") + if len(pageResults) == 0: + logger.warning(f"Tavily crawl returned 0 pages for URL: {url}. 
Response structure: {type(response)}") + if isinstance(response, dict): + logger.warning(f"Response keys: {list(response.keys())}") + # Log all values to debug (not just first 3) + for key, value in response.items(): + value_str = str(value) + if len(value_str) > 200: + value_str = value_str[:200] + "..." + logger.warning(f" {key}: {type(value)} - {value_str}") + + # Check for error messages in response + if "error" in response: + logger.error(f"Tavily API error in response: {response.get('error')}") + if "message" in response: + logger.warning(f"Tavily API message: {response.get('message')}") + elif isinstance(response, str): + logger.warning(f"Tavily returned string response (first 500 chars): {response[:500]}") + else: + logger.warning(f"Unexpected response type: {type(response)}, value: {str(response)[:500]}") - # Convert to WebCrawlResult format + # Convert to WebCrawlResult format with error handling results = [] - for result in pageResults: - results.append(WebCrawlResult( - url=result.get("url", url), - content=result.get("raw_content", result.get("content", "")), - title=result.get("title", "") - )) + for idx, result in enumerate(pageResults): + try: + # Safely extract fields + result_url = result.get("url") if isinstance(result, dict) else (getattr(result, "url", None) if hasattr(result, "url") else url) + result_content = "" + if isinstance(result, dict): + result_content = result.get("raw_content") or result.get("content") or "" + elif hasattr(result, "raw_content"): + result_content = result.raw_content or "" + elif hasattr(result, "content"): + result_content = result.content or "" + + result_title = "" + if isinstance(result, dict): + result_title = result.get("title", "") + elif hasattr(result, "title"): + result_title = result.title or "" + + results.append(WebCrawlResult( + url=result_url or url, + content=result_content, + title=result_title + )) + except Exception as resultError: + logger.warning(f"Error processing crawl result {idx}: {resultError}") + # Try to create a minimal result with at least the URL + try: + if isinstance(result, dict) and result.get("url"): + results.append(WebCrawlResult( + url=result.get("url", url), + content="", + title="" + )) + except Exception: + logger.error(f"Failed to create minimal result for crawl result {idx}") + continue logger.debug(f"Crawl successful: extracted {len(results)} pages from URL") return results @@ -413,7 +494,7 @@ class AiTavily(BaseConnectorAi): except Exception as e: logger.warning(f"Crawl attempt {attempt + 1} failed for URL {url}: {str(e)}") - logger.debug(f"Full error details: {type(e).__name__}: {str(e)}") + logger.debug(f"Full error details: {type(e).__name__}: {str(e)}", exc_info=True) # Check if it's a validation error and log more details if "validation" in str(e).lower(): @@ -427,10 +508,22 @@ class AiTavily(BaseConnectorAi): if len(url) > 2000: logger.debug(f" WARNING: URL is very long ({len(url)} chars)") + # Log API-specific errors + error_str = str(e).lower() + if "rate limit" in error_str or "429" in error_str: + logger.error(f"Tavily API rate limit hit for URL: {url}") + elif "401" in error_str or "unauthorized" in error_str: + logger.error(f"Tavily API authentication failed for URL: {url}") + elif "404" in error_str or "not found" in error_str: + logger.warning(f"URL not found (404) for: {url}") + elif "timeout" in error_str: + logger.warning(f"Timeout error for URL: {url}") + if attempt < maxRetries: logger.info(f"Retrying in {retryDelay} seconds...") await asyncio.sleep(retryDelay) else: + 
logger.error(f"Crawl failed after {maxRetries + 1} attempts for URL: {url}") raise Exception(f"Crawl failed after {maxRetries + 1} attempts: {str(e)}") async def _routeWebOperation(self, modelCall: AiModelCall) -> "AiModelResponse": @@ -508,21 +601,84 @@ class AiTavily(BaseConnectorAi): includeRawContent="text" ) - # Extract URLs from results - urls = [result.url for result in searchResults] + # Extract URLs and content from results with error handling + urls = [] + results_with_content = [] + content_count = 0 - # Return as JSON array + try: + for result in searchResults: + try: + # Safely extract URL + url = result.url if hasattr(result, 'url') and result.url else "" + if url: + urls.append(url) + + # Safely extract content + content = "" + if hasattr(result, 'rawContent'): + content = result.rawContent or "" + if not content and hasattr(result, 'content'): + content = result.content or "" + + if content: + content_count += 1 + + # Safely extract title + title = result.title if hasattr(result, 'title') and result.title else "" + + results_with_content.append({ + "url": url, + "title": title, + "content": content, + "score": getattr(result, 'score', 0) + }) + except Exception as resultError: + logger.warning(f"Error processing individual search result: {resultError}") + # Continue processing other results + continue + + logger.info(f"Tavily search: {len(urls)} URLs, {content_count} with content, {len(results_with_content)} total results") + if content_count == 0: + logger.warning("Tavily search returned no content - results may need crawling") + except Exception as extractionError: + logger.error(f"Error extracting URLs and content from search results: {extractionError}") + # Try to recover at least URLs + try: + urls = [result.url for result in searchResults if hasattr(result, 'url') and result.url] + logger.info(f"Recovered {len(urls)} URLs after extraction error") + except Exception: + logger.error("Failed to recover any URLs from search results") + + # Return both URLs and full results in JSON for direct extraction + # Format: {"urls": [...], "results": [...]} import json + response_data = { + "urls": urls, + "results": results_with_content + } + return AiModelResponse( - content=json.dumps(urls, indent=2), + content=json.dumps(response_data, indent=2), success=True, - metadata={"total_urls": len(urls), "operation": "WEB_SEARCH_DATA"} + metadata={ + "total_urls": len(urls), + "operation": "WEB_SEARCH_DATA", + "results_with_content": results_with_content # Also in metadata for compatibility + } ) except Exception as e: - logger.error(f"Error in Tavily web search: {str(e)}") + logger.error(f"Error in Tavily web search: {str(e)}", exc_info=True) + import json + # Return error response with empty results + error_response = { + "urls": [], + "results": [], + "error": str(e) + } return AiModelResponse( - content="[]", + content=json.dumps(error_response, indent=2), success=False, error=str(e) ) @@ -575,23 +731,44 @@ class AiTavily(BaseConnectorAi): # If we got multiple pages from the crawl, we need to format them differently # Return the first result for backwards compatibility, but include total page count if crawlResults and len(crawlResults) > 0: - # Get all pages content + # Get all pages content with error handling allContent = "" + pageUrls = [] for i, result in enumerate(crawlResults, 1): - pageHeader = f"\n{'='*60}\nPAGE {i}: {result.url}\n{'='*60}\n" - if result.title: - allContent += f"{pageHeader}Title: {result.title}\n\n" - allContent += f"{result.content}\n" + try: + pageHeader 
= f"\n{'='*60}\nPAGE {i}: {result.url}\n{'='*60}\n" + if result.title: + allContent += f"{pageHeader}Title: {result.title}\n\n" + else: + allContent += f"{pageHeader}\n" + allContent += f"{result.content or ''}\n" + pageUrls.append(result.url) + except Exception as pageError: + logger.warning(f"Error formatting page {i} from crawl: {pageError}") + # Try to add at least the URL + try: + pageUrls.append(result.url if hasattr(result, 'url') and result.url else webCrawlPrompt.url) + except Exception: + pass resultData = { "url": webCrawlPrompt.url, - "title": crawlResults[0].title if crawlResults[0].title else "Content", + "title": crawlResults[0].title if crawlResults and crawlResults[0].title else "Content", "content": allContent, "pagesCrawled": len(crawlResults), - "pageUrls": [result.url for result in crawlResults] + "pageUrls": pageUrls } + logger.info(f"Crawl successful: {len(crawlResults)} pages extracted from {webCrawlPrompt.url}") else: - resultData = {"url": webCrawlPrompt.url, "title": "", "content": "", "error": "No content extracted", "pagesCrawled": 0} + logger.warning(f"Crawl returned no results for URL: {webCrawlPrompt.url}") + resultData = { + "url": webCrawlPrompt.url, + "title": "", + "content": "", + "error": "No content extracted - Tavily crawl returned 0 pages", + "pagesCrawled": 0, + "pageUrls": [] + } # Return as JSON - same format as Perplexity but with multiple pages content import json @@ -602,9 +779,17 @@ class AiTavily(BaseConnectorAi): ) except Exception as e: - logger.error(f"Error in Tavily web crawl: {str(e)}") + logger.error(f"Error in Tavily web crawl: {str(e)}", exc_info=True) import json - errorResult = {"error": str(e), "url": webCrawlPrompt.url if 'webCrawlPrompt' in locals() else ""} + crawl_url = webCrawlPrompt.url if 'webCrawlPrompt' in locals() else "" + errorResult = { + "url": crawl_url, + "title": "", + "content": "", + "error": str(e), + "pagesCrawled": 0, + "pageUrls": [] + } return AiModelResponse( content=json.dumps(errorResult, indent=2), success=False, diff --git a/modules/features/chatbot/chatbotConstants.py b/modules/features/chatbot/chatbotConstants.py new file mode 100644 index 00000000..7b3b6cec --- /dev/null +++ b/modules/features/chatbot/chatbotConstants.py @@ -0,0 +1,731 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Constants and utility functions for the chatbot module. +Contains system prompts and conversation name generation. +""" + +import logging +import re +import datetime +from typing import Optional, List + +from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum + +logger = logging.getLogger(__name__) + + +def get_analysis_system_prompt() -> str: + """ + Get the system prompt for analyzing user input and creating queries. + Focuses on understanding the question and determining what queries are needed. + """ + current_date = datetime.datetime.now().strftime("%d.%m.%Y") + + return f"""Heute ist der {current_date}. + +Du bist ein Chatbot der Althaus AG. +Deine Aufgabe ist es, Benutzeranfragen zu analysieren und zu bestimmen, welche Datenbankabfragen oder Web-Recherchen benötigt werden, um die Frage zu beantworten. + +DATENBANK-INFORMATIONEN: +- Datenbankdatei: /data/database.db (SQLite) +- Tabellen: Artikel, Einkaufspreis, Lagerplatz_Artikel, Lagerplatz + +Die Datenbank besteht aus vier Tabellen, die über Beziehungen verbunden sind: +- **Artikel**: Enthält alle Produktinformationen (I_ID, Artikelbezeichnung, Artikelnummer, etc.) 
+- **Einkaufspreis**: Enthält Preisdaten (m_Artikel, EP_CHF) +- **Lagerplatz_Artikel**: Enthält Lagerbestands- und Lagerplatzinformationen (R_ARTIKEL, R_LAGERPLATZ, Bestände, etc.) +- **Lagerplatz**: Enthält die tatsächlichen Lagerplatznamen und -informationen (I_ID, Lagerplatz, R_LAGER, R_LAGERORT) +- **Beziehungen**: + - Artikel.I_ID = Einkaufspreis.m_Artikel + - Artikel.I_ID = Lagerplatz_Artikel.R_ARTIKEL + - Lagerplatz_Artikel.R_LAGERPLATZ = Lagerplatz.I_ID (WICHTIG: R_LAGERPLATZ enthält die ID, nicht den Namen!) + +TABELLEN-SCHEMA (WICHTIG - Spalten mit Leerzeichen/Sonderzeichen IMMER in doppelte Anführungszeichen setzen): + +Tabelle 1: Artikel +CREATE TABLE Artikel ( + "I_ID" INTEGER PRIMARY KEY, + "Artikelbeschrieb" TEXT, + "Artikelbezeichnung" TEXT, + "Artikelgruppe" TEXT, + "Artikelkategorie" TEXT, + "Artikelkürzel" TEXT, + "Artikelnummer" TEXT, + "Einheit" TEXT, + "Gesperrt" TEXT, + "Keywords" TEXT, + "Lieferant" TEXT, + "Warengruppe" TEXT +) + +Tabelle 2: Einkaufspreis +CREATE TABLE Einkaufspreis ( + "m_Artikel" INTEGER, + "EP_CHF" FLOAT +) + +Tabelle 3: Lagerplatz_Artikel +CREATE TABLE Lagerplatz_Artikel ( + "R_ARTIKEL" INTEGER, + "R_LAGERPLATZ" TEXT, + "S_BESTELLTER__BESTAND" INTEGER, + "S_IST_BESTAND" TEXT, + "S_MAXIMALBESTAND" INTEGER, + "S_MINDESTBESTAND" INTEGER, + "S_RESERVIERTER__BESTAND" INTEGER, + "S_SOLL_BESTAND" INTEGER +) + +Tabelle 4: Lagerplatz +CREATE TABLE Lagerplatz ( + "I_ID" INTEGER PRIMARY KEY, + "Lagerplatz" TEXT, + "R_LAGER" TEXT, + "R_LAGERORT" TEXT +) + +⚠️⚠️⚠️ KRITISCH - LAGERBESTANDSABFRAGEN - ABSOLUT VERBINDLICH ⚠️⚠️⚠️ +JEDE SQL-Abfrage, die Lagerbestände (S_IST_BESTAND) zeigt oder verwendet, MUSS IMMER auch enthalten: +- l."S_RESERVIERTER__BESTAND" (Reservierte Bestände) - OBLIGATORISCH! +- Berechnung des verfügbaren Bestands - OBLIGATORISCH! +- JOIN mit Lagerplatz-Tabelle für den Lagerplatznamen - OBLIGATORISCH! + +VERBOTEN: Abfragen ohne reservierte Bestände - auch nicht als "korrigierte Abfrage"! +VERBOTEN: Zwischenschritte ohne reservierte Bestände! +VERBOTEN: "Korrigierte Abfragen ohne reservierte Bestände" - das ist KEINE Korrektur, das ist FALSCH! + +SQL-ANFORDERUNGEN - ABSOLUT VERBINDLICH: +JEDE Abfrage, die Lagerbestände zeigt, MUSS diese Struktur haben: +- JOIN mit Lagerplatz-Tabelle: LEFT JOIN Lagerplatz lp ON l."R_LAGERPLATZ" = lp."I_ID" +- Lagerplatzname anzeigen: lp."Lagerplatz" as "Lagerplatzname" (NICHT l."R_LAGERPLATZ"!) +- Ist-Bestand: l."S_IST_BESTAND" +- Reservierte Bestände: IMMER l."S_RESERVIERTER__BESTAND" hinzufügen (OBLIGATORISCH!) +- Verfügbarer Bestand berechnen: CASE WHEN l."S_IST_BESTAND" != 'Unbekannt' THEN CAST(l."S_IST_BESTAND" AS INTEGER) - COALESCE(l."S_RESERVIERTER__BESTAND", 0) ELSE NULL END as "Verfügbarer Bestand" (OBLIGATORISCH!) + +SQL-HINWEISE: +- Verwende IMMER doppelte Anführungszeichen für Spaltennamen: "Artikelkürzel", "Artikelnummer", etc. +- Für Textsuche verwende LIKE mit Wildcards: WHERE a."Artikelbezeichnung" LIKE '%suchbegriff%' +- Für Preisabfragen: Nutze JOINs um auf e."EP_CHF" zuzugreifen +- Für Lagerbestände: Nutze JOINs um auf l."S_IST_BESTAND", l."S_SOLL_BESTAND", etc. zuzugreifen +- WICHTIG bei S_IST_BESTAND: Dieser Wert kann "Unbekannt" sein (TEXT), nicht nur Zahlen! 
Prüfe mit WHERE l."S_IST_BESTAND" != 'Unbekannt' wenn du nur numerische Werte willst +- Sortierung oft sinnvoll: ORDER BY a."Artikelnummer" ASC, ORDER BY e."EP_CHF" DESC, oder ORDER BY l."S_IST_BESTAND" DESC +- Verwende Tabellenaliase (a für Artikel, e für Einkaufspreis, l für Lagerplatz_Artikel, lp für Lagerplatz) für bessere Lesbarkeit +- WICHTIG: Du kannst bis zu 50 Ergebnisse pro Abfrage abrufen + +ARTIKELKÜRZEL vs ARTIKELNUMMER - WICHTIG: +Es gibt zwei verschiedene Identifikatoren für Artikel: + +1. **Artikelkürzel**: Numerisches Format (z.B. "131741", "141215") + - Besteht aus reinen Zahlen + - Format: Nur Ziffern, keine Buchstaben, keine Bindestriche, keine Leerzeichen + - Beispiel: "131741", "141215" + +2. **Artikelnummer**: Alphanumerisches Format (z.B. "6AV2 181-8XP00-0AX0", "AX5206") + - Kann Buchstaben, Zahlen, Bindestriche und Leerzeichen enthalten + - Format: Alphanumerisch, kann Bindestriche und Leerzeichen enthalten + - Beispiel: "6AV2 181-8XP00-0AX0", "AX5206", "SIE.6ES7500" + +WICHTIG - RICHTIGE SPALTE VERWENDEN: +- Wenn der Nutzer eine rein numerische Zahl angibt (z.B. "131741", "141215") → Suche in a."Artikelkürzel" +- Wenn der Nutzer eine alphanumerische Bezeichnung angibt mit Buchstaben, Bindestrichen oder Leerzeichen (z.B. "6AV2 181-8XP00-0AX0", "AX5206") → Suche in a."Artikelnummer" + +Beispiele: +- "Wie viele von 141215 haben wir auf Lager?" → Artikelkürzel "141215" → WHERE a."Artikelkürzel" = '141215' +- "Wie viel von 6AV2 181-8XP00-0AX0 haben wir auf Lager?" → Artikelnummer "6AV2 181-8XP00-0AX0" → WHERE a."Artikelnummer" = '6AV2 181-8XP00-0AX0' +- "Zeig mir Informationen zu AX5206" → Artikelnummer "AX5206" → WHERE a."Artikelnummer" = 'AX5206' + +Bei Fragen nach Lagerbestand: Kombiniere mit der Lagerplatz_Artikel Tabelle über JOIN und beachte die Anforderungen aus dem Abschnitt "LAGERBESTANDSABFRAGEN" + +Du antwortest ausschliesslich auf Deutsch. Nutze kein sz(ß) sondern immer ss. +""" + + +def get_final_answer_system_prompt() -> str: + """ + Get the system prompt for generating the final answer. + Focuses on formatting, presenting results, and user engagement. + """ + current_date = datetime.datetime.now().strftime("%d.%m.%Y") + + return f"""Heute ist der {current_date}. + +Du bist ein Chatbot der Althaus AG. +Deine Aufgabe ist es, auf Basis von Datenbank-Ergebnissen und Web-Recherchen hilfreiche, präzise Antworten zu geben. + +QUELLENANGABE - DATENBANK: +WICHTIG: Wenn du Informationen aus der Datenbank präsentierst, kennzeichne dies IMMER klar für den Nutzer. +- Beginne deine Antwort mit einer klaren Kennzeichnung, z.B.: "Aus der Datenbank habe ich folgende Artikel gefunden:" +- Bei kombinierten Informationen (Datenbank + Internet): Trenne klar zwischen beiden Quellen + +⚠️⚠️⚠️ QUELLENANGABE - INTERNET - ABSOLUT VERBINDLICH ⚠️⚠️⚠️ +Wenn du Informationen aus einer Web-Recherche präsentierst, MUSS du dies IMMER explizit kennzeichnen und die Quellen angeben: +- ❌ VERBOTEN: Informationen aus Web-Recherchen ohne explizite Kennzeichnung zu präsentieren +- ❌ VERBOTEN: Informationen aus Web-Recherchen ohne Quellenangabe zu präsentieren +- ❌ VERBOTEN: Quellen nur am Ende als Liste zu präsentieren +- ✓ OBLIGATORISCH: Beginne IMMER mit einer expliziten Kennzeichnung, z.B.: + * "Aus meiner Web-Recherche habe ich folgende Informationen gefunden:" + * "Laut meiner Internet-Recherche:" + * "Aus meiner Online-Suche:" +- ✓ OBLIGATORISCH: Gib IMMER die konkreten Quellen DIREKT NACH der jeweiligen Information an (nicht am Ende!) 
+- ✓ OBLIGATORISCH: Format: [Information] ([Quelle: Website-Name](URL)) +- ✓ OBLIGATORISCH: Bei mehreren Informationen: Gib nach JEDER Information die entsprechende Quelle an +- ✓ OBLIGATORISCH: Trenne klar zwischen Datenbank-Informationen und Web-Recherchen +- ✓ OBLIGATORISCH: Wenn sowohl Datenbank- als auch Web-Informationen vorhanden sind, trenne diese klar in separaten Abschnitten + +⚠️⚠️⚠️ DATENBLATT-LINKS - ABSOLUT VERBINDLICH ⚠️⚠️⚠️ +Wenn Web-Recherche-Ergebnisse vorhanden sind, MUSS du IMMER: +- ✓ OBLIGATORISCH: Explizit erwähnen, dass Datenblätter verfügbar sind +- ✓ OBLIGATORISCH: ALLE verfügbaren Datenblatt-Links angeben (vollständige URLs) +- ✓ OBLIGATORISCH: Format: "Datenblätter verfügbar: [Link 1](URL1), [Link 2](URL2)" +- ✓ OBLIGATORISCH: Wenn keine direkten Datenblatt-Links vorhanden sind, gib Links zu Seiten mit technischen Informationen an +- ❌ VERBOTEN: Datenblatt-Links zu verschweigen oder nicht explizit zu erwähnen + +⚠️⚠️⚠️ AUSFÜHRLICHE INFORMATIONEN - ABSOLUT VERBINDLICH ⚠️⚠️⚠️ +Wenn Web-Recherche-Ergebnisse vorhanden sind, MUSS du: +- ✓ OBLIGATORISCH: AUSFÜHRLICHE Informationen präsentieren (nicht nur kurze Zusammenfassungen!) +- ✓ OBLIGATORISCH: Alle relevanten technischen Details angeben: + * Technische Spezifikationen (Größe, Gewicht, Abmessungen, etc.) + * Betriebsbedingungen (Temperatur, Spannung, etc.) + * Kompatibilität und Anwendungsbereiche + * Zertifizierungen und Normen + * Installation und Verwendung + * Weitere relevante Produktdetails +- ✓ OBLIGATORISCH: Strukturiere die Informationen übersichtlich (z.B. mit Abschnitten oder Aufzählungen) +- ❌ VERBOTEN: Nur oberflächliche Informationen zu geben +- ❌ VERBOTEN: Wichtige Details auszulassen + +BEISPIEL FÜR KORREKTE QUELLENANGABE MIT INLINE-QUELLEN: +"Aus meiner Web-Recherche habe ich folgende Informationen gefunden: + +**Technische Spezifikationen:** +- Speicherkapazität: 2 GB ([Quelle: Siemens Support](https://...)) +- Format: Secure Digital (SD) Card ([Quelle: Best4Automation](https://...)) +- Betriebsspannung: 3,3 V DC ([Quelle: Automation24](https://...)) + +**Kompatibilität:** +- Geeignet für SIMATIC HMI Comfort Panels ([Quelle: Siemens Support](https://...)) +- Montage im Hoch- und Querformat möglich ([Quelle: Best4Automation](https://...)) + +**Zertifizierungen:** +- CE-zertifiziert ([Quelle: Automation24](https://...)) +- Für ATEX-Zonen geeignet ([Quelle: Elit](https://...)) + +**Datenblätter verfügbar:** +- [Siemens Produktdatenblatt](https://...) +- [Technische Dokumentation](https://...)" + +NIEMALS Informationen aus Web-Recherchen präsentieren, ohne explizit zu erwähnen, dass es sich um eine Web-Recherche handelt und ohne die Quellen DIREKT NACH der jeweiligen Information anzugeben! + +TABELLENLÄNGE UND ARTIKELANZAHL - KRITISCH: +WICHTIG: Zeige MAXIMAL 20 Artikel in Tabellen. Du darfst und sollst aber ausführliche Erklärungen liefern! + +STRATEGIE FÜR VIELE TREFFER (> 20): +✓ Zeige Zusammenfassung mit Statistiken (Anzahl, Lieferanten, Preisspanne, Kategorien, Lagerbestände) +✓ Dann: Tabelle mit den 20 relevantesten/ersten Artikeln +✓ Unter der Tabelle: Hinweis dass weitere Artikel existieren +✓ Biete Filteroptionen an (nach Lieferant, Preis, Lagerbestand, etc.) + +WICHTIG: +- Tabellen: MAXIMAL 20 Zeilen +- Erklärungen: Dürfen AUSFÜHRLICH sein! +- Du darfst viele Daten abfragen und analysieren +- Präsentiere Tabellen aber KOMPAKT (max. 
20 Zeilen) +- Ergänze mit detaillierten Erklärungen, Statistiken, Zusammenfassungen + +ZAHLEN-PRÜFUNG - ABSOLUT KRITISCH: +BEVOR du deine finale Antwort zurückgibst, MUSST du diese Schritte befolgen: + +1. ZÄHLE die TATSÄCHLICHEN Zeilen in deiner finalen Tabelle +2. Diese Zahl ist die EINZIGE korrekte Anzahl für deine Antwort +3. Verwende diese Zahl KONSISTENT überall in deiner Antwort: + - In der Tabellenüberschrift + - In Texten unter der Tabelle + - In der Zusammenfassung + - Überall wo du die Anzahl erwähnst + +VERBOTEN - Inkonsistente Zahlen: +❌ FALSCH: "Verfügbare Lampen (50 Artikel)" + "Zeige die ersten 30 Artikel" +✓ RICHTIG: "Verfügbare Lampen (30 Artikel)" + "Zeige 30 Artikel" + +Falls du dem User strukturierte Daten zurückgibst, formatiere sie bitte als Tabelle. +WICHTIG! Falls deine Tabelle nur ein Teil der Daten anzeigt, die du gefunden hast, dann vermerke dies bitte in deiner Antwort unter der Tabelle in markdown _italic_. + +Wenn immer du eine Artikelnummer innerhalb einer Tabelle zurückgibst bitte markiere diese als Markdownlink: +[ARTIKELNUMMER](/details/ARTIKELNUMMER). ARTIKELNUMMER ist hierbei der Platzhalter, den du ersetzen musst. +WICHTIG! Du musst im Link die ARTIKELNUMMER sicher URL-encodieren. Encodiere aber NICHT die Artikelnummer in eckigen Klammern. Also encodiere den Ankertext nicht! +Ausserhalb einer Tabelle musst du keine Links auf Artikelnummern setzen. + +Die erste Nachricht das Nutzers ist eine Antwort auf die folgende Nachricht: +"Hallo! Ich bin Ihr KI-Assistent für die Materialverwaltung. Wie kann ich Ihnen heute helfen?" + +⚠️⚠️⚠️ ABSOLUT KRITISCH - KEINE DATEN ERFINDEN ⚠️⚠️⚠️ + +NIEMALS Daten erfinden oder halluzinieren: +- ❌ VERBOTEN: Preise erfinden (z.B. "Der Preis beträgt 1200 CHF" wenn kein Preis in den Daten ist) +- ❌ VERBOTEN: Lagerplätze erfinden (z.B. "Lager A-01" wenn dieser nicht in den Daten steht) +- ❌ VERBOTEN: Lagerbestände erfinden (z.B. "50 Stück" wenn dieser Wert nicht in den Daten ist) +- ❌ VERBOTEN: Artikelbezeichnungen erfinden oder ändern +- ❌ VERBOTEN: Lieferanten erfinden oder ändern +- ❌ VERBOTEN: Jegliche Werte erfinden, die nicht explizit in den Datenbank-Ergebnissen stehen + +✓ RICHTIG: Wenn Daten fehlen, schreibe "Nicht verfügbar" oder "N/A" +✓ RICHTIG: Verwende NUR die tatsächlichen Werte aus den Datenbank-Ergebnissen +✓ RICHTIG: Wenn ein Wert NULL oder leer ist, schreibe "Nicht verfügbar" + +FORMATIERUNGSREGELN FÜR ARTIKEL-ANFRAGEN: +1. Beginne mit: "Aus der Datenbank habe ich den Artikel [ARTIKELNUMMER] gefunden. Es handelt sich um [ARTIKELBEZEICHNUNG] von [LIEFERANT]." + - Verwende die tatsächlichen Werte aus den Datenbank-Ergebnissen (Artikelbezeichnung und Lieferant) + - Beispiel: "Aus der Datenbank habe ich den Artikel 6AV2 181-8XP00-0AX0 gefunden. Es handelt sich um eine Simatic HMI Speicherkarte 2GB SD Card von Siemens Schweiz AG." + - Falls Artikelbezeichnung oder Lieferant fehlen, verwende "Nicht verfügbar" +2. Zeige Artikelinformationen als Liste (Artikelkürzel, Artikelnummer, Bezeichnung, Lieferant, Einkaufspreis) +3. Zeige Lagerbestände als Tabelle mit ALLEN Lagerplätzen +4. Berechne Gesamtbestand aus den tatsächlichen Daten +5. Biete nächste Schritte an + +WICHTIG: Wenn du dir nicht sicher bist, ob ein Wert korrekt ist, schreibe "Nicht verfügbar" statt zu erfinden! + +⚠️⚠️⚠️ ABSOLUT KRITISCH - KEINE PLANUNGSSCHRITTE IN DER ANTWORT ⚠️⚠️⚠️ + +NIEMALS Planungsschritte, SQL-Queries oder Zwischenschritte in deine finale Antwort einbauen: +- ❌ VERBOTEN: "Ich werde jetzt die Datenbank durchsuchen..." 
+- ❌ VERBOTEN: "Suche in der Datenbank nach..." +- ❌ VERBOTEN: "Führe SQL-Abfrage aus..." +- ❌ VERBOTEN: SQL-Queries (SELECT-Statements) zeigen +- ❌ VERBOTEN: "Analysiere die Ergebnisse..." +- ❌ VERBOTEN: "Bereite die Abfrageergebnisse auf..." +- ❌ VERBOTEN: Jegliche Erklärungen über den Prozess oder die Methode + +✓ RICHTIG: Beginne DIREKT mit "Aus der Datenbank habe ich den Artikel [ARTIKELNUMMER] gefunden:" +✓ RICHTIG: Zeige NUR die finale Antwort mit den Daten +✓ RICHTIG: Keine Planungsschritte, keine Queries, keine Zwischenschritte + +Deine Antwort soll NUR die finale Antwort enthalten - keine Planung, keine Queries, keine Zwischenschritte! + +⚠️⚠️⚠️ ABSOLUT KRITISCH - KEINE BEISPIELDATEN ERFINDEN ⚠️⚠️⚠️ + +NIEMALS Beispielartikel oder Testdaten erfinden: +- ❌ VERBOTEN: Beispielartikel wie "123456", "789012", "Beispielartikel 1", etc. +- ❌ VERBOTEN: Erfundene Lieferanten wie "Lieferant A", "Lieferant B" +- ❌ VERBOTEN: Erfundene Preise oder Bestände +- ❌ VERBOTEN: Jegliche Testdaten oder Beispieldaten + +Wenn KEINE echten Daten aus der Datenbank vorhanden sind: +- ✓ Schreibe: "Es wurden keine Artikel in der Datenbank gefunden." +- ✓ Oder: "Die Datenbankabfrage hat keine Ergebnisse zurückgegeben." +- ✓ Oder: "Keine Daten verfügbar für diese Anfrage." + +ERFINDE NIEMALS Daten, auch nicht als "Beispiel" oder "Test"! + +NUTZER-ENGAGEMENT - NÄCHSTE SCHRITTE VORSCHLAGEN: +Am Ende jeder Antwort sollst du dem Nutzer immer hilfreiche Optionen für nächste Schritte anbieten. Zeige dem Nutzer, was alles möglich ist und halte die Konversation aktiv. + +Beispiele für Vorschläge: +- "Möchten Sie mehr Details zu einem bestimmten Artikel erfahren?" +- "Soll ich nach ähnlichen Produkten oder alternativen Lieferanten suchen?" +- "Interessieren Sie Lagerstände oder Preisinformationen zu diesen Artikeln?" +- "Soll ich die aktuellen Lagerbestände und Lagerplätze zu diesen Artikeln anzeigen?" +- "Möchten Sie Artikel mit niedrigem Lagerbestand oder unter Mindestbestand sehen?" +- "Kann ich Ihnen bei einer spezifischeren Suche helfen?" +- "Benötigen Sie technische Datenblätter oder weitere Produktinformationen aus dem Internet?" + +Passe deine Vorschläge an den Kontext der Anfrage an und sei kreativ. Ziel ist es, dem Nutzer zu zeigen, welche Möglichkeiten er hat und ihn zur weiteren Interaktion zu ermutigen. + +Du antwortest ausschliesslich auf Deutsch. Nutze kein sz(ß) sondern immer ss. +""" + + +def get_system_prompt() -> str: + """ + DEPRECATED: Use get_analysis_system_prompt() or get_final_answer_system_prompt() instead. + Kept for backward compatibility. + """ + return get_final_answer_system_prompt() + + +def get_initial_analysis_prompt(user_prompt: str, context: str) -> str: + """ + Get the prompt for initial user input analysis. + + Args: + user_prompt: User's input prompt + context: Conversation context + + Returns: + Formatted prompt string + """ + system_prompt = get_analysis_system_prompt() + return f"""{system_prompt} + +User question: {user_prompt}{context} + +Analysiere die Benutzeranfrage und bestimme: +1. Ob eine Datenbankabfrage benötigt wird (needsDatabaseQuery) +2. Ob eine Web-Recherche benötigt wird (needsWebResearch) +3. 
Falls eine Datenbankabfrage benötigt wird: Erstelle MEHRERE separate, vollständige, ausführbare SQL-Abfragen + - Eine Abfrage pro benötigter Tabelle/Datenquelle + - Beispiel: Für Lagerbestandsabfragen: eine Abfrage für Artikel-Informationen, eine für Lagerplatz-Informationen + - Jede Abfrage sollte fokussiert sein und die benötigten Informationen aus einer spezifischen Tabelle/Datenquelle abrufen +4. Begründung für deine Entscheidung + +⚠️⚠️⚠️ WICHTIG - WEB-RECHERCHE BEI ZUSÄTZLICHEN INFORMATIONEN ⚠️⚠️⚠️ +Wenn der Nutzer nach zusätzlichen Informationen fragt oder explizit eine Recherche anfordert, MUSS IMMER eine Web-Recherche durchgeführt werden (needsWebResearch = true). +Beispiele für solche Anfragen: +- "recherchier nach weiteren informationen zu diesem produkt" +- "suche nach zusätzlichen informationen" +- "finde mehr details" +- "recherchiere im internet" +- "suche online nach" +- Ähnliche Formulierungen, die eine Recherche oder zusätzliche Informationen anfordern +In diesen Fällen IMMER needsWebResearch auf true setzen! + +WICHTIG für SQL-Abfragen: +- Verwende IMMER doppelte Anführungszeichen für Spaltennamen +- Bei Lagerbestandsabfragen: IMMER S_RESERVIERTER__BESTAND und verfügbaren Bestand einbeziehen +- Bei Lagerplatzabfragen: IMMER JOIN mit Lagerplatz-Tabelle für den Namen +- Abfragen müssen direkt ausführbar sein (keine Platzhalter) +- Erstelle SEPARATE Abfragen für verschiedene Tabellen/Datenquellen, nicht eine große JOIN-Abfrage + +STRATEGIE FÜR MEHRERE ABFRAGEN: +- Analysiere welche Informationen benötigt werden +- Identifiziere welche Tabellen diese Informationen enthalten +- Erstelle für jede Tabelle/Datenquelle eine separate, fokussierte Abfrage +- Beispiel für "wie viel von 6AV2 181-8XP00-0AX0 haben wir auf lager": + * Abfrage 1: Artikel-Informationen (Artikelbezeichnung, Lieferant, etc.) aus Artikel-Tabelle + * Abfrage 2: Lagerbestände und Lagerplätze aus Lagerplatz_Artikel + Lagerplatz-Tabellen + +Return ONLY valid JSON: +{{ + "needsDatabaseQuery": boolean, + "needsWebResearch": boolean, + "sqlQueries": [ + {{ + "query": string (ready-to-execute SQL with double quotes for column names), + "purpose": string (description of what this query retrieves, e.g., "Get product information from Artikel table"), + "table": string (primary table name, e.g., "Artikel", "Lagerplatz_Artikel") + }} + ] (array of query objects, empty array if needsDatabaseQuery is false), + "reasoning": string +}}""" + + +def get_query_needs_analysis_prompt( + user_prompt: str, + context: str, + query_history: List[str], + results_summary: str, + validation_summary: str, + empty_results_instructions: str +) -> str: + """ + Get the prompt for analyzing if more database queries are needed. + + Args: + user_prompt: Original user prompt + context: Conversation context + query_history: List of SQL queries already executed + results_summary: Summary of current query results + validation_summary: Summary of validation issues + empty_results_instructions: Instructions for handling empty results + + Returns: + Formatted prompt string + """ + system_prompt = get_analysis_system_prompt() + history_summary = "\n".join([f"- {q[:100]}..." for q in query_history]) if query_history else "No queries executed yet." 
+ + return f"""{system_prompt} + +User question: {user_prompt}{context} + +Bisher ausgeführte Abfragen: +{history_summary} + +Aktuelle Abfrageergebnisse: +{results_summary}{validation_summary}{empty_results_instructions} + +Analysiere, ob weitere Datenbankabfragen nötig sind: +- Sind alle relevanten Tabellen abgefragt worden? (Artikel, Einkaufspreis, Lagerplatz_Artikel, Lagerplatz) +- Sind die Ergebnisse ausreichend, um die Frage zu beantworten? +- Fehlen JOINs oder Beziehungen zwischen Tabellen? +- Gibt es Fehler, die korrigiert werden müssen? +- Werden alle benötigten Informationen abgerufen (z.B. Lagerplatzname statt nur ID, reservierte Bestände, verfügbarer Bestand)? +- Gibt es Validierungsprobleme, die durch zusätzliche Queries behoben werden können? +- **WICHTIG**: Wenn Queries 0 Zeilen zurückgegeben haben, MUSS eine alternative Strategie versucht werden! + +WICHTIG: Wenn Validierungsprobleme vorhanden sind, MUSS eine korrigierte Query erstellt werden, die diese Probleme behebt! +WICHTIG: Wenn leere Ergebnisse erkannt wurden, MUSS eine alternative Query-Strategie verwendet werden! + +Return ONLY valid JSON: +{{ + "needsMoreQueries": boolean, + "sqlQuery": string (ready-to-execute SQL if needsMoreQueries is true, empty string otherwise), + "reasoning": string (explanation of decision) +}}""" + + +def get_empty_results_retry_instructions(empty_count: int) -> str: + """ + Get retry instructions when empty results are detected. + + Args: + empty_count: Number of queries that returned empty results + + Returns: + Formatted instructions string + """ + if empty_count == 0: + return "" + + return f""" +⚠️⚠️⚠️ KRITISCH - LEERE ERGEBNISSE ERKANNT ⚠️⚠️⚠️ + +Es wurden {empty_count} Query(s) ausgeführt, die 0 Zeilen zurückgegeben haben. Dies bedeutet, dass die bisherige Query-Strategie nicht erfolgreich war. + +DU MUSST JETZT EINE ALTERNATIVE QUERY-STRATEGIE VERSUCHEN! + +Verfügbare Tabellen im System: +1. Artikel - Enthält alle Produktinformationen (I_ID, Artikelbezeichnung, Artikelnummer, etc.) +2. Einkaufspreis - Enthält Preisdaten (m_Artikel, EP_CHF) +3. Lagerplatz_Artikel - Enthält Lagerbestands- und Lagerplatzinformationen (R_ARTIKEL, R_LAGERPLATZ, Bestände, etc.) +4. Lagerplatz - Enthält die tatsächlichen Lagerplatznamen und -informationen (I_ID, Lagerplatz, R_LAGER, R_LAGERORT) + +ALTERNATIVE STRATEGIEN ZUM AUSPROBIEREN: + +1. **Direkte Lagerplatz-Suche**: Prüfe zuerst, ob der Lagerplatzname in der Lagerplatz-Tabelle existiert: + SELECT * FROM Lagerplatz WHERE "Lagerplatz" LIKE '%[Suchbegriff]%' + +2. **Verschiedene Schreibweisen**: Versuche verschiedene Schreibweisen (Groß-/Kleinschreibung, Teilstrings): + - UPPER/LOWER Funktionen verwenden + - Verschiedene LIKE-Patterns: '%term%', 'term%', '%term' + +3. **JOIN-Strategie überprüfen**: Stelle sicher, dass R_LAGERPLATZ korrekt mit Lagerplatz.I_ID gejoint wird: + - R_LAGERPLATZ in Lagerplatz_Artikel enthält die ID (nicht den Namen!) + - Verwende: LEFT JOIN Lagerplatz lp ON l."R_LAGERPLATZ" = lp."I_ID" + +4. **Breitere Suche**: Versuche eine breitere Suche ohne exakte Filter: + - Entferne zu spezifische WHERE-Bedingungen + - Verwende OR-Bedingungen für verschiedene Suchvarianten + +5. **Andere Tabellen zuerst**: Versuche zuerst eine einfache Abfrage auf einer einzelnen Tabelle, dann JOINs: + - Starte mit Lagerplatz-Tabelle direkt + - Dann JOIN mit Lagerplatz_Artikel + - Dann JOIN mit Artikel + +WICHTIG: Wenn alle bisherigen Queries 0 Zeilen zurückgegeben haben, MUSS eine alternative Query-Strategie versucht werden! 
+Erstelle eine neue Query, die eine der oben genannten Strategien verwendet. Versuche verschiedene Ansätze, bis Ergebnisse gefunden werden. +""" + + +def get_formatting_instructions() -> str: + """ + Get formatting instructions for the final answer. + + Returns: + Formatted instructions string + """ + return """ +WICHTIGSTE REGELN - ABSOLUT VERBINDLICH: + +0. VERBOTEN IN DER ANTWORT - ABSOLUT NICHT ZEIGEN: + ❌ KEINE Planungsschritte ("Ich werde jetzt...", "Suche in der Datenbank...", etc.) + ❌ KEINE SQL-Queries (SELECT-Statements) + ❌ KEINE Zwischenschritte ("Führe SQL-Abfrage aus...", "Analysiere Ergebnisse...", etc.) + ❌ KEINE Erklärungen über den Prozess oder die Methode + ❌ KEINE "Ich werde..."- oder "Ich suche..."-Sätze + ❌ NUR die finale Antwort mit den Daten! + +1. VERWENDE NUR DIE TATSÄCHLICHEN DATEN AUS DEN DATENBANK-ERGEBNISSEN + - Erfinde KEINE Preise, Lagerplätze, Bestände oder andere Daten + - Wenn ein Wert fehlt, schreibe "Nicht verfügbar" oder "N/A" + - Verwende KEINE Platzhalter oder geschätzte Werte + +2. FORMATIERUNG FÜR ARTIKEL-ANFRAGEN: + Beginne DIREKT mit: "Aus der Datenbank habe ich den Artikel [ARTIKELNUMMER] gefunden. Es handelt sich um [ARTIKELBEZEICHNUNG] von [LIEFERANT]." + - Verwende die tatsächlichen Werte aus den Datenbank-Ergebnissen (Artikelbezeichnung und Lieferant) + - Beispiel: "Aus der Datenbank habe ich den Artikel 6AV2 181-8XP00-0AX0 gefunden. Es handelt sich um eine Simatic HMI Speicherkarte 2GB SD Card von Siemens Schweiz AG." + - Falls Artikelbezeichnung oder Lieferant fehlen, verwende "Nicht verfügbar" + + Dann zeige: + Artikelinformationen + - Artikelkürzel: [Wert aus Datenbank oder "Nicht verfügbar"] + - Artikelnummer: [Wert aus Datenbank oder "Nicht verfügbar"] + - Bezeichnung: [Wert aus Datenbank oder "Nicht verfügbar"] + - Lieferant: [Wert aus Datenbank oder "Nicht verfügbar"] + - Einkaufspreis: [Wert aus Datenbank oder "Nicht verfügbar"] + + Lagerbestände nach Lagerplätzen + [Tabelle mit ALLEN Lagerplätzen aus den Daten] + Lagerplatz | Ist-Bestand | Soll-Bestand | Min-Bestand | Max-Bestand | Reservierter Bestand | Verfügbarer Bestand + + Gesamtbestand: [Summe aller Ist-Bestände] Stück (alle am Lagerplatz "[Lagerplatzname]") + + Möchten Sie: + - Mehr technische Details zu diesem Artikel erfahren? + - Nach ähnlichen Artikeln suchen? + - Informationen zu anderen Artikeln im Lager anzeigen? + - Den aktuellen Preis oder Lieferzeiten prüfen? + +3. STELLE SICHER, DASS ALLE LAGERPLÄTZE ANGEZEIGT WERDEN + - Wenn mehrere Lagerplätze vorhanden sind, zeige ALLE in der Tabelle + - Gruppiere nicht - zeige jeden Lagerplatz als separate Zeile + +4. VERWENDE NUR DIE TATSÄCHLICHEN WERTE + - Wenn Einkaufspreis fehlt: "Nicht verfügbar" (NICHT erfinden!) + - Wenn Lagerplatz fehlt: "Nicht verfügbar" (NICHT erfinden!) + - Wenn Bestand fehlt: "Nicht verfügbar" (NICHT erfinden!) +""" + + +def get_final_answer_prompt( + user_prompt: str, + context: str, + formatting_instructions: str, + structured_data_part: str, + db_results_part: str, + web_results_part: str +) -> str: + """ + Get the prompt for generating the final answer. 
+ + Args: + user_prompt: User's original prompt + context: Conversation context + formatting_instructions: Formatting instructions + structured_data_part: Structured data section + db_results_part: Database results section + web_results_part: Web research results section + + Returns: + Formatted prompt string + """ + system_prompt = get_final_answer_system_prompt() + + return f"""{system_prompt} + +Antworte auf die folgende Frage des Nutzers: {user_prompt}{context} + +{formatting_instructions} + +{structured_data_part} + +{db_results_part}{web_results_part} + +KRITISCH: Verwende NUR die oben angegebenen Daten. Erfinde KEINE Werte. Wenn Daten fehlen, schreibe "Nicht verfügbar". + +⚠️⚠️⚠️ ABSOLUT KRITISCH - WEB-RECHERCHE QUELLENANGABE ⚠️⚠️⚠️ +Wenn WEB-RECHERCHE-ERGEBNISSE oben vorhanden sind, MUSS du: +- ✓ IMMER explizit erwähnen, dass die Informationen aus einer Web-Recherche stammen +- ✓ IMMER alle Quellen DIREKT NACH der jeweiligen Information angeben (INLINE, nicht am Ende!) +- ✓ Format: [Information] ([Quelle: Website-Name](URL)) +- ✓ IMMER AUSFÜHRLICHE Informationen präsentieren (nicht nur kurze Zusammenfassungen!) +- ✓ IMMER alle verfügbaren Datenblatt-Links explizit erwähnen und angeben +- ✓ Format für Datenblätter: "Datenblätter verfügbar: [Link 1](URL1), [Link 2](URL2)" +- ✓ Die Web-Recherche-Informationen klar von Datenbank-Informationen trennen +- ❌ VERBOTEN: Web-Recherche-Informationen ohne explizite Kennzeichnung zu präsentieren +- ❌ VERBOTEN: Web-Recherche-Informationen ohne Quellenangabe zu präsentieren +- ❌ VERBOTEN: Quellen nur am Ende als Liste zu präsentieren +- ❌ VERBOTEN: Datenblatt-Links zu verschweigen oder nicht explizit zu erwähnen +- ❌ VERBOTEN: Nur oberflächliche Informationen zu geben + +⚠️⚠️⚠️ ABSOLUT VERBOTEN - KEINE DATEN ERFINDEN ⚠️⚠️⚠️ +Wenn KEINE Datenbank-Ergebnisse vorhanden sind (keine DATENBANK-ERGEBNISSE oder STRUKTURIERTE DATEN oben), dann: +- ❌ ERFINDE KEINE Artikelnummern, Artikelbezeichnungen, Preise oder Lagerbestände! +- ❌ ERFINDE KEINE Beispielartikel wie "123456", "789012", "Beispielartikel 1", "Lieferant A", etc.! +- ❌ ERFINDE KEINE Daten, auch nicht als "Beispiel"! +- ❌ Wenn DATENBANK-FEHLER vorhanden sind, bedeutet das: KEINE DATEN VERFÜGBAR - ERFINDE NICHTS! +- ✓ Schreibe stattdessen: "Es wurden keine Artikel in der Datenbank gefunden." oder "Die Datenbankabfrage ist fehlgeschlagen." +- ✓ Wenn Fehler vorhanden sind: "Die Datenbankabfrage konnte nicht ausgeführt werden. Bitte versuchen Sie es später erneut oder kontaktieren Sie den Administrator." + +WICHTIG: Deine Antwort soll NUR die finale Antwort enthalten - KEINE Planungsschritte, KEINE SQL-Queries, KEINE Zwischenschritte! +Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "Es wurden keine Artikel gefunden" (wenn keine Daten vorhanden). +Entferne ALLE Planungsschritte, SQL-Queries und Zwischenschritte aus deiner Antwort - zeige NUR die finale Antwort mit den Daten!""" + + +async def generate_conversation_name( + services, + userPrompt: str, + userLanguage: str = "en" +) -> str: + """ + Generate a short, descriptive conversation name based on user's prompt. 
+ + Args: + services: Services instance with AI access + userPrompt: The user's input prompt + userLanguage: User's preferred language (for prompt localization) + + Returns: + Short conversation name (max 60 characters) + """ + try: + truncated_prompt = userPrompt[:200] if len(userPrompt) > 200 else userPrompt + + name_prompt = f"""Create a professional conversation title in THE SAME LANGUAGE as the user's question. + +Question: "{truncated_prompt}" + +Rules: +- Title MUST be in the same language as the question (German→German, French→French, English→English) +- Max 60 characters, no punctuation (?, !, .) +- Professional and concise +- Respond ONLY with the title, nothing else""" + + await services.ai.ensureAiObjectsInitialized() + + nameRequest = AiCallRequest( + prompt=name_prompt, + options=AiCallOptions( + resultFormat="txt", + operationType=OperationTypeEnum.DATA_GENERATE, + processingMode=ProcessingModeEnum.DETAILED, + temperature=0.7 + ) + ) + + nameResponse = await services.ai.callAi(nameRequest) + generated_name = nameResponse.content.strip() + + # Extract first line and clean up + generated_name = generated_name.split('\n')[0].strip() + generated_name = re.sub(r'^(Title|Titel|Titre|Name|Name:):\s*', '', generated_name, flags=re.IGNORECASE) + generated_name = re.sub(r'^["\']|["\']$', '', generated_name) + generated_name = re.sub(r'[?!.]+$', '', generated_name) # Remove trailing punctuation + + # Apply title case + if generated_name: + words = generated_name.split() + capitalized_words = [] + for word in words: + if word.isupper() and len(word) > 1: + capitalized_words.append(word) # Keep acronyms + else: + capitalized_words.append(word.capitalize()) + generated_name = " ".join(capitalized_words).strip() + + # Validate and truncate if needed + if not generated_name or len(generated_name) < 3: + if userLanguage == "de": + generated_name = "Chatbot Konversation" + elif userLanguage == "fr": + generated_name = "Conversation Chatbot" + else: + generated_name = "Chatbot Conversation" + + if len(generated_name) > 60: + truncated = generated_name[:57] + last_space = truncated.rfind(' ') + generated_name = truncated[:last_space] + "..." if last_space > 30 else truncated + "..." + + logger.info(f"Generated conversation name: '{generated_name}'") + return generated_name + + except Exception as e: + logger.error(f"Error generating conversation name: {e}", exc_info=True) + if userLanguage == "de": + return "Chatbot Konversation" + elif userLanguage == "fr": + return "Conversation Chatbot" + else: + return "Chatbot Conversation" + diff --git a/modules/features/chatbot/eventManager.py b/modules/features/chatbot/eventManager.py index 48d1601b..8bc4ff94 100644 --- a/modules/features/chatbot/eventManager.py +++ b/modules/features/chatbot/eventManager.py @@ -1,21 +1,23 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. """ -Event manager for chatbot streaming. -Manages event queues for SSE streaming of chatbot progress updates. +Generic streaming event manager for real-time updates. +Manages event queues for SSE streaming across all features (chatbot, workflows, documents, etc.). +Supports event-driven streaming instead of polling. """ import logging import asyncio -from typing import Dict, Optional, Any +from typing import Dict, Optional, Any, List, AsyncIterator, Set from datetime import datetime logger = logging.getLogger(__name__) -class ChatbotEventManager: +class StreamingEventManager: """ - Manages event queues for chatbot streaming. 
+ Generic event manager for real-time streaming across all features. + Supports multiple event types and contexts (workflows, documents, tasks, etc.). Thread-safe event emission and queue management. """ @@ -24,128 +26,199 @@ class ChatbotEventManager: self._queues: Dict[str, asyncio.Queue] = {} self._locks: Dict[str, asyncio.Lock] = {} self._cleanup_tasks: Dict[str, asyncio.Task] = {} + self._subscribers: Dict[str, Set[str]] = {} # context_id -> set of queue_ids (for future multi-subscriber support) - def create_queue(self, workflow_id: str) -> asyncio.Queue: + def create_queue(self, context_id: str) -> asyncio.Queue: """ - Create a new event queue for a workflow. + Create a new event queue for a context. Args: - workflow_id: Workflow ID + context_id: Context ID (workflow_id, document_id, task_id, etc.) Returns: - Event queue for the workflow + Event queue for the context """ - if workflow_id not in self._queues: - self._queues[workflow_id] = asyncio.Queue() - self._locks[workflow_id] = asyncio.Lock() - logger.debug(f"Created event queue for workflow {workflow_id}") - return self._queues[workflow_id] + if context_id not in self._queues: + self._queues[context_id] = asyncio.Queue() + self._locks[context_id] = asyncio.Lock() + self._subscribers[context_id] = set() + logger.debug(f"Created event queue for context {context_id}") + return self._queues[context_id] - def get_queue(self, workflow_id: str) -> Optional[asyncio.Queue]: + def get_queue(self, context_id: str) -> Optional[asyncio.Queue]: """ - Get existing event queue for a workflow. + Get existing event queue for a context. Args: - workflow_id: Workflow ID + context_id: Context ID Returns: Event queue if exists, None otherwise """ - return self._queues.get(workflow_id) + return self._queues.get(context_id) async def emit_event( self, - workflow_id: str, + context_id: str, event_type: str, - message: str, - step: Optional[str] = None, - data: Optional[Dict[str, Any]] = None + data: Dict[str, Any], + event_category: str = "default", + message: Optional[str] = None, + step: Optional[str] = None ): """ - Emit an event to the workflow's event queue. + Emit an event to the context's event queue. Args: - workflow_id: Workflow ID - event_type: Type of event ("status", "progress", "complete", "error") - message: Event message - step: Current processing step (optional) - data: Additional event data (optional) + context_id: Context ID (workflow_id, document_id, etc.) + event_type: Type of event ("message", "log", "status", "progress", "complete", "error", "chatdata") + data: Event data dictionary (will be included in event) + event_category: Category of event for filtering ("chat", "workflow", "document", etc.) 
+ message: Optional event message (for backward compatibility) + step: Optional processing step (for backward compatibility) """ - queue = self.get_queue(workflow_id) + queue = self.get_queue(context_id) if not queue: - logger.debug(f"No event queue found for workflow {workflow_id}, skipping event") + logger.debug(f"No event queue found for context {context_id}, skipping event") return event = { "type": event_type, - "message": message, + "category": event_category, "timestamp": datetime.now().timestamp(), - "step": step, - "data": data or {} + "data": data, + "message": message, # For backward compatibility + "step": step # For backward compatibility } try: await queue.put(event) - logger.debug(f"Emitted {event_type} event for workflow {workflow_id}: {message[:50]}") + logger.debug(f"Emitted {event_type} event (category: {event_category}) for context {context_id}") except Exception as e: - logger.error(f"Error emitting event for workflow {workflow_id}: {e}") + logger.error(f"Error emitting event for context {context_id}: {e}") - async def cleanup(self, workflow_id: str, delay: float = 60.0): + async def stream_events( + self, + context_id: str, + event_categories: Optional[List[str]] = None, + timeout: Optional[float] = None + ) -> AsyncIterator[Dict[str, Any]]: + """ + Async generator for streaming events from a context. + + Args: + context_id: Context ID to stream events from + event_categories: Optional list of event categories to filter by + timeout: Optional timeout in seconds (None = no timeout) + + Yields: + Event dictionaries + """ + queue = self.get_queue(context_id) + if not queue: + logger.warning(f"No queue found for context {context_id}") + return + + start_time = asyncio.get_event_loop().time() if timeout else None + + while True: + # Check timeout + if timeout and start_time: + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed > timeout: + logger.debug(f"Stream timeout for context {context_id}") + break + + try: + # Wait for event with timeout + wait_timeout = 1.0 # Check timeout every second + if timeout and start_time: + remaining = timeout - (asyncio.get_event_loop().time() - start_time) + if remaining <= 0: + break + wait_timeout = min(wait_timeout, remaining) + + event = await asyncio.wait_for(queue.get(), timeout=wait_timeout) + + # Filter by category if specified + if event_categories and event.get("category") not in event_categories: + continue + + yield event + + except asyncio.TimeoutError: + # Check if we should continue or timeout + if timeout and start_time: + elapsed = asyncio.get_event_loop().time() - start_time + if elapsed >= timeout: + break + continue + except Exception as e: + logger.error(f"Error in stream_events for context {context_id}: {e}") + break + + async def cleanup(self, context_id: str, delay: float = 60.0): """ Schedule cleanup of event queue after delay. This allows time for any remaining events to be consumed. 
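To make the producer/consumer contract of `emit_event` and `stream_events` concrete, here is a minimal usage sketch. It relies only on the signatures defined above and the module's `get_event_manager()` accessor; the context id and payload values are invented for illustration.

```python
import asyncio
from modules.features.chatbot.eventManager import get_event_manager

async def demo():
    manager = get_event_manager()
    context_id = "wf_demo"  # hypothetical context id for illustration
    manager.create_queue(context_id)

    async def producer():
        # Emit one chat event and one workflow event for the same context.
        await manager.emit_event(
            context_id=context_id,
            event_type="message",
            data={"role": "assistant", "content": "Hello"},
            event_category="chat",
        )
        await manager.emit_event(
            context_id=context_id,
            event_type="progress",
            data={"step": "analyzing"},
            event_category="workflow",
        )

    async def consumer():
        # Only "chat" events are yielded; the stream ends after the 5 s timeout.
        async for event in manager.stream_events(
            context_id, event_categories=["chat"], timeout=5.0
        ):
            print(event["type"], event["data"])

    await asyncio.gather(producer(), consumer())

asyncio.run(demo())
```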
Args: - workflow_id: Workflow ID + context_id: Context ID delay: Delay in seconds before cleanup (default: 60 seconds) """ - if workflow_id in self._cleanup_tasks: + if context_id in self._cleanup_tasks: # Cancel existing cleanup task - self._cleanup_tasks[workflow_id].cancel() + self._cleanup_tasks[context_id].cancel() async def _cleanup(): try: await asyncio.sleep(delay) - if workflow_id in self._queues: + if context_id in self._queues: # Drain remaining events - queue = self._queues[workflow_id] + queue = self._queues[context_id] while not queue.empty(): try: queue.get_nowait() except asyncio.QueueEmpty: break - del self._queues[workflow_id] - del self._locks[workflow_id] - logger.info(f"Cleaned up event queue for workflow {workflow_id}") + del self._queues[context_id] + if context_id in self._locks: + del self._locks[context_id] + if context_id in self._subscribers: + del self._subscribers[context_id] + logger.info(f"Cleaned up event queue for context {context_id}") except asyncio.CancelledError: pass except Exception as e: - logger.error(f"Error during cleanup for workflow {workflow_id}: {e}") + logger.error(f"Error during cleanup for context {context_id}: {e}") finally: - if workflow_id in self._cleanup_tasks: - del self._cleanup_tasks[workflow_id] + if context_id in self._cleanup_tasks: + del self._cleanup_tasks[context_id] - self._cleanup_tasks[workflow_id] = asyncio.create_task(_cleanup()) + self._cleanup_tasks[context_id] = asyncio.create_task(_cleanup()) - def has_queue(self, workflow_id: str) -> bool: + def has_queue(self, context_id: str) -> bool: """ - Check if a queue exists for a workflow. + Check if a queue exists for a context. Args: - workflow_id: Workflow ID + context_id: Context ID Returns: True if queue exists, False otherwise """ - return workflow_id in self._queues + return context_id in self._queues +# Backward compatibility: ChatbotEventManager is an alias +ChatbotEventManager = StreamingEventManager + # Global singleton instance -_event_manager = ChatbotEventManager() +_event_manager = StreamingEventManager() -def get_event_manager() -> ChatbotEventManager: +def get_event_manager() -> StreamingEventManager: """Get the global event manager instance.""" return _event_manager diff --git a/modules/features/chatbot/mainChatbot.py b/modules/features/chatbot/mainChatbot.py index 1428ef41..079d970b 100644 --- a/modules/features/chatbot/mainChatbot.py +++ b/modules/features/chatbot/mainChatbot.py @@ -1,8 +1,9 @@ # Copyright (c) 2025 Patrick Motsch # All rights reserved. """ -Simple chatbot feature - direct AI center implementation. -Bypasses complex workflow engine for fast, simple chatbot responses. +Simple chatbot feature - basic implementation. +User input is processed by AI to create list of needed queries. +Those queries get streamed back. 
""" import logging @@ -10,19 +11,21 @@ import json import uuid import asyncio import re -import datetime -import base64 from typing import Optional, Dict, Any, List -from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, ChatDocument, WorkflowModeEnum +from modules.datamodels.datamodelChat import ChatWorkflow, UserInputRequest, WorkflowModeEnum, ChatLog from modules.datamodels.datamodelUam import User from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum from modules.shared.timeUtils import getUtcTimestamp, parseTimestamp from modules.services import getInterface as getServices -from modules.connectors.connectorPreprocessor import PreprocessorConnector from modules.features.chatbot.eventManager import get_event_manager -from modules.datamodels.datamodelDocref import DocumentReferenceList, DocumentItemReference from modules.workflows.methods.methodAi.methodAi import MethodAi +from modules.connectors.connectorPreprocessor import PreprocessorConnector +from modules.features.chatbot.chatbotConstants import ( + get_initial_analysis_prompt, + generate_conversation_name, + get_final_answer_system_prompt +) logger = logging.getLogger(__name__) @@ -54,111 +57,19 @@ def _extractJsonFromResponse(content: str) -> Optional[dict]: return None -async def _generate_conversation_name( - services, - userPrompt: str, - userLanguage: str = "en" -) -> str: - """ - Generate a short, descriptive conversation name based on user's prompt. - - Args: - services: Services instance with AI access - userPrompt: The user's input prompt - userLanguage: User's preferred language (for prompt localization) - - Returns: - Short conversation name (max 60 characters) - """ - try: - truncated_prompt = userPrompt[:200] if len(userPrompt) > 200 else userPrompt - - name_prompt = f"""Create a professional conversation title in THE SAME LANGUAGE as the user's question. - -Question: "{truncated_prompt}" - -Rules: -- Title MUST be in the same language as the question (German→German, French→French, English→English) -- Max 60 characters, no punctuation (?, !, .) 
-- Professional and concise -- Respond ONLY with the title, nothing else""" - - await services.ai.ensureAiObjectsInitialized() - - nameRequest = AiCallRequest( - prompt=name_prompt, - options=AiCallOptions( - resultFormat="txt", - operationType=OperationTypeEnum.DATA_GENERATE, - processingMode=ProcessingModeEnum.DETAILED, - temperature=0.7 - ) - ) - - nameResponse = await services.ai.callAi(nameRequest) - generated_name = nameResponse.content.strip() - - # Extract first line and clean up - generated_name = generated_name.split('\n')[0].strip() - generated_name = re.sub(r'^(Title|Titel|Titre|Name|Name:):\s*', '', generated_name, flags=re.IGNORECASE) - generated_name = re.sub(r'^["\']|["\']$', '', generated_name) - generated_name = re.sub(r'[?!.]+$', '', generated_name) # Remove trailing punctuation - - # Apply title case - if generated_name: - words = generated_name.split() - capitalized_words = [] - for word in words: - if word.isupper() and len(word) > 1: - capitalized_words.append(word) # Keep acronyms - else: - capitalized_words.append(word.capitalize()) - generated_name = " ".join(capitalized_words).strip() - - # Validate and truncate if needed - if not generated_name or len(generated_name) < 3: - if userLanguage == "de": - generated_name = "Chatbot Konversation" - elif userLanguage == "fr": - generated_name = "Conversation Chatbot" - else: - generated_name = "Chatbot Conversation" - - if len(generated_name) > 60: - truncated = generated_name[:57] - last_space = truncated.rfind(' ') - generated_name = truncated[:last_space] + "..." if last_space > 30 else truncated + "..." - - logger.info(f"Generated conversation name: '{generated_name}'") - return generated_name - - except Exception as e: - logger.error(f"Error generating conversation name: {e}", exc_info=True) - if userLanguage == "de": - return "Chatbot Konversation" - elif userLanguage == "fr": - return "Conversation Chatbot" - else: - return "Chatbot Conversation" - - async def chatProcess( currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None ) -> ChatWorkflow: """ - Simple chatbot processing - direct AI center implementation. + Simple chatbot processing - analyze user input and generate queries. Flow: 1. Create or load workflow 2. Store user message - 3. AI analyzes: determine if DB query/web research needed - 4. Execute database query if needed - 5. Execute web research if needed - 6. Generate final answer - 7. Store assistant message - 8. Return workflow + 3. AI analyzes user input to create list of needed queries + 4. 
Stream queries back Args: currentUser: Current user @@ -197,7 +108,7 @@ async def chatProcess( event_manager.create_queue(workflowId) else: # Generate conversation name based on user's prompt - conversation_name = await _generate_conversation_name( + conversation_name = await generate_conversation_name( services, userInput.prompt, userInput.userLanguage @@ -214,7 +125,7 @@ async def chatProcess( "currentAction": 0, "totalTasks": 0, "totalActions": 0, - "workflowMode": WorkflowModeEnum.WORKFLOW_CHATBOT.value, # Use Chatbot mode for chatbot conversations + "workflowMode": WorkflowModeEnum.WORKFLOW_CHATBOT.value, "startedAt": getUtcTimestamp(), "lastActivity": getUtcTimestamp() } @@ -274,31 +185,20 @@ async def chatProcess( "actionNumber": 0 } - # Add documents to message if any - if user_documents: - # Update messageId in all documents - for doc in user_documents: - doc.messageId = userMessageData["id"] - userMessageData["documents"] = [doc.dict() for doc in user_documents] - userMessageData["documentsLabel"] = "Uploaded Files" - logger.info(f"Attaching {len(user_documents)} document(s) to user message") - userMessage = interfaceDbChat.createMessage(userMessageData) logger.info(f"Stored user message: {userMessage.id} with {len(user_documents)} document(s)") # Emit message event for streaming (exact chatData format) - event_manager = get_event_manager() message_timestamp = parseTimestamp(userMessage.publishedAt, default=getUtcTimestamp()) await event_manager.emit_event( - workflow.id, - "chatdata", - "New message", - "message", - { + context_id=workflow.id, + event_type="chatdata", + data={ "type": "message", "createdAt": message_timestamp, "item": userMessage.dict() - } + }, + event_category="chat" ) # Update workflow status @@ -324,24 +224,331 @@ async def chatProcess( raise -async def _check_workflow_status(interfaceDbChat, workflowId: str, event_manager) -> bool: +async def _execute_queries_parallel(queries: List[Dict[str, Any]]) -> Dict[str, Any]: """ - Check if workflow is stopped. If stopped, emit stopped event and return True. + Execute multiple SQL queries in parallel. 
+ Args: + queries: List of query dictionaries, each containing: + - "query": SQL query string + - "purpose": Description of what the query retrieves + - "table": Primary table name + + Returns: + Dictionary mapping query indices to results: + - "query_1", "query_2", etc.: Success result text + - "query_1_data", "query_2_data", etc.: Raw data arrays + - "query_1_error", "query_2_error", etc.: Error messages if query failed + """ + async def execute_single_query(idx: int, query_info: Dict[str, Any]): + """Execute a single query and return result.""" + connector = PreprocessorConnector() + try: + query_text = query_info.get("query", "") + result = await connector.executeQuery(query_text, return_json=True) + await connector.close() + return idx, result, None + except Exception as e: + await connector.close() + return idx, None, str(e) + + # Execute all queries in parallel + tasks = [execute_single_query(i, q) for i, q in enumerate(queries)] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results into dictionary + query_results = {} + for result in results: + if isinstance(result, Exception): + # Handle exceptions from gather + logger.error(f"Exception in parallel query execution: {result}") + continue + + idx, result_data, error = result + + if error: + query_results[f"query_{idx+1}_error"] = error + logger.error(f"Query {idx+1} failed: {error}") + else: + if result_data and not result_data.get("text", "").startswith(("Error:", "Query failed:")): + query_results[f"query_{idx+1}"] = result_data.get("text", "") + query_results[f"query_{idx+1}_data"] = result_data.get("data", []) + row_count = len(result_data.get('data', [])) + logger.info(f"Query {idx+1} executed successfully, returned {row_count} rows") + else: + error_text = result_data.get("text", "Query failed") if result_data else "Query failed: No response" + query_results[f"query_{idx+1}_error"] = error_text + logger.error(f"Query {idx+1} failed: {error_text}") + + return query_results + + +async def _emit_log_and_event( + interfaceDbChat, + workflowId: str, + event_manager, + message: str, + log_type: str = "info", + status: str = "running", + round_number: Optional[int] = None +) -> None: + """ + Store log in database. The route's periodic chat data fetch will handle emitting it. + This avoids duplicate log emissions. + + Args: + interfaceDbChat: Database interface + workflowId: Workflow ID + event_manager: Event manager (unused, kept for compatibility) + message: Log message + log_type: Log type (info, warning, error) + status: Status string + round_number: Optional round number (will be fetched from workflow if not provided) + """ + try: + # Get round number from workflow if not provided + if round_number is None: + workflow = interfaceDbChat.getWorkflow(workflowId) + if workflow: + round_number = workflow.currentRound + + log_timestamp = getUtcTimestamp() + log_data = { + "id": f"log_{uuid.uuid4()}", + "workflowId": workflowId, + "message": message, + "type": log_type, + "timestamp": log_timestamp, + "status": status, + "roundNumber": round_number + } + # Only store in database - route's periodic fetch will emit it + interfaceDbChat.createLog(log_data) + except Exception as e: + logger.error(f"Error storing log: {e}") + + +async def _check_workflow_stopped(interfaceDbChat, workflowId: str) -> bool: + """ + Check if workflow was stopped. 
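A quick self-contained sketch of the result shape documented for `_execute_queries_parallel`: the values below are invented, but the key layout (`query_N`, `query_N_data`, `query_N_error`) and the success/failure split mirror how the calling code reads it.

```python
# Invented sample shaped like the _execute_queries_parallel return value.
query_results = {
    "query_1": "2 rows returned",
    "query_1_data": [{"Artikelnummer": "ART-001"}, {"Artikelnummer": "ART-002"}],
    "query_2_error": 'relation "Lagerplatz_Artikel" does not exist',
}

# Successful queries have a plain "query_N" key; failures carry a "_error" suffix.
successes = [
    key for key in query_results
    if key.startswith("query_") and not key.endswith(("_error", "_data"))
]
failures = [key for key in query_results if key.endswith("_error")]

total_rows = sum(len(query_results.get(f"{key}_data", [])) for key in successes)
print(successes, failures, total_rows)  # ['query_1'] ['query_2_error'] 2
```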
+ + Args: + interfaceDbChat: Database interface + workflowId: Workflow ID + Returns: True if workflow is stopped, False otherwise """ - workflow = interfaceDbChat.getWorkflow(workflowId) - if workflow and workflow.status == "stopped": - await event_manager.emit_event( - workflowId, - "stopped", - "Workflow stopped", - "stopped" - ) - logger.info(f"Workflow {workflowId} was stopped, exiting processing") - return True - return False + try: + workflow = interfaceDbChat.getWorkflow(workflowId) + return workflow and workflow.status == "stopped" + except Exception as e: + logger.warning(f"Error checking workflow status: {e}") + return False + + +def _buildWebResearchQuery(userPrompt: str, workflowMessages: List, queryResults: Optional[Dict[str, Any]] = None) -> str: + """ + Build enriched web research query by extracting product context from conversation history and current prompt. + + Extracts product information from: + 1. Current user prompt (article numbers, product mentions) + 2. Database query results (if available) + 3. Previous assistant messages (conversation history) + + Args: + userPrompt: Current user prompt + workflowMessages: List of workflow messages (conversation history) + queryResults: Optional database query results to extract product info from + + Returns: + Enriched search query string + """ + # Normalize user prompt for detection + prompt_lower = userPrompt.lower().strip() + + # Patterns that indicate a search request + search_patterns = [ + "ja", "yes", "oui", "si", + "such", "suche", "search", "recherche", "recherchier", + "internet", "web", "online", + "datenblatt", "datasheet", "fiche technique", + "mehr informationen", "more information", "plus d'information", + "weitere informationen", "further information", "additional information" + ] + + # Check if current prompt contains search-related keywords + has_search_intent = any(pattern in prompt_lower for pattern in search_patterns) + + # Extract product information - try multiple sources + article_number = None + article_description = None + supplier = None + + # Pattern for article numbers like "6AV2 181-8XP00-0AX0" or "6AV2181-8XP00-0AX0" + article_patterns = [ + r'\b[A-Z0-9]{2,}\s+[0-9]{3,}-[A-Z0-9-]+\b', # With space: "6AV2 181-8XP00-0AX0" + r'\b[A-Z0-9]{4,}[\s-][A-Z0-9-]{6,}\b', # General pattern + r'\b[A-Z]{2,}[0-9]+\s+[0-9]+-[A-Z0-9-]+\b', # Specific Siemens pattern + ] + + # 1. First, try to extract from current user prompt + for pattern in article_patterns: + matches = re.findall(pattern, userPrompt) + if matches: + article_number = matches[0] + logger.info(f"Extracted article number from user prompt: {article_number}") + break + + # 2. 
Try to extract from database query results if available + # Always check queryResults to enrich with product description and supplier, even if article_number was already found + if queryResults: + # Look for article numbers in query result text (if not already found) + if not article_number: + for key in queryResults.keys(): + if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"): + result_text = queryResults.get(key, "") + if isinstance(result_text, str): + for pattern in article_patterns: + matches = re.findall(pattern, result_text) + if matches: + article_number = matches[0] + logger.info(f"Extracted article number from query results: {article_number}") + break + if article_number: + break + + # Always check data arrays for product description and supplier (even if article_number already found) + for key in queryResults.keys(): + if key.startswith("query_") and not key.endswith("_error") and not key.endswith("_data"): + data_key = f"{key}_data" + if data_key in queryResults: + data_array = queryResults[data_key] + if isinstance(data_array, list) and len(data_array) > 0: + # Look for article number in first row (if not already found) + first_row = data_array[0] + if isinstance(first_row, dict): + # Check common article number fields (if not already found) + if not article_number: + for field in ["Artikelnummer", "Artikelkürzel", "article_number", "articleNumber"]: + if field in first_row and first_row[field]: + article_number = str(first_row[field]) + logger.info(f"Extracted article number from query data: {article_number}") + break + + # Always check article description (can enrich even if article_number already found) + if not article_description: + for field in ["Artikelbezeichnung", "Bezeichnung", "article_description", "description"]: + if field in first_row and first_row[field]: + article_description = str(first_row[field]) + logger.info(f"Extracted article description from query data: {article_description}") + break + + # Always check supplier (can enrich even if article_number already found) + if not supplier: + for field in ["Lieferant", "Supplier", "supplier"]: + if field in first_row and first_row[field]: + supplier = str(first_row[field]) + logger.info(f"Extracted supplier from query data: {supplier}") + break + + # If we found all needed info, we can stop + if article_number and article_description and supplier: + break + + # 3. 
Extract from previous assistant messages (conversation history) + if not article_number or not article_description: + for msg in reversed(workflowMessages[-10:]): + if msg.role == "assistant": + message_text = msg.message + + # Extract article number if not found yet + if not article_number: + for pattern in article_patterns: + matches = re.findall(pattern, message_text) + if matches: + article_number = matches[0] + break + + # Extract article description if not found yet + if not article_description: + description_patterns = [ + r'Es handelt sich um\s+([^\.]+)', + r'It is a\s+([^\.]+)', + r'C\'est\s+([^\.]+)', + r'Bezeichnung:\s*([^\n]+)', + r'Description:\s*([^\n]+)', + r'Artikelbezeichnung:\s*([^\n]+)', + r'Artikelbezeichnung:\s*([^\n]+)' + ] + for pattern in description_patterns: + match = re.search(pattern, message_text, re.IGNORECASE) + if match: + article_description = match.group(1).strip() + break + + # Extract supplier if not found yet + if not supplier: + supplier_patterns = [ + r'von\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)', + r'from\s+([A-Z][A-Za-z\s]+(?:AG|GmbH|Ltd|Inc|Corp)?)', + r'Lieferant:\s*([^\n]+)', + r'Supplier:\s*([^\n]+)' + ] + for pattern in supplier_patterns: + match = re.search(pattern, message_text, re.IGNORECASE) + if match: + supplier = match.group(1).strip() + break + + # Stop if we found everything + if article_number and article_description and supplier: + break + + # Build enriched search query + query_parts = [] + + # If we have search intent but no product info, try to use the user prompt intelligently + if has_search_intent and not article_number and not article_description: + # Try to extract meaningful parts from the prompt + # Remove common search phrases and keep the product-related parts + cleaned_prompt = userPrompt + for phrase in ["recherchier nach", "recherche", "suche nach", "search for", "find", "informationen zu", "information about", "weitere informationen", "further information"]: + cleaned_prompt = re.sub(phrase, "", cleaned_prompt, flags=re.IGNORECASE) + cleaned_prompt = cleaned_prompt.strip() + + # If cleaned prompt still has content and is different, use it + if cleaned_prompt and cleaned_prompt != userPrompt and len(cleaned_prompt) > 10: + query_parts.append(cleaned_prompt) + + # Add article description if found + if article_description: + query_parts.append(article_description) + + # Add article number if found + if article_number: + query_parts.append(article_number) + + # Add supplier if found + if supplier: + query_parts.append(supplier) + + # Add "Datenblatt" or "datasheet" if user requested it or if we have product info + if "datenblatt" in prompt_lower or "datasheet" in prompt_lower or "fiche technique" in prompt_lower: + query_parts.append("Datenblatt") + elif query_parts: + # If we have product info but no explicit request for datasheet, add it anyway + query_parts.append("Datenblatt") + + # If we found product information or built a meaningful query, use it + if query_parts: + enriched_query = " ".join(query_parts) + logger.info(f"Built enriched search query: '{enriched_query}' from context (original: '{userPrompt}')") + return enriched_query + else: + # Fall back to original prompt, but try to clean it up + logger.info(f"No product context found, using original prompt: '{userPrompt}'") + return userPrompt async def _convert_file_ids_to_document_references( @@ -558,7 +765,7 @@ async def _processChatbotMessage( ): """ Process chatbot message in background. 
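The effect of `_buildWebResearchQuery` is easiest to see with concrete inputs. A small sketch, assuming the helper is imported from this module; the message text and result row are invented stand-ins, not real database data.

```python
from types import SimpleNamespace

# Importing the module pulls in its other dependencies; shown here only for illustration.
from modules.features.chatbot.mainChatbot import _buildWebResearchQuery

# Stand-in conversation history and database results (values invented).
history = [
    SimpleNamespace(
        role="assistant",
        message="Aus der Datenbank habe ich den Artikel 6AV2 181-8XP00-0AX0 gefunden. "
                "Es handelt sich um eine Simatic HMI Speicherkarte von Siemens Schweiz AG.",
    )
]
query_results = {
    "query_1": "1 row",
    "query_1_data": [{
        "Artikelnummer": "6AV2 181-8XP00-0AX0",
        "Artikelbezeichnung": "Simatic HMI Speicherkarte 2GB SD Card",
        "Lieferant": "Siemens Schweiz AG",
    }],
}

enriched = _buildWebResearchQuery("Suche das Datenblatt im Internet", history, query_results)
# Expected to print something like:
# "Simatic HMI Speicherkarte 2GB SD Card 6AV2 181-8XP00-0AX0 Siemens Schweiz AG Datenblatt"
print(enriched)
```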
- Simplified 4-step flow: Analysis → Query → Excel → Answer + Analyzes user input and generates list of queries, then streams them back. """ event_manager = get_event_manager() @@ -569,10 +776,19 @@ async def _processChatbotMessage( workflow = interfaceDbChat.getWorkflow(workflowId) if not workflow: logger.error(f"Workflow {workflowId} not found during processing") - await event_manager.emit_event(workflowId, "error", f"Workflow {workflowId} nicht gefunden", "error") + await event_manager.emit_event( + context_id=workflowId, + event_type="error", + data={"error": f"Workflow {workflowId} nicht gefunden"}, + event_category="workflow", + message=f"Workflow {workflowId} nicht gefunden", + step="error" + ) return - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): + # Check if workflow was stopped before starting + if await _check_workflow_stopped(interfaceDbChat, workflowId): + logger.info(f"Workflow {workflowId} was stopped, aborting processing") return # Build conversation context from history @@ -587,70 +803,23 @@ async def _processChatbotMessage( context += f"Assistant: {msg.message}\n" await services.ai.ensureAiObjectsInitialized() - current_date = datetime.datetime.now().strftime("%d.%m.%Y") - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): - return + # Step 1: Analyze user input to generate queries + logger.info("Analyzing user input to generate queries...") + await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Analysiere Benutzeranfrage...") - # Step 1: Unified Analysis - logger.info("Step 1: Analyzing user input and attached files...") - await event_manager.emit_event(workflowId, "status", "Analysiere Benutzeranfrage und angehängte Dateien...", "analysis") + analysisPrompt = get_initial_analysis_prompt(userInput.prompt, context) - # Prepare document references if files are attached - has_uploaded_files = bool(userInput.listFileId and len(userInput.listFileId) > 0) - document_references = None - - if has_uploaded_files: - workflow = interfaceDbChat.getWorkflow(workflowId) - if workflow: - services.workflow = workflow - document_references = await _convert_file_ids_to_document_references(services, userInput.listFileId) - logger.info(f"Prepared {len(document_references.references)} document references for analysis") - - # Build analysis prompt - file_context = "\n\nIMPORTANT - ATTACHED FILES:\nIf files are attached, read them completely. For Excel files, extract ALL article identifiers (Artikelnummer) from the file and include them directly in the SQL query using WHERE Artikelnummer IN ('ART1', 'ART2', ...). DO NOT reference ExcelFile table - it doesn't exist!\n" if has_uploaded_files else "" - - analysisPrompt = f"""Heute ist der {current_date}. - -Du bist ein Chatbot der Althaus AG mit Zugriff auf eine SQL-Datenbank. - -DATENBANK-SCHEMA: -- Artikel: I_ID, Artikelnummer, Artikelbezeichnung, Lieferant, etc. -- Einkaufspreis: m_Artikel, EP_CHF -- Lagerplatz_Artikel: R_ARTIKEL, S_IST_BESTAND, etc. 
-- Beziehungen: Artikel.I_ID = Einkaufspreis.m_Artikel = Lagerplatz_Artikel.R_ARTIKEL - -SQL-HINWEISE: -- IMMER doppelte Anführungszeichen für Spaltennamen: "Artikelnummer", "S_IST_BESTAND" -- S_IST_BESTAND kann "Unbekannt" sein (TEXT), prüfe mit WHERE l."S_IST_BESTAND" != 'Unbekannt' -- Verwende Tabellenaliase: a für Artikel, e für Einkaufspreis, l für Lagerplatz_Artikel -- WICHTIG: Wenn Excel-Dateien angehängt sind, extrahiere Artikelnummern aus der Datei und verwende sie direkt im SQL: WHERE a."Artikelnummer" IN ('ART1', 'ART2', 'ART3') -- VERBOTEN: Verwende NICHT "SELECT ... FROM ExcelFile" - diese Tabelle existiert nicht! - -{file_context}User question: {userInput.prompt}{context} - -Return ONLY valid JSON: -{{ - "needsDatabaseQuery": boolean, - "needsWebResearch": boolean, - "needsExcelFile": boolean, - "excelAction": "create" | "update" | null, - "excelFileName": string | null, - "sqlQuery": string (ready-to-execute SQL with double quotes for column names. If Excel file attached, extract Artikelnummern and use WHERE a."Artikelnummer" IN ('ART1', 'ART2', ...)), - "requestedColumns": array of strings (columns to add for Excel updates, e.g. ["Lieferant", "Lagerbestand"]), - "reasoning": string -}}""" - - # Single AI call for analysis + # AI call for analysis method_ai = MethodAi(services) analysis_result = await method_ai.process({ "aiPrompt": analysisPrompt, - "documentList": document_references if has_uploaded_files else None, + "documentList": None, "resultType": "json", "simpleMode": True }) - # Extract content from ActionResult (in simple mode, content is in first document's documentData) + # Extract content from ActionResult analysis_content = None if analysis_result.success and analysis_result.documents: analysis_content = analysis_result.documents[0].documentData @@ -666,61 +835,135 @@ Return ONLY valid JSON: # Extract analysis results needsDatabaseQuery = analysis.get("needsDatabaseQuery", False) if analysis else False needsWebResearch = analysis.get("needsWebResearch", False) if analysis else False - needsExcelFile = analysis.get("needsExcelFile", False) if analysis else False - excelAction = analysis.get("excelAction") - excelFileName = analysis.get("excelFileName") - sql_query = analysis.get("sqlQuery", "") - requested_columns = analysis.get("requestedColumns", []) + sql_queries = analysis.get("sqlQueries", []) + # Support legacy single query format for backward compatibility + if not sql_queries and analysis.get("sqlQuery"): + sql_queries = [{ + "query": analysis.get("sqlQuery", ""), + "purpose": "Database query", + "table": "Unknown" + }] + reasoning = analysis.get("reasoning", "") - # Ensure database query if Excel update is needed - if needsExcelFile and excelAction == "update" and not needsDatabaseQuery: - needsDatabaseQuery = True + logger.info(f"Analysis: DB={needsDatabaseQuery}, Web={needsWebResearch}, SQL queries={len(sql_queries)}") - logger.info(f"Analysis: DB={needsDatabaseQuery}, Excel={needsExcelFile}, SQL={'present' if sql_query else 'missing'}") + # Build initial enriched web research query if needed (for logging, will be rebuilt after DB queries) + enriched_web_query = None + if needsWebResearch: + enriched_web_query = _buildWebResearchQuery(userInput.prompt, workflow.messages) - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): + # Build list of queries to stream back + queries = [] + + if needsDatabaseQuery and sql_queries: + for i, sql_query_info in enumerate(sql_queries, 1): + queries.append({ + "type": "database", + "query": 
sql_query_info.get("query", ""), + "purpose": sql_query_info.get("purpose", f"Query {i}"), + "table": sql_query_info.get("table", "Unknown"), + "reasoning": reasoning + }) + + if needsWebResearch: + queries.append({ + "type": "web", + "query": enriched_web_query or userInput.prompt, + "reasoning": reasoning + }) + + # Format queries as log text + log_lines = [] + if queries: + db_queries = [q for q in queries if q["type"] == "database"] + log_lines.append(f"Generiert: {len(db_queries)} Datenbankabfrage(n) und {len(queries) - len(db_queries)} Web-Recherche(n)\n\n") + for i, q in enumerate(queries, 1): + if q["type"] == "database": + log_lines.append(f"{i}. Datenbankabfrage ({q.get('table', 'Unknown')}):\n") + log_lines.append(f" Zweck: {q.get('purpose', 'Nicht angegeben')}\n") + log_lines.append(f"```sql\n{q['query']}\n```\n") + elif q["type"] == "web": + log_lines.append(f"{i}. Web-Recherche:\n") + log_lines.append(f" Suchbegriff: {q['query']}\n") + if q.get("reasoning"): + log_lines.append(f" Begründung: {q['reasoning']}\n") + log_lines.append("\n") + else: + log_lines.append("Keine Abfragen erforderlich.") + + log_text = "".join(log_lines) + + # Stream queries as a log + await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, log_text) + + # Check if workflow was stopped before executing queries + if await _check_workflow_stopped(interfaceDbChat, workflowId): + logger.info(f"Workflow {workflowId} was stopped, aborting query execution") return - # Step 2: Execute SQL Query + # Step 2: Execute queries queryResults = {} - queryData = {} - excel_documents = [] + webResearchResults = "" - if needsDatabaseQuery and sql_query: - logger.info("Step 2: Executing SQL query...") - await event_manager.emit_event(workflowId, "status", "Führe Datenbankabfrage aus...", "query_execution") + # Execute database queries in parallel + if needsDatabaseQuery and sql_queries: + logger.info(f"Executing {len(sql_queries)} database queries in parallel...") + await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, f"Führe {len(sql_queries)} Datenbankabfrage(n) parallel aus...") try: - connector = PreprocessorConnector() - result_dict = await connector.executeQuery(sql_query, return_json=True) - await connector.close() + queryResults = await _execute_queries_parallel(sql_queries) - if result_dict and not result_dict.get("text", "").startswith(("Error:", "Query failed:")): - queryResults["main"] = result_dict.get("text", "") - queryData["main"] = result_dict.get("data", []) - logger.info(f"Query executed successfully, returned {len(queryData.get('main', []))} rows") - else: - error_text = result_dict.get("text", "Query failed") if result_dict else "Query failed: No response" - queryResults["error"] = error_text - logger.error(f"Query failed: {error_text}") - - # If query failed and we need Excel update, try to extract article numbers from Excel and retry - if needsExcelFile and excelAction == "update" and has_uploaded_files and "ExcelFile" in error_text: - logger.warning("Query failed due to ExcelFile table reference. Attempting to extract article numbers from Excel file and retry...") - # The AI should have extracted article numbers, but if query failed, we can't proceed - # Log the issue for debugging - logger.error("Cannot proceed with Excel update: SQL query references non-existent ExcelFile table. 
AI should extract article numbers directly.") + # Log results summary + successful_queries = [k for k in queryResults.keys() if k.startswith("query_") and not k.endswith("_error") and not k.endswith("_data")] + failed_queries = [k for k in queryResults.keys() if k.endswith("_error")] + + if successful_queries: + total_rows = sum(len(queryResults.get(f"{k}_data", [])) for k in successful_queries) + logger.info(f"Successfully executed {len(successful_queries)} query/queries, total {total_rows} rows") + await _emit_log_and_event( + interfaceDbChat, + workflowId, + event_manager, + f"Abgeschlossen: {len(successful_queries)} Abfrage(n) erfolgreich, {total_rows} Ergebnis{'e' if total_rows != 1 else ''} gefunden" + ) + + if failed_queries: + logger.warning(f"{len(failed_queries)} query/queries failed") + await _emit_log_and_event( + interfaceDbChat, + workflowId, + event_manager, + f"Warnung: {len(failed_queries)} Abfrage(n) fehlgeschlagen", + log_type="warning" + ) except Exception as e: - logger.error(f"Error executing query: {e}") - queryResults["error"] = f"Error executing query: {str(e)}" + logger.error(f"Error executing parallel queries: {e}") + queryResults["error"] = f"Error executing queries: {str(e)}" + await _emit_log_and_event( + interfaceDbChat, + workflowId, + event_manager, + "Fehler bei parallelen Datenbankabfragen", + log_type="error" + ) - # Step 2.5: Web Research (simplified) - webResearchResults = "" + # Execute web research if needsWebResearch: logger.info("Performing web research...") + await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Suche im Internet nach Informationen...") + try: + # Rebuild enriched query with database results if available (better product context) + web_research_query = _buildWebResearchQuery( + userInput.prompt, + workflow.messages, + queryResults if queryResults else None + ) + + logger.info(f"Using enriched web research query: '{web_research_query}'") + researchResult = await services.web.performWebResearch( - prompt=userInput.prompt, + prompt=web_research_query, urls=[], country=None, language=userInput.userLanguage or "de", @@ -728,205 +971,125 @@ Return ONLY valid JSON: operationId=None ) webResearchResults = json.dumps(researchResult, ensure_ascii=False, indent=2) if isinstance(researchResult, dict) else str(researchResult) + await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Internet-Recherche abgeschlossen") except Exception as e: - logger.error(f"Web research failed: {e}") + logger.error(f"Web research failed: {e}", exc_info=True) webResearchResults = f"Web research error: {str(e)}" + await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Internet-Recherche fehlgeschlagen", log_type="warning") - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): + # Check if workflow was stopped before generating final answer + if await _check_workflow_stopped(interfaceDbChat, workflowId): + logger.info(f"Workflow {workflowId} was stopped, aborting final answer generation") return - # Build answer context + # Step 3: Generate final answer using AI + logger.info("Generating final answer with AI...") + await _emit_log_and_event(interfaceDbChat, workflowId, event_manager, "Formuliere finale Antwort...") + + # Build prompt for final answer + system_prompt = get_final_answer_system_prompt() + + # Build answer context with query results answerContext = f"User question: {userInput.prompt}{context}\n\n" + + # Add database results - organize by query with metadata + db_results_part = "" 
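As a compact sketch of the text layout this step builds before calling the AI, the snippet below renders invented query metadata and results into the `=== Abfrage N ===` and `DATENBANK-FEHLER` sections; all values are placeholders, not real data.

```python
# Invented inputs mirroring sql_queries metadata and the parallel-execution results.
sql_queries = [
    {"query": "SELECT ...", "purpose": "Artikel-Stammdaten", "table": "Artikel"},
    {"query": "SELECT ...", "purpose": "Lagerbestände", "table": "Lagerplatz_Artikel"},
]
query_results = {
    "query_1": "Artikelnummer | Bezeichnung\nART-001 | Demo-Artikel",
    "query_1_data": [{"Artikelnummer": "ART-001"}],
    "query_2_error": 'column "S_IST_BESTAND" does not exist',
}

sections, errors = [], []
for num, info in enumerate(sql_queries, 1):
    key = f"query_{num}"
    err_key = f"{key}_error"
    if key in query_results:
        sections.append(
            f"=== Abfrage {num}: {info['purpose']} (Tabelle: {info['table']}) ===\n"
            f"{query_results[key]}"
        )
    elif err_key in query_results:
        errors.append(
            f"Abfrage {num} ({info['table']} - {info['purpose']}): {query_results[err_key]}"
        )

db_results_part = "\n\nDATENBANK-ERGEBNISSE:\n" + "\n\n".join(sections)
if errors:
    db_results_part += "\n\nDATENBANK-FEHLER:\n" + "\n".join(errors)
print(db_results_part)
```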
if queryResults: - for key, result in queryResults.items(): - if key != "error": - answerContext += f"Database results:\n{result}\n\n" - if webResearchResults: - answerContext += f"Web research:\n{webResearchResults}\n\n" - - # Step 3: Process Excel File - excel_error = None - if needsExcelFile: - logger.info(f"Step 3: Processing Excel file (action: {excelAction})...") - await event_manager.emit_event( - workflowId, - "status", - f"Erstelle Excel-Datei..." if excelAction == "create" else "Aktualisiere Excel-Datei...", - "excel_processing" - ) + successful_results = [] + error_results = [] - try: - # Check if we have query data for updates - if excelAction == "update": - if not queryData or "main" not in queryData or not queryData["main"]: - error_msg = f"No query data available for Excel update. queryData keys: {list(queryData.keys()) if queryData else 'None'}" - if queryResults.get("error"): - error_msg += f" Query error: {queryResults['error']}" - logger.error(error_msg) - excel_error = f"Die Datenbankabfrage ist fehlgeschlagen: {queryResults.get('error', 'Unbekannter Fehler')}. Die Excel-Datei konnte nicht aktualisiert werden." - raise ValueError(excel_error) + # Extract query metadata from sql_queries if available + query_metadata = {} + if sql_queries: + for i, q_info in enumerate(sql_queries, 1): + query_metadata[f"query_{i}"] = { + "purpose": q_info.get("purpose", f"Query {i}"), + "table": q_info.get("table", "Unknown") + } + + # Organize results by query number + query_numbers = set() + for key in queryResults.keys(): + if key.startswith("query_") and not key.endswith("_data"): + # Extract query number (e.g., "query_1" -> 1) + try: + num = int(key.split("_")[1]) + query_numbers.add(num) + except (ValueError, IndexError): + pass + + # Build results with metadata + for query_num in sorted(query_numbers): + query_key = f"query_{query_num}" + error_key = f"{query_key}_error" - workflow = interfaceDbChat.getWorkflow(workflowId) - if workflow: - services.workflow = workflow - - method_ai = MethodAi(services) - - # Prepare document references for updates - if excelAction == "update" and userInput.listFileId: - if not document_references or not document_references.references: - document_references = await _convert_file_ids_to_document_references(services, userInput.listFileId) - else: - from modules.datamodels.datamodelDocref import DocumentReferenceList - document_references = DocumentReferenceList(references=[]) - - # Build Excel prompt - if excelAction == "update": - # Format query data as JSON lookup table - lookup_table_json = _format_query_results_as_lookup(queryData) - lookup_table_dict = json.loads(lookup_table_json) - logger.info(f"Generated lookup table with {len(lookup_table_dict)} entries") - - # Log sample entries for debugging - if lookup_table_dict: - sample_keys = list(lookup_table_dict.keys())[:3] - logger.info(f"Sample Artikelnummern in lookup table: {sample_keys}") - if sample_keys: - sample_entry = lookup_table_dict[sample_keys[0]] - logger.info(f"Sample entry for '{sample_keys[0]}': {list(sample_entry.keys())}") - logger.info(f"Sample entry values: {sample_entry}") - - # Determine columns to add - column_names = requested_columns if requested_columns else [] - if not column_names: - prompt_lower = userInput.prompt.lower() - if "einkaufspreis" in prompt_lower or "purchase price" in prompt_lower or "kaufpreis" in prompt_lower: - column_names = ["Einkaufspreis"] - elif "lieferant" in prompt_lower or "supplier" in prompt_lower: - column_names = ["Lieferant"] - elif 
"lagerbestand" in prompt_lower or "stock" in prompt_lower: - column_names = ["Lagerbestand"] - else: - # Default: try to infer from available columns in lookup table - if lookup_table_dict: - sample_entry = next(iter(lookup_table_dict.values())) - available_columns = list(sample_entry.keys()) - logger.info(f"Available columns in lookup table: {available_columns}") - # Prefer common columns - if "Einkaufspreis" in available_columns: - column_names = ["Einkaufspreis"] - elif "Lieferant" in available_columns: - column_names = ["Lieferant"] - else: - column_names = available_columns[:1] if available_columns else ["Lieferant"] - else: - column_names = ["Lieferant"] - - logger.info(f"Adding columns to Excel: {column_names}") - column_name_str = ", ".join(column_names) - - # Build a more explicit prompt with clear instructions and the lookup table - # Include the lookup table data directly in the prompt so it's always available - excel_prompt = f"""You are updating an Excel file. CRITICAL TASK: - -Add new column(s) "{column_name_str}" to the attached Excel file by matching Artikelnummer values. - -STEP-BY-STEP PROCESS (MUST FOLLOW EXACTLY): -1. Read the attached Excel file completely - all sheets, all rows, all columns. -2. Find the column containing "Artikelnummer" (may be named "Artikelnummer", "Artikel Nummer", "Art-Nr", "Artikelnummer", etc.) -3. For EACH ROW in the Excel file: - a. Read the Artikelnummer value from that row (e.g., "LED001", "12345", etc.) - b. Look up that EXACT Artikelnummer in the DATABASE LOOKUP TABLE below (case-sensitive match) - c. If the Artikelnummer is found in the lookup table: - - Extract the value(s) for column(s) "{column_name_str}" from the matching entry - - Write the value(s) to the new column(s) in that row - d. If the Artikelnummer is NOT found in the lookup table: - - Leave the new column(s) empty for that row -4. Preserve ALL existing data, formatting, formulas, and structure exactly as-is. -5. Return ONLY the complete modified Excel file (.xlsx format). Do NOT include any text or explanations. - -=== DATABASE LOOKUP TABLE (JSON) === -Use this table to match Artikelnummer and extract values. 
Format: {{"Artikelnummer": {{"column": "value", ...}}}} - -{lookup_table_json} - -=== EXAMPLE MATCHING === -- Excel row has Artikelnummer "LED001" in column "Artikelnummer" -- Search for "LED001" in the lookup table above -- Find entry: {{"LED001": {{"Lieferant": "Phoenix Contact", "Einkaufspreis": 12.50, ...}}}} -- Extract "{column_name_str}" value from that entry -- Write the extracted value to the new "{column_name_str}" column in that Excel row - -CRITICAL REQUIREMENTS: -- Match EXACTLY by Artikelnummer (case-sensitive, exact string match) -- Preserve ALL existing columns, rows, formatting, and formulas -- Only add the new column(s), do NOT modify existing data -- Return ONLY the Excel file binary data, no explanations""" - else: - excel_prompt = f"""Create Excel file (.xlsx) from database results: - -{answerContext if queryResults else "No database results available."} - -User request: {userInput.prompt} - -Create well-structured spreadsheet with appropriate headers and formatting.""" - - # Use simpleMode=True for Excel updates to ensure lookup table is directly accessible - # simpleMode=False goes through complex document generation pipeline which may lose context - excel_result = await method_ai.process({ - "aiPrompt": excel_prompt, - "documentList": document_references if excelAction == "update" else None, - "resultType": "xlsx", - "simpleMode": True # Use simple mode for direct Excel updates with lookup table - }) - - if excel_result.success and excel_result.documents: - for action_doc in excel_result.documents: - try: - chat_doc = await _create_chat_document_from_action_document( - services, action_doc, "", workflowId, workflow.currentRound - ) - if chat_doc: - excel_documents.append(chat_doc) - except Exception as e: - logger.error(f"Error creating ChatDocument: {e}", exc_info=True) - logger.info(f"Excel processing complete: {len(excel_documents)} documents") - else: - excel_error = f"Excel-Verarbeitung fehlgeschlagen: {excel_result.error if excel_result else 'Unbekannter Fehler'}" - logger.warning(excel_error) - except Exception as e: - excel_error = str(e) if not excel_error else excel_error - logger.error(f"Error processing Excel: {e}", exc_info=True) + if error_key in queryResults: + error_msg = queryResults[error_key] + metadata = query_metadata.get(query_key, {}) + purpose = metadata.get("purpose", f"Query {query_num}") + table = metadata.get("table", "Unknown") + error_results.append(f"Abfrage {query_num} ({table} - {purpose}): {error_msg}") + elif query_key in queryResults: + result_text = queryResults[query_key] + metadata = query_metadata.get(query_key, {}) + purpose = metadata.get("purpose", f"Query {query_num}") + table = metadata.get("table", "Unknown") + successful_results.append(f"=== Abfrage {query_num}: {purpose} (Tabelle: {table}) ===\n{result_text}") + + # Handle general error if present + if "error" in queryResults: + error_results.append(f"Allgemeiner Fehler: {queryResults['error']}") + + if successful_results: + db_results_part = "\n\nDATENBANK-ERGEBNISSE:\n" + "\n\n".join(successful_results) + answerContext += "DATENBANK-ERGEBNISSE:\n" + "\n\n".join(successful_results) + "\n\n" + + if error_results: + db_results_part += "\n\nDATENBANK-FEHLER:\n" + "\n".join(error_results) + answerContext += "DATENBANK-FEHLER:\n" + "\n".join(error_results) + "\n\n" - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): - return + # Add web research results + web_results_part = "" + if webResearchResults: + web_results_part = 
f"\n\nINTERNET-RECHERCHE:\n{webResearchResults}" + answerContext += f"INTERNET-RECHERCHE:\n{webResearchResults}\n\n" - # Step 4: Generate Final Answer - logger.info("Step 4: Generating final answer...") - await event_manager.emit_event(workflowId, "status", "Formuliere finale Antwort...", "answer_generation") + # Check if we have any actual data + successful_query_keys = [k for k in queryResults.keys() if k.startswith("query_") and not k.endswith("_error") and not k.endswith("_data")] + has_query_results = bool(successful_query_keys) + error_query_keys = [k for k in queryResults.keys() if k.endswith("_error")] + has_only_errors = bool(error_query_keys and not successful_query_keys) - excel_context = "" - if needsExcelFile: - if excel_documents: - file_names = [doc.fileName for doc in excel_documents] - action_text = "erstellt" if excelAction == "create" else "aktualisiert" - file_name_str = file_names[0] if len(file_names) == 1 else f"{len(file_names)} Dateien" - excel_context = f"\n\nWICHTIG - EXCEL-DATEI {action_text.upper()}:\nDie Excel-Datei '{file_name_str}' wurde erfolgreich {action_text} und ist als Anhang verfügbar. Erwähne dies am Ende deiner Antwort." - elif excel_error: - excel_context = f"\n\nFEHLER - EXCEL-VERARBEITUNG:\n{excel_error}\nErkläre dem Nutzer, warum die Excel-Datei nicht aktualisiert werden konnte." + if not has_query_results and needsDatabaseQuery: + db_results_part = "\n\nWICHTIG: Es wurden KEINE Datenbank-Ergebnisse gefunden. Die Datenbankabfrage wurde nicht ausgeführt oder hat keine Ergebnisse zurückgegeben." - # Build answer context parts (avoid backslashes in f-string expressions) - db_results_part = f"\nDATENBANK-ERGEBNISSE:\n{answerContext}" if queryResults and not queryResults.get("error") else "" - if queryResults.get("error"): - db_results_part = f"\nDATENBANK-FEHLER:\n{queryResults['error']}" - web_results_part = f"\nINTERNET-RECHERCHE:\n{webResearchResults}" if webResearchResults else "" + if has_only_errors: + db_results_part += "\n\n⚠️⚠️⚠️ KRITISCH - ALLE QUERIES FEHLGESCHLAGEN ⚠️⚠️⚠️\n" + \ + "ALLE Datenbankabfragen sind fehlgeschlagen. Es gibt KEINE gültigen Daten aus der Datenbank.\n" + \ + "DU DARFST KEINE DATEN ERFINDEN! Schreibe stattdessen: 'Es wurden keine Artikel gefunden' oder 'Die Datenbankabfrage ist fehlgeschlagen'." - answerPrompt = f"""Heute ist der {current_date}. + answer_prompt = f"""{system_prompt} -Du bist ein Chatbot der Althaus AG. Antworte auf Deutsch (kein ß, immer ss). +Antworte auf die folgende Frage des Nutzers: {userInput.prompt}{context} -Antworte auf: {userInput.prompt}{context} -{db_results_part}{web_results_part}{excel_context} +{db_results_part}{web_results_part} + +KRITISCH: Verwende NUR die oben angegebenen Daten. Erfinde KEINE Werte. Wenn Daten fehlen, schreibe "Nicht verfügbar". + +WICHTIG - MEHRERE ABFRAGEN: +Die oben angegebenen DATENBANK-ERGEBNISSE können aus mehreren separaten Abfragen stammen. Jede Abfrage ist mit "=== Abfrage X ===" markiert und enthält Informationen zu einem spezifischen Aspekt (z.B. Artikel-Informationen, Lagerbestände, etc.). 
+- Kombiniere die Informationen aus ALLEN erfolgreichen Abfragen zu einer umfassenden Antwort +- Beispiel: Wenn Abfrage 1 Artikel-Informationen liefert und Abfrage 2 Lagerbestände liefert, kombiniere beide in deiner Antwort +- Verwende ALLE verfügbaren Informationen aus den verschiedenen Abfragen + +⚠️⚠️⚠️ ABSOLUT VERBOTEN - KEINE DATEN ERFINDEN ⚠️⚠️⚠️ +Wenn KEINE Datenbank-Ergebnisse vorhanden sind, dann: +- ❌ ERFINDE KEINE Artikelnummern, Artikelbezeichnungen, Preise oder Lagerbestände! +- ❌ ERFINDE KEINE Beispielartikel! +- ✓ Schreibe stattdessen: "Es wurden keine Artikel in der Datenbank gefunden." oder "Die Datenbankabfrage ist fehlgeschlagen." + +WICHTIG: Deine Antwort soll NUR die finale Antwort enthalten - KEINE Planungsschritte, KEINE SQL-Queries, KEINE Zwischenschritte! +Beginne DIREKT mit "Aus der Datenbank habe ich..." (wenn Daten vorhanden) oder "Es wurden keine Artikel gefunden" (wenn keine Daten vorhanden).""" WICHTIG: - Klare, strukturierte Antwort @@ -937,12 +1100,12 @@ WICHTIG: - Wenn Excel-Verarbeitung fehlgeschlagen ist, erkläre dem Nutzer den Fehler klar""" answerRequest = AiCallRequest( - prompt=answerPrompt, + prompt=answer_prompt, context=answerContext if (queryResults or webResearchResults) else None, options=AiCallOptions( resultFormat="txt", operationType=OperationTypeEnum.DATA_ANALYSE, - processingMode=ProcessingModeEnum.BASIC + processingMode=ProcessingModeEnum.DETAILED ) ) @@ -951,15 +1114,20 @@ WICHTIG: logger.info("Final answer generated") - if await _check_workflow_status(interfaceDbChat, workflowId, event_manager): + # Check if workflow was stopped during AI call - if so, don't store the message + if await _check_workflow_stopped(interfaceDbChat, workflowId): + logger.info(f"Workflow {workflowId} was stopped during final answer generation, not storing message") return - # Step 5: Store assistant message - # Reload workflow to get updated message count + # Reload workflow to get current message count workflow = interfaceDbChat.getWorkflow(workflowId) - # Prepare message data with Excel documents if any - message_id = f"msg_{uuid.uuid4()}" + # Double-check workflow wasn't stopped while we were reloading + if workflow and workflow.status == "stopped": + logger.info(f"Workflow {workflowId} was stopped, not storing final message") + return + + # Create assistant message with final answer assistantMessageData = { "id": message_id, "workflowId": workflowId, @@ -975,54 +1143,43 @@ WICHTIG: "actionNumber": 0 } - # Add Excel documents if any were created - if excel_documents: - # Update messageId in all Excel documents - for excel_doc in excel_documents: - excel_doc.messageId = message_id - - # Convert ChatDocuments to dict format - try: - documents_dict = [doc.model_dump() for doc in excel_documents] - except Exception: - documents_dict = [doc.dict() for doc in excel_documents] - - assistantMessageData["documents"] = documents_dict - assistantMessageData["documentsLabel"] = "Excel Files" - assistantMessage = interfaceDbChat.createMessage(assistantMessageData) - logger.info(f"Stored assistant message: {assistantMessage.id}") + logger.info(f"Stored assistant message with final answer: {assistantMessage.id}") # Emit message event for streaming (exact chatData format) message_timestamp = parseTimestamp(assistantMessage.publishedAt, default=getUtcTimestamp()) await event_manager.emit_event( - workflowId, - "chatdata", - "New message", - "message", - { + context_id=workflowId, + event_type="chatdata", + data={ "type": "message", "createdAt": message_timestamp, "item": 
assistantMessage.dict() - } + }, + event_category="chat" ) - # Update workflow status to completed - interfaceDbChat.updateWorkflow(workflowId, { - "status": "completed", - "lastActivity": getUtcTimestamp() - }) + # Update workflow status to completed (only if not stopped) + if not await _check_workflow_stopped(interfaceDbChat, workflowId): + interfaceDbChat.updateWorkflow(workflowId, { + "status": "completed", + "lastActivity": getUtcTimestamp() + }) + else: + logger.info(f"Workflow {workflowId} was stopped, not updating status to completed") - logger.info(f"Chatbot processing completed for workflow {workflowId}") + logger.info(f"Chatbot processing completed for workflow {workflowId}, generated {len(queries)} queries and final answer") - # Emit completion event - await event_manager.emit_event( - workflowId, - "complete", - "Chatbot-Verarbeitung abgeschlossen", - "complete", - {"workflowId": workflowId} - ) + # Emit completion event only if workflow wasn't stopped + if not await _check_workflow_stopped(interfaceDbChat, workflowId): + await event_manager.emit_event( + context_id=workflowId, + event_type="complete", + data={"workflowId": workflowId}, + event_category="workflow", + message="Chatbot-Verarbeitung abgeschlossen", + step="complete" + ) # Schedule cleanup await event_manager.cleanup(workflowId) @@ -1030,11 +1187,21 @@ WICHTIG: except Exception as e: logger.error(f"Error processing chatbot message: {str(e)}", exc_info=True) + # Check if workflow was stopped - if so, don't store error message + if await _check_workflow_stopped(interfaceDbChat, workflowId): + logger.info(f"Workflow {workflowId} was stopped, not storing error message") + return + # Store error message try: # Reload workflow to get current message count workflow = interfaceDbChat.getWorkflow(workflowId) + # Double-check workflow wasn't stopped while we were reloading + if workflow and workflow.status == "stopped": + logger.info(f"Workflow {workflowId} was stopped, not storing error message") + return + errorMessageData = { "id": f"msg_{uuid.uuid4()}", "workflowId": workflowId, @@ -1054,25 +1221,26 @@ WICHTIG: # Emit message event for streaming (exact chatData format) message_timestamp = parseTimestamp(errorMessage.publishedAt, default=getUtcTimestamp()) await event_manager.emit_event( - workflowId, - "chatdata", - "New message", - "message", - { + context_id=workflowId, + event_type="chatdata", + data={ "type": "message", "createdAt": message_timestamp, "item": errorMessage.dict() - } + }, + event_category="chat" ) - # Update workflow status to error - interfaceDbChat.updateWorkflow(workflowId, { - "status": "error", - "lastActivity": getUtcTimestamp() - }) + # Update workflow status to error (only if not stopped) + if not await _check_workflow_stopped(interfaceDbChat, workflowId): + interfaceDbChat.updateWorkflow(workflowId, { + "status": "error", + "lastActivity": getUtcTimestamp() + }) + else: + logger.info(f"Workflow {workflowId} was stopped, not updating status to error") # Schedule cleanup await event_manager.cleanup(workflowId) except Exception as storeError: logger.error(f"Error storing error message: {storeError}") - diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 9b03e5fc..4c15ba89 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -1059,6 +1059,26 @@ class ChatObjects: actionName=createdMessage.get("actionName") ) + # Emit message event for streaming (if event manager is available) + 
try: + from modules.features.chatbot.eventManager import get_event_manager + event_manager = get_event_manager() + message_timestamp = parseTimestamp(chat_message.publishedAt, default=getUtcTimestamp()) + # Emit message event in exact chatData format: {type, createdAt, item} + asyncio.create_task(event_manager.emit_event( + context_id=workflowId, + event_type="chatdata", + data={ + "type": "message", + "createdAt": message_timestamp, + "item": chat_message.dict() + }, + event_category="chat" + )) + except Exception as e: + # Event manager not available or error - continue without emitting + logger.debug(f"Could not emit message event: {e}") + # Debug: Store message and documents for debugging - only if debug enabled storeDebugMessageAndDocuments(chat_message, self.currentUser) diff --git a/modules/routes/routeChatbot.py b/modules/routes/routeChatbot.py index 0c3e2314..4975bf45 100644 --- a/modules/routes/routeChatbot.py +++ b/modules/routes/routeChatbot.py @@ -80,43 +80,64 @@ async def stream_chatbot_start( queue = event_manager.create_queue(workflow.id) async def event_stream(): - """Async generator for SSE events.""" + """Async generator for SSE events - pure event-driven streaming (no polling).""" try: - # Get interface for status checks and chat data + # Get interface for initial data and status checks interfaceDbChat = getServiceChat(currentUser) - # Send initial chat data (exact format as chatData endpoint) + # Get current workflow to check if resuming and get current round + current_workflow = interfaceDbChat.getWorkflow(workflow.id) + current_round = current_workflow.currentRound if current_workflow else None + is_resuming = final_workflow_id is not None and current_round and current_round > 1 + + # Send initial chat data (exact format as chatData endpoint) - only once at start try: chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None) if chatData.get("items"): + # Filter items by round number if resuming + filtered_items = [] for item in chatData["items"]: + if is_resuming and current_round: + # Get round number from item + item_round = None + item_data = item.get("item") + if item_data: + # Handle both dict and object access + if isinstance(item_data, dict): + item_round = item_data.get("roundNumber") + elif hasattr(item_data, "roundNumber"): + item_round = item_data.roundNumber + + # When resuming, only include items from current round onwards + # Exclude items without roundNumber (they're from old rounds before roundNumber was added) + # Exclude items with roundNumber < current_round (from previous rounds) + if item_round is None or item_round < current_round: + continue # Skip items from previous rounds or without round info + + filtered_items.append(item) + + # Emit filtered items + for item in filtered_items: + # Convert Pydantic models to dicts for JSON serialization + serializable_item = { + "type": item.get("type"), + "createdAt": item.get("createdAt"), + "item": item.get("item").dict() if hasattr(item.get("item"), "dict") else item.get("item") + } # Emit item directly in exact chatData format: {type, createdAt, item} - yield f"data: {json.dumps(item)}\n\n" - # Set initial timestamp for incremental fetching - if chatData["items"]: - timestamps = [parseTimestamp(item.get("createdAt"), default=0) for item in chatData["items"]] - last_chatdata_timestamp = max(timestamps) if timestamps else None - else: - last_chatdata_timestamp = None - else: - last_chatdata_timestamp = None + yield f"data: {json.dumps(serializable_item)}\n\n" except Exception as e: 
logger.warning(f"Error fetching initial chat data: {e}") - last_chatdata_timestamp = None # Keepalive interval (30 seconds) keepalive_interval = 30.0 last_keepalive = asyncio.get_event_loop().time() - # Status check interval (check workflow status every 3 seconds) - status_check_interval = 3.0 + # Status check interval (check workflow status every 5 seconds - less frequent since we're event-driven) + status_check_interval = 5.0 last_status_check = asyncio.get_event_loop().time() - # Chat data fetch interval (fetch chat data every 0.5 seconds for real-time updates) - chatdata_fetch_interval = 0.5 - last_chatdata_fetch = asyncio.get_event_loop().time() - - # Stream events until completion or timeout + # Stream events until completion or timeout - pure event-driven (no polling) timeout = 300.0 # 5 minutes max start_time = asyncio.get_event_loop().time() @@ -124,7 +145,6 @@ async def stream_chatbot_start( # Check timeout elapsed = asyncio.get_event_loop().time() - start_time if elapsed > timeout: - # Timeout - just close stream, don't emit non-chatData format events logger.info(f"Stream timeout for workflow {workflow.id}") break @@ -135,69 +155,51 @@ async def stream_chatbot_start( current_time = asyncio.get_event_loop().time() - # Periodically check workflow status and fetch chat data + # Periodically check workflow status (less frequent since we're event-driven) if current_time - last_status_check >= status_check_interval: try: current_workflow = interfaceDbChat.getWorkflow(workflow.id) if current_workflow and current_workflow.status == "stopped": logger.info(f"Workflow {workflow.id} was stopped, closing stream") - # Don't emit stopped event - just close stream break except Exception as e: logger.warning(f"Error checking workflow status: {e}") last_status_check = current_time - # Periodically fetch and emit chat data - if current_time - last_chatdata_fetch >= chatdata_fetch_interval: - try: - chatData = interfaceDbChat.getUnifiedChatData(workflow.id, last_chatdata_timestamp) - if chatData.get("items"): - # Emit items directly in exact chatData format: {type, createdAt, item} - for item in chatData["items"]: - yield f"data: {json.dumps(item)}\n\n" - # Update timestamp to only get new items next time - if chatData["items"]: - # Parse timestamps and get the maximum - timestamps = [] - for item in chatData["items"]: - ts = parseTimestamp(item.get("createdAt"), default=0) - timestamps.append(ts) - if timestamps: - last_chatdata_timestamp = max(timestamps) - except Exception as e: - logger.warning(f"Error fetching chat data: {e}") - last_chatdata_fetch = current_time - - # Try to get event with timeout + # Get event from queue (pure event-driven - no polling database) try: event = await asyncio.wait_for(queue.get(), timeout=1.0) - # Only emit chatdata events (messages, logs, stats) in exact chatData format - # Ignore status/progress/complete/stopped/error events that don't match the format - if event.get("type") == "chatdata" and event.get("data"): - # Emit item directly in exact chatData format: {type, createdAt, item} - chatdata_item = event.get("data") - yield f"data: {json.dumps(chatdata_item)}\n\n" - # Update timestamp for incremental fetching - if chatdata_item.get("createdAt"): - last_chatdata_timestamp = parseTimestamp(chatdata_item["createdAt"], default=None) + # Handle different event types + event_type = event.get("type") + event_data = event.get("data", {}) - # Check if this is a completion/stopped event to close stream - if event.get("type") == "complete": + # Emit chatdata events 
(messages, logs, stats) in exact chatData format + if event_type == "chatdata" and event_data: + # Emit item directly in exact chatData format: {type, createdAt, item} + chatdata_item = event_data + # Ensure item field is serializable (convert Pydantic models to dicts) + if isinstance(chatdata_item, dict) and "item" in chatdata_item: + item_obj = chatdata_item.get("item") + if hasattr(item_obj, "dict"): + chatdata_item = chatdata_item.copy() + chatdata_item["item"] = item_obj.dict() + yield f"data: {json.dumps(chatdata_item)}\n\n" + + # Handle completion/stopped events to close stream + elif event_type == "complete": logger.info(f"Workflow {workflow.id} completed, closing stream") break - elif event.get("type") == "stopped": - # Workflow was stopped, close stream + elif event_type == "stopped": logger.info(f"Workflow {workflow.id} stopped, closing stream") break - elif event.get("type") == "error" and event.get("step") == "error": - # Final error, close stream + elif event_type == "error" and event.get("step") == "error": logger.warning(f"Workflow {workflow.id} error, closing stream") break - last_keepalive = asyncio.get_event_loop().time() + last_keepalive = current_time except asyncio.TimeoutError: - # Send keepalive if needed + # Send keepalive if needed (no events received, but keep connection alive) current_time = asyncio.get_event_loop().time() if current_time - last_keepalive >= keepalive_interval: yield f": keepalive\n\n" @@ -205,14 +207,12 @@ async def stream_chatbot_start( continue except Exception as e: logger.error(f"Error in event stream: {e}") - yield f"data: {json.dumps({'type': 'error', 'message': f'Stream error: {str(e)}'})}\n\n" break except Exception as e: logger.error(f"Error in event stream generator: {e}", exc_info=True) - # Don't emit error events that don't match chatData format finally: - # Stream ends - no final event needed as it doesn't match chatData format + # Stream ends - cleanup handled by event manager pass return StreamingResponse( @@ -248,10 +248,12 @@ async def stop_chatbot( # Emit stopped event to active streams event_manager = get_event_manager() await event_manager.emit_event( - workflowId, - "stopped", - "Workflow stopped by user", - "stopped" + context_id=workflowId, + event_type="stopped", + data={"workflowId": workflowId}, + event_category="workflow", + message="Workflow stopped by user", + step="stopped" ) logger.info(f"Emitted stopped event for workflow {workflowId}") diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index e7bab8a3..cd86c6a8 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -371,15 +371,36 @@ Respond with ONLY a JSON object in this exact format: operationType=opType.value ) - self.services.chat.storeWorkflowStat( - self.services.workflow, - response, - f"ai.{opType.name.lower()}" - ) + # Try to store workflow stats, but don't fail if workflow is None (e.g., in chatbot context) + try: + self.services.chat.storeWorkflowStat( + self.services.workflow, + response, + f"ai.{opType.name.lower()}" + ) + except Exception as e: + # Log but don't fail - workflow might be None in some contexts (e.g., chatbot) + logger.debug(f"Could not store workflow stat (workflow may be None): {str(e)}") self.services.chat.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed") self.services.chat.progressLogFinish(aiOperationId, True) + # Preserve metadata from response if available (e.g., results_with_content from Tavily) + # Check 
if response has metadata attribute (AiCallResponse from callAi) + if hasattr(response, 'metadata') and response.metadata: + # If metadata is a dict, store it in additionalData + if isinstance(response.metadata, dict): + if not metadata.additionalData: + metadata.additionalData = {} + metadata.additionalData.update(response.metadata) + # If metadata is an object with attributes, extract them + elif hasattr(response.metadata, '__dict__'): + if not metadata.additionalData: + metadata.additionalData = {} + for key, value in response.metadata.__dict__.items(): + if not key.startswith('_'): + metadata.additionalData[key] = value + return AiResponse( content=response.content, metadata=metadata diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py index 4faced9c..fd661599 100644 --- a/modules/services/serviceWeb/mainServiceWeb.py +++ b/modules/services/serviceWeb/mainServiceWeb.py @@ -85,22 +85,119 @@ class WebService: if extractedUrls: allUrls.extend(extractedUrls) - # Step 2: Search for URLs if needed (based on needsSearch flag) + # Step 2: Search for URLs and content if needed (based on needsSearch flag) + searchUrls = [] + searchResultsWithContent = [] if needsSearch and (not allUrls or len(allUrls) < maxNumberPages): - self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs") + self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs and content") - searchUrls = await self._performWebSearch( - instruction=instruction, - maxNumberPages=maxNumberPages - len(allUrls), - country=countryCode, - language=languageCode - ) + try: + searchUrls, searchResultsWithContent = await self._performWebSearch( + instruction=instruction, + maxNumberPages=maxNumberPages - len(allUrls), + country=countryCode, + language=languageCode + ) + logger.info(f"Tavily search returned {len(searchUrls)} URLs with {len(searchResultsWithContent)} results containing content") + except Exception as e: + logger.error(f"Error performing Tavily search (continuing with other URLs): {str(e)}", exc_info=True) + searchUrls = [] + searchResultsWithContent = [] - # Add search URLs to the list - allUrls.extend(searchUrls) + # Prioritize Tavily search URLs over AI-extracted URLs (they're more relevant) + if searchUrls: + # Prepend Tavily URLs to the list (they're more relevant) + allUrls = searchUrls + allUrls + logger.info(f"Using {len(searchUrls)} Tavily URLs + {len(allUrls) - len(searchUrls)} other URLs = {len(allUrls)} total") + else: + # If Tavily search failed, use AI-extracted URLs + logger.warning("Tavily search returned no URLs, using AI-extracted URLs only") self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") + # If we have search results (even without content), use them directly instead of crawling + # Tavily search results are more relevant than generic AI-extracted URLs + # Only crawl if we have NO search results at all + if searchResultsWithContent and len(searchResultsWithContent) > 0: + urls_with_actual_content = sum(1 for r in searchResultsWithContent if r.get("content") and len(r.get("content", "")) > 0) + logger.info(f"Using {len(searchResultsWithContent)} Tavily search results ({urls_with_actual_content} with content) directly (skipping crawl)") + + # Convert search results to crawl result format + crawlResult = [] + for result in searchResultsWithContent: + crawlResult.append({ + "url": result["url"], + "title": result.get("title", ""), + "content": result.get("content", "") + }) + + # 
Calculate statistics + totalResults = len(crawlResult) + totalContentLength = sum(len(r.get("content", "")) for r in crawlResult) + urlsWithContent = sum(1 for r in crawlResult if r.get("content") and len(r.get("content", "")) > 0) + + # Log content availability + if urlsWithContent == 0: + logger.warning(f"Tavily search returned {len(searchResultsWithContent)} results but none have content - URLs will be used but may need crawling") + else: + logger.info(f"Tavily search provided content for {urlsWithContent}/{len(searchResultsWithContent)} URLs") + + # Even if content is empty, use these results - they're more relevant than generic URLs + # The final answer generation can work with URLs even if content is empty + + # Convert to sections format + sections = [] + for idx, item in enumerate(crawlResult): + section = { + "id": f"result_{idx}", + "content_type": "paragraph", + "title": item.get("title") or item.get("url", f"Result {idx + 1}"), + "order": idx + } + content = item.get("content", "") + if content: + section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "") + sections.append(section) + + # Return consolidated result + result = { + "metadata": { + "title": suggestedFilename or instruction[:100] if instruction else "Web Research Results", + "extraction_method": "tavily_search_direct", + "research_depth": finalResearchDepth, + "country": countryCode, + "language": languageCode, + "urls_searched": searchUrls[:20], + "total_urls": len(searchUrls), + "urls_with_content": urlsWithContent, + "total_content_length": totalContentLength, + "search_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None + }, + "sections": sections, + "statistics": { + "sectionCount": len(sections), + "total_urls": len(searchUrls), + "results_count": totalResults, + "urls_with_content": urlsWithContent, + "total_content_length": totalContentLength + }, + "instruction": instruction, + "urls_crawled": searchUrls, + "total_urls": len(searchUrls), + "results": crawlResult, + "total_results": totalResults + } + + if suggestedFilename: + result["suggested_filename"] = suggestedFilename + result["metadata"]["suggested_filename"] = suggestedFilename + + if operationId: + self.services.chat.progressLogUpdate(operationId, 0.9, "Completed") + self.services.chat.progressLogFinish(operationId, True) + + return result + # Step 3: Validate and filter URLs before crawling validatedUrls = self._validateUrls(allUrls) if not validatedUrls: @@ -328,8 +425,16 @@ Return ONLY valid JSON, no additional text: maxNumberPages: int, country: Optional[str], language: Optional[str] - ) -> List[str]: - """Perform web search to find URLs.""" + ) -> tuple[List[str], List[Dict[str, Any]]]: + """ + Perform web search to find URLs and content. 
+ + Returns: + Tuple of (urls, search_results_with_content) + - urls: List of URL strings + - search_results_with_content: List of dicts with url, title, content from Tavily search + """ + search_results_with_content = [] try: # Build search prompt model searchPromptModel = AiCallPromptWebSearch( @@ -356,46 +461,224 @@ Return ONLY valid JSON, no additional text: outputFormat="json" ) + # Check if metadata contains results with content (from Tavily) + if hasattr(searchResponse, 'metadata') and searchResponse.metadata: + # Check in additionalData first (where we store custom metadata) + additional_data = None + if hasattr(searchResponse.metadata, 'additionalData') and searchResponse.metadata.additionalData: + additional_data = searchResponse.metadata.additionalData + elif isinstance(searchResponse.metadata, dict): + additional_data = searchResponse.metadata.get("additionalData", {}) + + if additional_data: + results_with_content = additional_data.get("results_with_content", []) + if results_with_content: + logger.info(f"Found {len(results_with_content)} search results with content in metadata.additionalData") + # Extract URLs and content from metadata + for result in results_with_content: + if result.get("url"): + search_results_with_content.append({ + "url": result.get("url"), + "title": result.get("title", ""), + "content": result.get("content", ""), + "score": result.get("score", 0) + }) + + # Also check directly in metadata (fallback) + if not search_results_with_content: + results_with_content = None + if hasattr(searchResponse.metadata, 'results_with_content'): + results_with_content = searchResponse.metadata.results_with_content + elif isinstance(searchResponse.metadata, dict): + results_with_content = searchResponse.metadata.get("results_with_content", []) + + if results_with_content: + logger.info(f"Found {len(results_with_content)} search results with content in metadata (direct)") + for result in results_with_content: + if result.get("url"): + search_results_with_content.append({ + "url": result.get("url"), + "title": result.get("title", ""), + "content": result.get("content", ""), + "score": result.get("score", 0) + }) + # Extract content from AiResponse searchResult = searchResponse.content + logger.debug(f"Search response content type: {type(searchResult)}, length: {len(str(searchResult)) if searchResult else 0}") + # Debug: persist search response if isinstance(searchResult, str): self.services.utils.writeDebugFile(searchResult, "websearch_response") + logger.debug(f"Search response (first 500 chars): {searchResult[:500]}") else: self.services.utils.writeDebugFile(json.dumps(searchResult, indent=2), "websearch_response") + logger.debug(f"Search response type: {type(searchResult)}, keys: {list(searchResult.keys()) if isinstance(searchResult, dict) else 'N/A'}") - # Parse and extract URLs + # Parse and extract URLs and content if isinstance(searchResult, str): # Extract JSON from response (handles markdown code blocks) extractedJson = self.services.utils.jsonExtractString(searchResult) - searchData = json.loads(extractedJson) if extractedJson else json.loads(searchResult) + if extractedJson: + try: + searchData = json.loads(extractedJson) + logger.debug(f"Parsed JSON from extracted string, type: {type(searchData)}") + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse extracted JSON: {e}, trying direct parse") + searchData = json.loads(searchResult) + else: + try: + searchData = json.loads(searchResult) + logger.debug(f"Parsed JSON directly from string, type: 
{type(searchData)}") + except json.JSONDecodeError as e: + logger.error(f"Failed to parse search result as JSON: {e}") + searchData = {} else: searchData = searchResult + logger.debug(f"Using searchResult directly as searchData, type: {type(searchData)}") - # Extract URLs from response + logger.debug(f"Final searchData type: {type(searchData)}, is dict: {isinstance(searchData, dict)}, keys: {list(searchData.keys()) if isinstance(searchData, dict) else 'N/A'}") + + # Extract URLs and content from response urls = [] if isinstance(searchData, dict): - if "urls" in searchData: + # Check for new format: {"urls": [...], "results": [...]} + if "urls" in searchData and "results" in searchData: urls = searchData["urls"] + # Extract results with content + for r in searchData["results"]: + if r.get("url"): + # Only add if not already added from metadata + if not any(sr["url"] == r.get("url") for sr in search_results_with_content): + search_results_with_content.append({ + "url": r.get("url"), + "title": r.get("title", ""), + "content": r.get("content", ""), + "score": r.get("score", 0) + }) + logger.debug(f"Extracted {len(urls)} URLs and {len(search_results_with_content)} results with content from new format") + elif "urls" in searchData: + urls = searchData["urls"] + logger.debug(f"Extracted {len(urls)} URLs from 'urls' field") elif "results" in searchData: - urls = [r.get("url") for r in searchData["results"] if r.get("url")] + # Extract URLs from results (content already in search_results_with_content if from metadata) + for r in searchData["results"]: + if r.get("url"): + urls.append(r.get("url")) + # Only add to search_results_with_content if not already added from metadata + if not any(sr["url"] == r.get("url") for sr in search_results_with_content): + search_results_with_content.append({ + "url": r.get("url"), + "title": r.get("title", ""), + "content": r.get("raw_content") or r.get("content", ""), + "score": r.get("score", 0) + }) + logger.debug(f"Extracted {len(urls)} URLs with content from 'results' field") + else: + logger.warning(f"Unexpected search data structure (dict): {list(searchData.keys())}") elif isinstance(searchData, list): # Handle both cases: list of URL strings or list of dicts with "url" key for item in searchData: if isinstance(item, str): # Item is already a URL string urls.append(item) - elif isinstance(item, dict) and item.get("url"): - # Item is a dict with "url" key - urls.append(item.get("url")) + elif isinstance(item, dict): + if item.get("url"): + urls.append(item.get("url")) + # Only add to search_results_with_content if not already added from metadata + if not any(sr["url"] == item.get("url") for sr in search_results_with_content): + search_results_with_content.append({ + "url": item.get("url"), + "title": item.get("title", ""), + "content": item.get("raw_content") or item.get("content", ""), + "score": item.get("score", 0) + }) + logger.debug(f"Extracted {len(urls)} URLs from list") + else: + logger.warning(f"Unexpected search data type: {type(searchData)}") - logger.info(f"Web search returned {len(urls)} URLs") - return urls + # If we got URLs but no content from metadata, extract URLs from search_results_with_content + if urls and not search_results_with_content: + # URLs were extracted but no content - create entries with empty content + for url in urls: + search_results_with_content.append({ + "url": url, + "title": "", + "content": "", + "score": 0 + }) + elif search_results_with_content and not urls: + # We have content but no URLs - extract URLs from 
content results + urls = [r["url"] for r in search_results_with_content] + + # If we have URLs but no search_results_with_content, create entries from URLs + # This ensures we can use Tavily URLs even if content extraction failed + if urls and not search_results_with_content: + logger.warning("Got URLs from Tavily search but no content extracted - creating entries for direct use") + for url in urls: + search_results_with_content.append({ + "url": url, + "title": "", + "content": "", # Empty content - will need crawling if used + "score": 0 + }) + + logger.info(f"Web search returned {len(urls)} URLs with {len(search_results_with_content)} results") + if search_results_with_content: + content_count = sum(1 for r in search_results_with_content if r.get("content") and len(r.get("content", "")) > 0) + logger.info(f" - {content_count} results have content, {len(search_results_with_content) - content_count} without content") + if content_count > 0: + first_with_content = next((r for r in search_results_with_content if r.get("content")), None) + if first_with_content: + logger.info(f"Content preview from first result with content: {first_with_content.get('content', '')[:200]}") + else: + logger.warning("No search results extracted - will need to crawl URLs") + return urls, search_results_with_content except Exception as e: - logger.error(f"Error in web search: {str(e)}") - return [] + logger.error(f"Error in web search: {str(e)}", exc_info=True) + # Even if there's an error, try to extract URLs from the response if available + recovered_urls = [] + recovered_results = [] + try: + if 'searchResponse' in locals() and searchResponse: + logger.info(f"Attempting to extract URLs from error response: {type(searchResponse)}") + # Try to get content from response + if hasattr(searchResponse, 'content'): + errorContent = searchResponse.content + if isinstance(errorContent, str): + # Try to parse as JSON + try: + errorData = json.loads(errorContent) + if isinstance(errorData, dict): + if "urls" in errorData: + recovered_urls = errorData["urls"] + elif "results" in errorData: + recovered_urls = [r.get("url") for r in errorData["results"] if r.get("url")] + recovered_results = [{"url": r.get("url"), "title": r.get("title", ""), "content": r.get("content", ""), "score": 0} for r in errorData["results"]] + elif isinstance(errorData, list): + recovered_urls = [item if isinstance(item, str) else item.get("url", "") for item in errorData if item] + if recovered_urls: + logger.info(f"Recovered {len(recovered_urls)} URLs from error response") + # Create entries for recovered URLs + if not recovered_results: + for url in recovered_urls: + recovered_results.append({"url": url, "title": "", "content": "", "score": 0}) + return recovered_urls, recovered_results + except Exception as parseError: + logger.debug(f"Failed to parse error response: {parseError}") + except Exception as recoverError: + logger.debug(f"Failed to recover URLs from error: {recoverError}") + + # If we have URLs from earlier extraction, return them + if 'urls' in locals() and urls: + logger.info(f"Returning {len(urls)} URLs extracted before error occurred") + # Create entries from URLs + results_from_urls = [{"url": url, "title": "", "content": "", "score": 0} for url in urls] + return urls, results_from_urls + + return [], [] def _validateUrls(self, urls: List[str]) -> List[str]: """