From 3c1f66cb6d88ae77bce49e64d77a778d54700449 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Tue, 2 Sep 2025 18:58:30 +0200 Subject: [PATCH] web integrated --- AI_ENGINE_MIGRATION_PLAN.md | 276 +++++ config.ini | 27 +- modules/__init__.py | 0 modules/chat/serviceCenter.py | 5 - modules/chat/serviceCenter_ai_engine.py | 266 +++++ modules/connectors/connectorAiOpenai.py | 21 + ...nector_tavily.py => connectorWebTavily.py} | 79 +- modules/engines/aiEngine.py | 544 +++++++++ modules/interfaces/interfaceAiCalls.py | 22 +- modules/interfaces/interfaceAiEngine.py | 115 ++ ...face_web_model.py => interfaceWebModel.py} | 29 +- modules/interfaces/interfaceWebObjects.py | 118 ++ modules/interfaces/interface_web_objects.py | 46 - modules/methods/methodWeb.py | 1043 ++++------------- modules/methods/method_web.py | 197 ---- .../methods/web/web_search/web_search_base.py | 31 - .../web/web_search/web_search_tavily.py | 70 -- notes/changelog.txt | 8 +- requirements.txt | 19 + test_ai_calls.md | 235 ++++ test_ai_fallback.py | 103 ++ test_methodWeb.py | 658 +++++++++++ test_web_csv_functionality.py | 207 ++++ tests/connectors/test_connector_tavily.py | 16 +- tests/methods/test_method_web.py | 10 +- 25 files changed, 2953 insertions(+), 1192 deletions(-) create mode 100644 AI_ENGINE_MIGRATION_PLAN.md delete mode 100644 modules/__init__.py create mode 100644 modules/chat/serviceCenter_ai_engine.py rename modules/connectors/{connector_tavily.py => connectorWebTavily.py} (70%) create mode 100644 modules/engines/aiEngine.py create mode 100644 modules/interfaces/interfaceAiEngine.py rename modules/interfaces/{interface_web_model.py => interfaceWebModel.py} (69%) create mode 100644 modules/interfaces/interfaceWebObjects.py delete mode 100644 modules/interfaces/interface_web_objects.py delete mode 100644 modules/methods/method_web.py delete mode 100644 modules/methods/web/web_search/web_search_base.py delete mode 100644 modules/methods/web/web_search/web_search_tavily.py create mode 100644 test_ai_calls.md create mode 100644 test_ai_fallback.py create mode 100644 test_methodWeb.py create mode 100644 test_web_csv_functionality.py diff --git a/AI_ENGINE_MIGRATION_PLAN.md b/AI_ENGINE_MIGRATION_PLAN.md new file mode 100644 index 00000000..71077e6f --- /dev/null +++ b/AI_ENGINE_MIGRATION_PLAN.md @@ -0,0 +1,276 @@ +# AI Engine Migration Plan + +## Overview +This document outlines the migration strategy from the current AI call system to the new Smart AI Engine architecture. + +## Benefits of the New Architecture + +### 1. **Separation of Concerns** +- Applications no longer need to worry about content size limits +- Centralized AI model selection and failover +- Intelligent content reduction strategies + +### 2. **Improved Reliability** +- Automatic handling of "content too large" errors +- Multiple fallback strategies +- Model-specific optimization + +### 3. **Better Performance** +- Optimal model selection based on content characteristics +- Intelligent chunking and processing strategies +- Reduced API costs through smart model selection + +### 4. **Enhanced Maintainability** +- Single point of AI logic +- Easy to add new models and strategies +- Consistent error handling + +## Migration Phases + +### Phase 1: Infrastructure Setup (Week 1-2) +1. **Create AI Engine Interface** + - ✅ `interfaceAiEngine.py` - Core interfaces and data structures + - ✅ `aiEngine.py` - Smart AI Engine implementation + - ✅ `serviceCenter_ai_engine.py` - ServiceCenter integration + +2. 
**Update Dependencies** + - Add new imports to existing modules + - Update configuration for AI model selection + - Add logging for AI engine operations + +### Phase 2: ServiceCenter Integration (Week 3) +1. **Update ServiceCenter Class** + ```python + # Add to ServiceCenter.__init__ + self.ai_engine = ServiceCenterAIEngine(self) + + # Replace existing AI methods + async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str: + return await self.ai_engine.callAiTextAdvanced(prompt, context) + + async def callAiTextBasic(self, prompt: str, context: str = None) -> str: + return await self.ai_engine.callAiTextBasic(prompt, context) + + async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str: + return await self.ai_engine.extractContentFromDocument(prompt, document) + + async def summarizeChat(self, messages: List[ChatMessage]) -> str: + return await self.ai_engine.summarizeChat(messages) + ``` + +2. **Add New Document-Aware Methods** + ```python + async def callAiWithDocuments( + self, + prompt: str, + documents: List[ChatDocument] = None, + operation_type: str = "general" + ) -> str: + return await self.ai_engine.callAiWithDocuments( + prompt, documents, operation_type=operation_type + ) + ``` + +### Phase 3: Method Updates (Week 4-5) +1. **Update MethodWeb.py** + ```python + # Before + web_scrape_result = await web_interface.scrape(web_scrape_request) + + # After - no changes needed, but can be enhanced + # The AI engine will automatically handle large content + ``` + +2. **Update MethodDocument.py** + ```python + # Before + formatted_content = await self.service.callAiTextBasic(ai_prompt, content) + + # After + formatted_content = await self.service.callAiForReportGeneration( + prompt=ai_prompt, + documents=chat_documents + ) + ``` + +3. **Update MethodAi.py** + ```python + # Before + result = await self.service.callAiTextAdvanced(enhanced_prompt, context) + + # After + result = await self.service.callAiWithDocuments( + prompt=enhanced_prompt, + documents=document_list, + operation_type="ai_processing" + ) + ``` + +4. **Update MethodOutlook.py** + ```python + # Before + composed_email = await self.service.interfaceAiCalls.callAiTextAdvanced(ai_prompt) + + # After + composed_email = await self.service.callAiForEmailComposition( + prompt=ai_prompt, + documents=attached_documents + ) + ``` + +### Phase 4: Task Handling Updates (Week 6) +1. **Update handlingTasks.py** + ```python + # Before + prompt = await self.service.callAiTextAdvanced(task_planning_prompt) + + # After + prompt = await self.service.callAiForTaskPlanning( + prompt=task_planning_prompt, + documents=available_documents, + context=workflow_context + ) + ``` + +2. **Update promptFactory.py** + ```python + # Before + messageSummary = await service.summarizeChat(context.workflow.messages) + + # After - no changes needed, method signature stays the same + # But internally uses the new AI engine + ``` + +### Phase 5: Testing and Optimization (Week 7-8) +1. **Unit Tests** + - Test AI engine with various content sizes + - Test fallback strategies + - Test model selection logic + +2. **Integration Tests** + - Test with real documents of various sizes + - Test error scenarios + - Test performance improvements + +3. **Performance Monitoring** + - Monitor AI call success rates + - Monitor processing times + - Monitor cost savings + +## Code Changes Required + +### 1. 
ServiceCenter Updates +```python +# Add to ServiceCenter.__init__ +from modules.chat.serviceCenter_ai_engine import ServiceCenterAIEngine +self.ai_engine_wrapper = ServiceCenterAIEngine(self) + +# Update existing methods to use AI engine +async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str: + return await self.ai_engine_wrapper.callAiTextAdvanced(prompt, context) + +async def callAiTextBasic(self, prompt: str, context: str = None) -> str: + return await self.ai_engine_wrapper.callAiTextBasic(prompt, context) + +async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str: + return await self.ai_engine_wrapper.extractContentFromDocument(prompt, document) + +async def summarizeChat(self, messages: List[ChatMessage]) -> str: + return await self.ai_engine_wrapper.summarizeChat(messages) +``` + +### 2. Method Updates (Optional Enhancements) +```python +# Enhanced method calls with document awareness +async def process_documents_with_ai(self, prompt: str, documents: List[ChatDocument]): + return await self.service.callAiWithDocuments( + prompt=prompt, + documents=documents, + operation_type="document_processing" + ) +``` + +### 3. Configuration Updates +```ini +# Add to config.ini +[AI_ENGINE] +DEFAULT_MODEL=anthropic_claude +FALLBACK_MODEL=openai_gpt35 +MAX_CONTENT_SIZE=100000 +ENABLE_CONTENT_REDUCTION=true +CONTENT_REDUCTION_THRESHOLD=0.8 +``` + +## Backward Compatibility + +### 1. **Method Signatures** +- All existing method signatures remain unchanged +- Internal implementation uses new AI engine +- No breaking changes for existing code + +### 2. **Error Handling** +- Same error types and messages +- Enhanced error recovery with fallback strategies +- Better error reporting with processing details + +### 3. **Performance** +- Same or better performance +- Automatic optimization based on content +- Reduced API costs through smart model selection + +## Risk Mitigation + +### 1. **Gradual Rollout** +- Deploy with feature flags +- A/B testing with subset of users +- Rollback capability + +### 2. **Monitoring** +- Comprehensive logging of AI engine operations +- Performance metrics tracking +- Error rate monitoring + +### 3. **Fallback Strategy** +- Keep original AI call methods as backup +- Automatic fallback to original methods on errors +- Manual override capability + +## Expected Benefits + +### 1. **Immediate Benefits** +- Elimination of "content too large" errors +- Better handling of large documents +- Improved user experience + +### 2. **Long-term Benefits** +- Easier addition of new AI models +- Better cost optimization +- Enhanced content processing capabilities +- Improved system reliability + +### 3. **Developer Benefits** +- Simplified AI integration +- No need to worry about content size limits +- Consistent AI behavior across the system +- Better debugging and monitoring + +## Success Metrics + +### 1. **Error Reduction** +- 90% reduction in "content too large" errors +- 50% reduction in AI call failures +- 95% success rate for document processing + +### 2. **Performance Improvement** +- 20% faster processing for large documents +- 30% reduction in API costs +- 50% reduction in retry attempts + +### 3. **User Experience** +- Faster response times +- More reliable document processing +- Better content extraction quality + +## Conclusion + +The new AI Engine architecture provides a robust, scalable solution for handling AI calls with large content. 
The migration can be done gradually with full backward compatibility, ensuring minimal risk while providing significant benefits in reliability, performance, and maintainability. diff --git a/config.ini b/config.ini index 0ba1e0bf..9c529400 100644 --- a/config.ini +++ b/config.ini @@ -36,20 +36,6 @@ Security_LOCK_DURATION_MINUTES = 30 # Content Neutralization configuration Content_Neutralization_ENABLED = False -# Agent Webcrawler configuration -Agent_Webcrawler_SERPAPI_ENGINE = google -Agent_Webcrawler_SERPAPI_APIKEY = 7304bd34bca767aa52dd3233297e30a9edc0abc57871f702b3f8238b9d3ee7bc -Agent_Webcrawler_SERPAPI_MAX_URLS = 3 -Agent_Webcrawler_SERPAPI_MAX_SEARCH_KEYWORDS = 3 -Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS = 5 -Agent_Webcrawler_SERPAPI_TIMEOUT = 10 -Agent_Webcrawler_SERPAPI_USER_AGENT = Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 - -# Agent Coder configuration -Agent_Coder_INSTALL_TIMEOUT = 180 -Agent_Coder_EXECUTION_TIMEOUT = 60 -Agent_Coder_EXECUTION_RETRY = 5 - # Agent Mail configuration Service_MSFT_CLIENT_ID = c7e7112d-61dc-4f3a-8cd3-08cc4cd7504c Service_MSFT_CLIENT_SECRET = Kxf8Q~2lJIteZ~JaI32kMf1lfaWKATqxXiNiFbzV @@ -58,3 +44,16 @@ Service_MSFT_TENANT_ID = common # Google Service configuration Service_GOOGLE_CLIENT_ID = 354925410565-aqs2b2qaiqmm73qpjnel6al8eid78uvg.apps.googleusercontent.com Service_GOOGLE_CLIENT_SECRET = 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 + +# Tavily Web Search configuration +Connector_WebTavily_API_KEY = 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 + +# Web Search configuration +Web_Search_MAX_QUERY_LENGTH = 400 +Web_Search_MAX_RESULTS = 20 +Web_Search_MIN_RESULTS = 1 + +# Web Crawl configuration +Web_Crawl_TIMEOUT = 30 +Web_Crawl_MAX_RETRIES = 3 +Web_Crawl_RETRY_DELAY = 2 \ No newline at end of file diff --git a/modules/__init__.py b/modules/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/modules/chat/serviceCenter.py b/modules/chat/serviceCenter.py index 13545001..703531f0 100644 --- a/modules/chat/serviceCenter.py +++ b/modules/chat/serviceCenter.py @@ -283,7 +283,6 @@ class ServiceCenter: methodList.append(signature) return methodList - def generateDocumentLabel(self, document: ChatDocument, message: ChatMessage) -> str: """Generate new document label: round+task+action+filename.extension""" try: @@ -1302,10 +1301,6 @@ Please provide a comprehensive summary of this conversation.""" except Exception as e: logger.error(f"Error refreshing file attributes for document {doc.id}: {e}") - # Note: Workflow progress update methods have been moved to handlingTasks.py - # where they belong since that's where the actual workflow execution happens - # This avoids circular import issues between ServiceCenter and ChatInterface - def diagnoseDocumentAccess(self, document: ChatDocument) -> Dict[str, Any]: """ Diagnose document access issues and provide recovery information. 
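For orientation, a minimal caller-side sketch (illustrative only, not part of the patch; it assumes the `callAiWithDocuments` facade from the migration plan above is already wired into `ServiceCenter`, and the `summarize_reports` helper is hypothetical):

```python
# Illustrative usage sketch. Assumes a ServiceCenter instance `service` that
# exposes the callAiWithDocuments facade described in the migration plan.
from typing import List
from modules.interfaces.interfaceChatModel import ChatDocument

async def summarize_reports(service, documents: List[ChatDocument]) -> str:
    # The engine picks the model, estimates token usage, and falls back to
    # chunked or per-document processing if the content is too large;
    # the caller only states its intent via operation_type.
    return await service.callAiWithDocuments(
        prompt="Summarize the key findings across these reports.",
        documents=documents,
        operation_type="report_generation",
    )
```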
diff --git a/modules/chat/serviceCenter_ai_engine.py b/modules/chat/serviceCenter_ai_engine.py new file mode 100644 index 00000000..a33a5813 --- /dev/null +++ b/modules/chat/serviceCenter_ai_engine.py @@ -0,0 +1,266 @@ +""" +ServiceCenter integration with Smart AI Engine +""" + +import logging +from typing import List, Dict, Any, Optional +from modules.interfaces.interfaceChatModel import ChatDocument +from modules.interfaces.interfaceAiEngine import ( + AIRequest, AIResponse, AIModelType, ProcessingStrategy, + ContentReductionStrategy +) +from modules.engines.aiEngine import SmartAIEngine + +logger = logging.getLogger(__name__) + + +class ServiceCenterAIEngine: + """ServiceCenter integration with Smart AI Engine""" + + def __init__(self, service_center): + self.service_center = service_center + self.ai_engine = SmartAIEngine(service_center) + + async def callAiWithDocuments( + self, + prompt: str, + documents: List[ChatDocument] = None, + context: str = None, + preferred_model: AIModelType = None, + operation_type: str = "general", + processing_strategy: ProcessingStrategy = None, + reduction_strategy: ContentReductionStrategy = None, + **kwargs + ) -> str: + """ + Unified AI call method that handles documents and prompts separately + + Args: + prompt: The AI prompt + documents: List of documents to process + context: Additional context + preferred_model: Preferred AI model + operation_type: Type of operation (for strategy selection) + processing_strategy: Explicit processing strategy + reduction_strategy: Explicit content reduction strategy + **kwargs: Additional parameters + + Returns: + AI response content + """ + try: + # Create AI request + request = AIRequest( + prompt=prompt, + documents=documents or [], + context=context, + preferred_model=preferred_model, + processing_strategy=processing_strategy, + reduction_strategy=reduction_strategy, + metadata={ + "operation_type": operation_type, + **kwargs + } + ) + + # Process request + response = await self.ai_engine.process_request(request) + + if response.success: + return response.content + else: + raise Exception(f"AI processing failed: {response.error}") + + except Exception as e: + logger.error(f"Error in AI call with documents: {str(e)}") + raise e + + # Convenience methods for different operation types + + async def callAiForTaskPlanning( + self, + prompt: str, + documents: List[ChatDocument] = None, + context: str = None + ) -> str: + """AI call optimized for task planning""" + return await self.callAiWithDocuments( + prompt=prompt, + documents=documents, + context=context, + operation_type="task_planning", + preferred_model=AIModelType.ANTHROPIC_CLAUDE # Better for complex planning + ) + + async def callAiForActionDefinition( + self, + prompt: str, + documents: List[ChatDocument] = None, + context: str = None + ) -> str: + """AI call optimized for action definition""" + return await self.callAiWithDocuments( + prompt=prompt, + documents=documents, + context=context, + operation_type="action_definition", + preferred_model=AIModelType.ANTHROPIC_CLAUDE + ) + + async def callAiForDocumentExtraction( + self, + prompt: str, + documents: List[ChatDocument], + context: str = None + ) -> str: + """AI call optimized for document extraction""" + return await self.callAiWithDocuments( + prompt=prompt, + documents=documents, + context=context, + operation_type="document_extraction", + processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT + ) + + async def callAiForReportGeneration( + self, + prompt: str, + documents: 
List[ChatDocument], + context: str = None + ) -> str: + """AI call optimized for report generation""" + return await self.callAiWithDocuments( + prompt=prompt, + documents=documents, + context=context, + operation_type="report_generation", + processing_strategy=ProcessingStrategy.CHUNKED_PROCESSING + ) + + async def callAiForEmailComposition( + self, + prompt: str, + documents: List[ChatDocument] = None, + context: str = None + ) -> str: + """AI call optimized for email composition""" + return await self.callAiWithDocuments( + prompt=prompt, + documents=documents, + context=context, + operation_type="email_composition", + preferred_model=AIModelType.OPENAI_GPT4 # Better for creative writing + ) + + async def callAiForChatSummarization( + self, + prompt: str, + documents: List[ChatDocument] = None, + context: str = None + ) -> str: + """AI call optimized for chat summarization""" + return await self.callAiWithDocuments( + prompt=prompt, + documents=documents, + context=context, + operation_type="chat_summarization", + processing_strategy=ProcessingStrategy.SUMMARIZED_CONTENT + ) + + async def callAiForImageAnalysis( + self, + prompt: str, + documents: List[ChatDocument], + context: str = None + ) -> str: + """AI call optimized for image analysis""" + return await self.callAiWithDocuments( + prompt=prompt, + documents=documents, + context=context, + operation_type="image_analysis", + preferred_model=AIModelType.OPENAI_VISION, + requires_vision=True + ) + + # Backward compatibility methods + + async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str: + """Backward compatibility method""" + return await self.callAiWithDocuments( + prompt=prompt, + context=context, + operation_type="general", + preferred_model=AIModelType.ANTHROPIC_CLAUDE + ) + + async def callAiTextBasic(self, prompt: str, context: str = None) -> str: + """Backward compatibility method""" + return await self.callAiWithDocuments( + prompt=prompt, + context=context, + operation_type="general", + preferred_model=AIModelType.OPENAI_GPT35 + ) + + async def callAiImageBasic(self, prompt: str, image_data: str, mime_type: str) -> str: + """Backward compatibility method for image processing""" + # Create a document from image data + image_doc = self.service_center.createDocument( + "image_analysis.jpg", + mime_type, + image_data, + base64encoded=True + ) + + return await self.callAiForImageAnalysis( + prompt=prompt, + documents=[image_doc] + ) + + async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str: + """Enhanced document extraction using AI engine""" + try: + return await self.callAiForDocumentExtraction( + prompt=prompt, + documents=[document] + ) + except Exception as e: + logger.error(f"Error in enhanced document extraction: {str(e)}") + # Fall back to original method + from modules.interfaces.interfaceChatModel import ExtractedContent + extracted = await self.service_center.documentProcessor.processFileData( + fileData=self.service_center.getFileData(document.fileId), + fileName=document.fileName, + mimeType=document.mimeType, + prompt=prompt, + documentId=document.id + ) + if extracted and extracted.contents: + return "\n".join([item.data for item in extracted.contents]) + return "" + + async def summarizeChat(self, messages: List) -> str: + """Enhanced chat summarization using AI engine""" + try: + # Convert messages to a simple text format + chat_content = "\n".join([f"{msg.role}: {msg.message}" for msg in messages if hasattr(msg, 'message')]) + + # Create a document from 
chat content
+            chat_doc = self.service_center.createDocument(
+                "chat_history.txt",
+                "text/plain",
+                chat_content,
+                base64encoded=False
+            )
+
+            return await self.callAiForChatSummarization(
+                prompt="Summarize this chat conversation, focusing on key decisions, outcomes, and next steps.",
+                documents=[chat_doc]
+            )
+        except Exception as e:
+            logger.error(f"Error in enhanced chat summarization: {str(e)}")
+            # Fall back to original method; rebuild the text here because
+            # chat_content may be unbound if the join above raised
+            fallback_content = "\n".join(
+                f"{msg.role}: {msg.message}" for msg in messages if hasattr(msg, "message")
+            )
+            return await self.service_center.callAiTextBasic(
+                f"Summarize this chat conversation: {fallback_content}"
+            )
diff --git a/modules/connectors/connectorAiOpenai.py b/modules/connectors/connectorAiOpenai.py
index b81991d3..4a9f4888 100644
--- a/modules/connectors/connectorAiOpenai.py
+++ b/modules/connectors/connectorAiOpenai.py
@@ -8,6 +8,10 @@ from modules.shared.configuration import APP_CONFIG
 # Configure logger
 logger = logging.getLogger(__name__)
 
+class ContextLengthExceededException(Exception):
+    """Exception raised when the context length exceeds the model's limit"""
+    pass
+
 def loadConfigData():
     """Load configuration data for OpenAI connector"""
     return {
@@ -75,12 +79,29 @@ class AiOpenai:
 
             if response.status_code != 200:
                 logger.error(f"OpenAI API error: {response.status_code} - {response.text}")
+
+                # Check for context length exceeded error
+                if response.status_code == 400:
+                    try:
+                        error_data = response.json()
+                        if (error_data.get("error", {}).get("code") == "context_length_exceeded" or
+                            "context length" in error_data.get("error", {}).get("message", "").lower()):
+                            # Raise a specific exception for context length issues
+                            raise ContextLengthExceededException(
+                                f"Context length exceeded: {error_data.get('error', {}).get('message', 'Unknown error')}"
+                            )
+                    except (ValueError, KeyError):
+                        pass  # If we can't parse the error, fall through to generic error
+
                 raise HTTPException(status_code=500, detail="Error communicating with OpenAI API")
 
             responseJson = response.json()
             content = responseJson["choices"][0]["message"]["content"]
             return content
 
+        except ContextLengthExceededException:
+            # Re-raise context length exceptions without wrapping
+            raise
         except Exception as e:
             logger.error(f"Error calling OpenAI API: {str(e)}")
             raise HTTPException(status_code=500, detail=f"Error calling OpenAI API: {str(e)}")
diff --git a/modules/connectors/connector_tavily.py b/modules/connectors/connectorWebTavily.py
similarity index 70%
rename from modules/connectors/connector_tavily.py
rename to modules/connectors/connectorWebTavily.py
index 8a05e781..7a9ec038 100644
--- a/modules/connectors/connector_tavily.py
+++ b/modules/connectors/connectorWebTavily.py
@@ -3,7 +3,7 @@ import logging
 import os
 from dataclasses import dataclass
 
-from modules.interfaces.interface_web_model import (
+from modules.interfaces.interfaceWebModel import (
     WebCrawlBase,
     WebCrawlDocumentData,
     WebCrawlRequest,
@@ -22,16 +22,35 @@ from modules.interfaces.interface_web_model import (
     WebSearchResultItem,
     WebCrawlActionDocument,
     WebCrawlActionResult,
+    get_web_search_min_results,
+    get_web_search_max_results,
 )
 
 # from modules.interfaces.interfaceChatModel import ActionResult, ActionDocument
 from tavily import AsyncTavilyClient
 
 from modules.shared.timezoneUtils import get_utc_timestamp
+from modules.shared.configuration import APP_CONFIG
 
 logger = logging.getLogger(__name__)
 
 
+# Configuration loading functions
+def get_web_crawl_timeout() -> int:
+    """Get web crawl timeout from configuration"""
+    return int(APP_CONFIG.get("Web_Crawl_TIMEOUT", "30"))
+
+
+def get_web_crawl_max_retries() -> int:
+    """Get 
web crawl max retries from configuration""" + return int(APP_CONFIG.get("Web_Crawl_MAX_RETRIES", "3")) + + +def get_web_crawl_retry_delay() -> int: + """Get web crawl retry delay from configuration""" + return int(APP_CONFIG.get("Web_Crawl_RETRY_DELAY", "2")) + + @dataclass class TavilySearchResult: title: str @@ -50,7 +69,10 @@ class ConnectorTavily(WebSearchBase, WebCrawlBase, WebScrapeBase): @classmethod async def create(cls): - return cls(client=AsyncTavilyClient(api_key=os.getenv("TAVILY_API_KEY"))) + api_key = APP_CONFIG.get("Connector_WebTavily_API_KEY") + if not api_key: + raise ValueError("Tavily API key not configured. Please set Connector_WebTavily_API_KEY in config.ini") + return cls(client=AsyncTavilyClient(api_key=api_key)) async def search_urls(self, request: WebSearchRequest) -> WebSearchActionResult: """Handles the web search request. @@ -113,14 +135,14 @@ class ConnectorTavily(WebSearchBase, WebCrawlBase, WebScrapeBase): async def _search(self, query: str, max_results: int) -> list[TavilySearchResult]: """Calls the Tavily API to perform a web search.""" # Make sure max_results is within the allowed range - if max_results < 0 or max_results > 20: - raise ValueError("max_results must be between 0 and 20") + min_results = get_web_search_min_results() + max_allowed_results = get_web_search_max_results() + if max_results < min_results or max_results > max_allowed_results: + raise ValueError(f"max_results must be between {min_results} and {max_allowed_results}") # Perform actual API call response = await self.client.search(query=query, max_results=max_results) - logger.info(f"Tavily API search response:\n{response}") - return [ TavilySearchResult(title=result["title"], url=result["url"]) for result in response["results"] @@ -153,18 +175,41 @@ class ConnectorTavily(WebSearchBase, WebCrawlBase, WebScrapeBase): ) async def _crawl(self, urls: list) -> list[TavilyCrawlResult]: - """Calls the Tavily API to extract text content from URLs.""" - response = await self.client.extract( - urls=urls, extract_depth="advanced", format="text" - ) + """Calls the Tavily API to extract text content from URLs with retry logic.""" + import asyncio + + max_retries = get_web_crawl_max_retries() + retry_delay = get_web_crawl_retry_delay() + timeout = get_web_crawl_timeout() + + for attempt in range(max_retries + 1): + try: + # Use asyncio.wait_for for timeout + response = await asyncio.wait_for( + self.client.extract(urls=urls, extract_depth="advanced", format="text"), + timeout=timeout + ) - # Log the result - logger.info(f"Tavily API extract (crawl) response:\n{response}") - - return [ - TavilyCrawlResult(url=result["url"], content=result["raw_content"]) - for result in response["results"] - ] + return [ + TavilyCrawlResult(url=result["url"], content=result["raw_content"]) + for result in response["results"] + ] + + except asyncio.TimeoutError: + logger.warning(f"Crawl attempt {attempt + 1} timed out after {timeout} seconds") + if attempt < max_retries: + logger.info(f"Retrying in {retry_delay} seconds...") + await asyncio.sleep(retry_delay) + else: + raise Exception(f"Crawl failed after {max_retries + 1} attempts due to timeout") + + except Exception as e: + logger.warning(f"Crawl attempt {attempt + 1} failed: {str(e)}") + if attempt < max_retries: + logger.info(f"Retrying in {retry_delay} seconds...") + await asyncio.sleep(retry_delay) + else: + raise Exception(f"Crawl failed after {max_retries + 1} attempts: {str(e)}") def _build_crawl_action_result( self, crawl_results: list[TavilyCrawlResult], 
urls: list[str] = None diff --git a/modules/engines/aiEngine.py b/modules/engines/aiEngine.py new file mode 100644 index 00000000..f2f74c27 --- /dev/null +++ b/modules/engines/aiEngine.py @@ -0,0 +1,544 @@ +""" +Smart AI Engine with intelligent content management and model selection +""" + +import logging +import asyncio +from typing import List, Dict, Any, Optional, Tuple +from modules.interfaces.interfaceAiEngine import ( + AIEngine, AIRequest, AIResponse, AIModelType, ProcessingStrategy, + ContentReductionStrategy, ModelCapabilities, ContentReducer +) +from modules.interfaces.interfaceChatModel import ChatDocument +from modules.interfaces.interfaceAiCalls import AiCalls +from modules.chat.documents.documentExtraction import DocumentExtraction +from modules.shared.configuration import APP_CONFIG + +logger = logging.getLogger(__name__) + + +class SmartAIEngine(AIEngine): + """Smart AI Engine with automatic content management and model selection""" + + def __init__(self, service_center=None): + self.service_center = service_center + self.ai_calls = AiCalls() + self.document_processor = DocumentExtraction(service_center) + self.content_reducer = SmartContentReducer(service_center) + + # Model capabilities mapping + self.model_capabilities = { + AIModelType.OPENAI_GPT4: ModelCapabilities( + max_tokens=8192, + max_input_tokens=128000, + supports_vision=False, + supports_function_calling=True, + cost_per_1k_tokens=0.03, + processing_speed="medium" + ), + AIModelType.OPENAI_GPT35: ModelCapabilities( + max_tokens=4096, + max_input_tokens=16384, + supports_vision=False, + supports_function_calling=True, + cost_per_1k_tokens=0.002, + processing_speed="fast" + ), + AIModelType.ANTHROPIC_CLAUDE: ModelCapabilities( + max_tokens=4096, + max_input_tokens=200000, + supports_vision=False, + supports_function_calling=False, + cost_per_1k_tokens=0.015, + processing_speed="medium" + ), + AIModelType.OPENAI_VISION: ModelCapabilities( + max_tokens=4096, + max_input_tokens=128000, + supports_vision=True, + supports_function_calling=False, + cost_per_1k_tokens=0.01, + processing_speed="slow" + ) + } + + # Processing strategy preferences + self.strategy_preferences = { + "task_planning": ProcessingStrategy.SINGLE_CALL, + "action_definition": ProcessingStrategy.SINGLE_CALL, + "document_extraction": ProcessingStrategy.DOCUMENT_BY_DOCUMENT, + "report_generation": ProcessingStrategy.CHUNKED_PROCESSING, + "email_composition": ProcessingStrategy.SINGLE_CALL, + "chat_summarization": ProcessingStrategy.SUMMARIZED_CONTENT + } + + async def process_request(self, request: AIRequest) -> AIResponse: + """Process AI request with intelligent content management""" + try: + # Step 1: Determine optimal processing strategy + strategy = self._determine_processing_strategy(request) + request.processing_strategy = strategy + + # Step 2: Estimate token usage + estimated_tokens = await self.estimate_token_usage(request) + + # Step 3: Select appropriate model + model = self._select_optimal_model(request, estimated_tokens) + + # Step 4: Process with selected strategy + if strategy == ProcessingStrategy.SINGLE_CALL: + return await self._process_single_call(request, model) + elif strategy == ProcessingStrategy.DOCUMENT_BY_DOCUMENT: + return await self._process_document_by_document(request, model) + elif strategy == ProcessingStrategy.CHUNKED_PROCESSING: + return await self._process_chunked(request, model) + elif strategy == ProcessingStrategy.SUMMARIZED_CONTENT: + return await self._process_with_summarization(request, model) + else: + 
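+                # Defensive branch: an unrecognized strategy is a programming
+                # error, so fail fast instead of silently picking a default.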
raise ValueError(f"Unknown processing strategy: {strategy}") + + except Exception as e: + logger.error(f"Error processing AI request: {str(e)}") + return AIResponse( + success=False, + content="", + model_used=AIModelType.OPENAI_GPT35, + processing_strategy=ProcessingStrategy.SINGLE_CALL, + error=str(e) + ) + + def _determine_processing_strategy(self, request: AIRequest) -> ProcessingStrategy: + """Determine the best processing strategy based on request characteristics""" + + # Use explicit strategy if provided + if request.processing_strategy: + return request.processing_strategy + + # Determine based on metadata or content characteristics + metadata = request.metadata or {} + operation_type = metadata.get("operation_type", "general") + + # Check if we have a preference for this operation type + if operation_type in self.strategy_preferences: + return self.strategy_preferences[operation_type] + + # Auto-determine based on content characteristics + num_documents = len(request.documents) + prompt_length = len(request.prompt) + + if num_documents == 0: + return ProcessingStrategy.SINGLE_CALL + elif num_documents == 1: + return ProcessingStrategy.SINGLE_CALL + elif num_documents <= 3 and prompt_length < 1000: + return ProcessingStrategy.SINGLE_CALL + elif num_documents > 5: + return ProcessingStrategy.DOCUMENT_BY_DOCUMENT + else: + return ProcessingStrategy.CHUNKED_PROCESSING + + def _select_optimal_model(self, request: AIRequest, estimated_tokens: int) -> AIModelType: + """Select the optimal AI model based on request characteristics""" + + # Use preferred model if specified and suitable + if request.preferred_model: + capabilities = self.get_model_capabilities(request.preferred_model) + if estimated_tokens <= capabilities.max_input_tokens: + return request.preferred_model + + # Select model based on requirements + metadata = request.metadata or {} + requires_vision = metadata.get("requires_vision", False) + requires_function_calling = metadata.get("requires_function_calling", False) + + # Filter models by requirements + suitable_models = [] + for model, capabilities in self.model_capabilities.items(): + if estimated_tokens <= capabilities.max_input_tokens: + if requires_vision and not capabilities.supports_vision: + continue + if requires_function_calling and not capabilities.supports_function_calling: + continue + suitable_models.append((model, capabilities)) + + if not suitable_models: + # If no model can handle the full content, use the one with highest capacity + best_model = max(self.model_capabilities.items(), + key=lambda x: x[1].max_input_tokens) + logger.warning(f"No model can handle {estimated_tokens} tokens, using {best_model[0]}") + return best_model[0] + + # Select based on cost and speed preferences + # For now, prefer Claude for large content, GPT-4 for complex tasks, GPT-3.5 for simple tasks + if estimated_tokens > 50000: + return AIModelType.ANTHROPIC_CLAUDE + elif metadata.get("complex_task", False): + return AIModelType.OPENAI_GPT4 + else: + return AIModelType.OPENAI_GPT35 + + async def _process_single_call(self, request: AIRequest, model: AIModelType) -> AIResponse: + """Process request with a single AI call""" + try: + # Prepare content + content = await self._prepare_content_for_single_call(request) + + # Make AI call + if model in [AIModelType.OPENAI_GPT4, AIModelType.OPENAI_GPT35]: + response = await self.ai_calls.callAiTextAdvanced(content, request.context) + elif model == AIModelType.ANTHROPIC_CLAUDE: + response = await self.ai_calls.callAiTextAdvanced(content, 
request.context) + else: + raise ValueError(f"Unsupported model for single call: {model}") + + return AIResponse( + success=True, + content=response, + model_used=model, + processing_strategy=ProcessingStrategy.SINGLE_CALL + ) + + except Exception as e: + # If single call fails due to size, try with content reduction + if "too large" in str(e).lower() or "400" in str(e): + return await self._process_with_content_reduction(request, model) + else: + raise e + + async def _process_document_by_document(self, request: AIRequest, model: AIModelType) -> AIResponse: + """Process each document separately and merge results""" + try: + results = [] + + for i, document in enumerate(request.documents): + # Create individual request for each document + doc_request = AIRequest( + prompt=request.prompt, + documents=[document], + context=request.context, + preferred_model=model, + metadata=request.metadata + ) + + # Process document + doc_response = await self._process_single_call(doc_request, model) + if doc_response.success: + results.append(f"Document {i+1} ({document.fileName}):\n{doc_response.content}") + else: + results.append(f"Document {i+1} ({document.fileName}): Error - {doc_response.error}") + + # Merge results + merged_content = "\n\n".join(results) + + return AIResponse( + success=True, + content=merged_content, + model_used=model, + processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT + ) + + except Exception as e: + logger.error(f"Error in document-by-document processing: {str(e)}") + return AIResponse( + success=False, + content="", + model_used=model, + processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT, + error=str(e) + ) + + async def _process_chunked(self, request: AIRequest, model: AIModelType) -> AIResponse: + """Process content in chunks and merge results""" + try: + # This would implement chunked processing logic + # For now, fall back to document-by-document + return await self._process_document_by_document(request, model) + + except Exception as e: + logger.error(f"Error in chunked processing: {str(e)}") + return AIResponse( + success=False, + content="", + model_used=model, + processing_strategy=ProcessingStrategy.CHUNKED_PROCESSING, + error=str(e) + ) + + async def _process_with_summarization(self, request: AIRequest, model: AIModelType) -> AIResponse: + """Process with content summarization first""" + try: + # Summarize documents first + summarized_docs = [] + for document in request.documents: + summary_doc = await self.content_reducer.summarize_document( + document, + f"Summarize this document for: {request.prompt}" + ) + summarized_docs.append(summary_doc) + + # Create new request with summarized documents + summary_request = AIRequest( + prompt=request.prompt, + documents=summarized_docs, + context=request.context, + preferred_model=model, + metadata=request.metadata + ) + + # Process with summarized content + return await self._process_single_call(summary_request, model) + + except Exception as e: + logger.error(f"Error in summarization processing: {str(e)}") + return AIResponse( + success=False, + content="", + model_used=model, + processing_strategy=ProcessingStrategy.SUMMARIZED_CONTENT, + error=str(e) + ) + + async def _process_with_content_reduction(self, request: AIRequest, model: AIModelType) -> AIResponse: + """Process with automatic content reduction""" + try: + # Determine reduction strategy + strategy = self._determine_reduction_strategy(request) + + # Reduce content + reduced_docs, reduced_prompt = await self.content_reducer.reduce_content( + 
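+                # target_reduction=0.5 asks the reducer to roughly halve the
+                # document content before retrying the call.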
request.documents, + request.prompt, + strategy, + target_reduction=0.5 + ) + + # Create new request with reduced content + reduced_request = AIRequest( + prompt=reduced_prompt, + documents=reduced_docs, + context=request.context, + preferred_model=model, + metadata=request.metadata + ) + + # Try processing with reduced content + return await self._process_single_call(reduced_request, model) + + except Exception as e: + logger.error(f"Error in content reduction processing: {str(e)}") + return AIResponse( + success=False, + content="", + model_used=model, + processing_strategy=ProcessingStrategy.SINGLE_CALL, + error=f"Content reduction failed: {str(e)}" + ) + + def _determine_reduction_strategy(self, request: AIRequest) -> ContentReductionStrategy: + """Determine the best content reduction strategy""" + + # Use explicit strategy if provided + if request.reduction_strategy: + return request.reduction_strategy + + # Auto-determine based on request characteristics + metadata = request.metadata or {} + operation_type = metadata.get("operation_type", "general") + + # Different strategies for different operations + if operation_type in ["task_planning", "action_definition"]: + # For planning tasks, prompt is crucial + return ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY + elif operation_type in ["document_extraction", "report_generation"]: + # For document processing, documents are crucial + return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS + else: + # Default: reduce both + return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS + + async def _prepare_content_for_single_call(self, request: AIRequest) -> str: + """Prepare content for a single AI call""" + content_parts = [request.prompt] + + if request.context: + content_parts.append(f"Context: {request.context}") + + # Add document content + for i, document in enumerate(request.documents): + try: + # Extract document content + extracted = await self.service_center.extractContentFromDocument( + "Extract all relevant text content", + document + ) + + if extracted and extracted.contents: + doc_content = "\n".join([item.data for item in extracted.contents]) + content_parts.append(f"Document {i+1} ({document.fileName}):\n{doc_content}") + else: + content_parts.append(f"Document {i+1} ({document.fileName}): [No content extracted]") + + except Exception as e: + logger.warning(f"Could not extract content from document {document.fileName}: {str(e)}") + content_parts.append(f"Document {i+1} ({document.fileName}): [Error extracting content]") + + return "\n\n".join(content_parts) + + def get_model_capabilities(self, model: AIModelType) -> ModelCapabilities: + """Get capabilities for a specific model""" + return self.model_capabilities.get(model, self.model_capabilities[AIModelType.OPENAI_GPT35]) + + async def estimate_token_usage(self, request: AIRequest) -> int: + """Estimate token usage for a request""" + # Simple estimation: ~4 characters per token + prompt_tokens = len(request.prompt) // 4 + context_tokens = len(request.context or "") // 4 + + # Estimate document tokens + doc_tokens = 0 + for document in request.documents: + # Rough estimate based on file size + doc_tokens += document.fileSize // 4 + + return prompt_tokens + context_tokens + doc_tokens + + +class SmartContentReducer(ContentReducer): + """Smart content reducer using document extraction engine""" + + def __init__(self, service_center): + self.service_center = service_center + self.document_processor = DocumentExtraction(service_center) + + async def reduce_content( + self, + 
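+        # Documents are reduced largest-first (sorted by fileSize below),
+        # so the biggest payloads shrink before smaller ones are touched.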
documents: List[ChatDocument], + prompt: str, + strategy: ContentReductionStrategy, + target_reduction: float = 0.5 + ) -> Tuple[List[ChatDocument], str]: + """Reduce content size while preserving important information""" + + reduced_docs = [] + reduced_prompt = prompt + + # Sort documents by size (largest first) + sorted_docs = sorted(documents, key=lambda d: d.fileSize, reverse=True) + + for document in sorted_docs: + try: + # Create reduction prompt based on strategy + if strategy == ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY: + reduction_prompt = f""" + Summarize this document to {int(100 * (1 - target_reduction))}% of its original size. + Focus on the most important information relevant to: {prompt} + Preserve key facts, data, and conclusions. + """ + elif strategy == ContentReductionStrategy.SUMMARIZE_DOCUMENTS: + reduction_prompt = f""" + Create a concise summary of this document focusing on: {prompt} + Include only the most relevant information. + """ + else: # REDUCE_PROMPT_AND_DOCS or EXTRACT_KEY_INFO + reduction_prompt = f""" + Extract only the key information from this document that is relevant to: {prompt} + Be very selective and concise. + """ + + # Process document with reduction + extracted = await self.service_center.extractContentFromDocument( + reduction_prompt, + document + ) + + if extracted and extracted.contents: + # Create new document with reduced content + reduced_content = "\n".join([item.data for item in extracted.contents]) + reduced_doc = await self._create_reduced_document(document, reduced_content) + reduced_docs.append(reduced_doc) + else: + # If reduction fails, keep original document + reduced_docs.append(document) + + except Exception as e: + logger.warning(f"Could not reduce document {document.fileName}: {str(e)}") + reduced_docs.append(document) + + # Reduce prompt if strategy requires it + if strategy in [ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS]: + reduced_prompt = self._reduce_prompt(prompt, target_reduction) + + return reduced_docs, reduced_prompt + + async def summarize_document( + self, + document: ChatDocument, + focus_prompt: str + ) -> ChatDocument: + """Create a summary of a document focused on specific aspects""" + + summary_prompt = f""" + Create a comprehensive summary of this document focusing on: {focus_prompt} + + Include: + - Key points and main ideas + - Important data and statistics + - Conclusions and recommendations + - Any relevant details + + Keep the summary concise but informative. 
+ """ + + try: + extracted = await self.service_center.extractContentFromDocument( + summary_prompt, + document + ) + + if extracted and extracted.contents: + summary_content = "\n".join([item.data for item in extracted.contents]) + return await self._create_reduced_document(document, summary_content) + else: + return document + + except Exception as e: + logger.warning(f"Could not summarize document {document.fileName}: {str(e)}") + return document + + async def _create_reduced_document(self, original_doc: ChatDocument, reduced_content: str) -> ChatDocument: + """Create a new document with reduced content""" + try: + # Create new file with reduced content + file_id = self.service_center.createFile( + f"reduced_{original_doc.fileName}", + "text/plain", + reduced_content, + base64encoded=False + ) + + # Create new document + return self.service_center.createDocument( + f"reduced_{original_doc.fileName}", + "text/plain", + reduced_content, + base64encoded=False, + existing_file_id=file_id + ) + + except Exception as e: + logger.error(f"Could not create reduced document: {str(e)}") + return original_doc + + def _reduce_prompt(self, prompt: str, target_reduction: float) -> str: + """Reduce prompt size while preserving essential information""" + # Simple prompt reduction - keep first and last parts + lines = prompt.split('\n') + if len(lines) <= 3: + return prompt + + # Keep first 30% and last 20% of lines + keep_start = int(len(lines) * 0.3) + keep_end = int(len(lines) * 0.2) + + reduced_lines = lines[:keep_start] + ["... (content reduced) ..."] + lines[-keep_end:] + return '\n'.join(reduced_lines) diff --git a/modules/interfaces/interfaceAiCalls.py b/modules/interfaces/interfaceAiCalls.py index 2c47ff33..fe93105f 100644 --- a/modules/interfaces/interfaceAiCalls.py +++ b/modules/interfaces/interfaceAiCalls.py @@ -1,6 +1,6 @@ import logging from typing import Dict, Any, List, Union, Optional -from modules.connectors.connectorAiOpenai import AiOpenai +from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException from modules.connectors.connectorAiAnthropic import AiAnthropic logger = logging.getLogger(__name__) @@ -53,14 +53,23 @@ class AiCalls: try: return await self.openaiService.callAiBasic(messages) + except ContextLengthExceededException as e: + logger.warning(f"OpenAI context length exceeded, falling back to Anthropic: {str(e)}") + # Fallback to Anthropic (AI Advanced) when context length is exceeded + return await self.callAiTextAdvanced(prompt, context, _is_fallback=True) except Exception as e: logger.error(f"Error in OpenAI call: {str(e)}") return f"Error: {str(e)}" - async def callAiTextAdvanced(self, prompt: str, context: Optional[str] = None) -> str: + async def callAiTextAdvanced(self, prompt: str, context: Optional[str] = None, _is_fallback: bool = False) -> str: """ Advanced text processing using Anthropic. Fallback to OpenAI if Anthropic is overloaded or rate-limited. + + Args: + prompt: The user prompt to process + context: Optional system context/prompt + _is_fallback: Internal flag to prevent infinite recursion """ # For Anthropic, we need to handle system content differently # Anthropic expects system content in a top-level parameter, not as a message role @@ -97,8 +106,13 @@ class AiCalls: except Exception as e: err_str = str(e) logger.warning(f"[UI NOTICE] Advanced AI failed, falling back to Basic AI (OpenAI). 
Reason: {err_str}") - # Fallback to OpenAI basic - return await self.callAiTextBasic(prompt, context) + # Fallback to OpenAI basic, but only if we're not already in a fallback + if not _is_fallback: + return await self.callAiTextBasic(prompt, context) + else: + # If we're already in a fallback, return error to prevent infinite recursion + logger.error("Both AI services failed, cannot provide fallback") + return f"Error: Both AI services failed. Anthropic error: {err_str}" async def callAiImageBasic(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str: """ diff --git a/modules/interfaces/interfaceAiEngine.py b/modules/interfaces/interfaceAiEngine.py new file mode 100644 index 00000000..2ccdd70e --- /dev/null +++ b/modules/interfaces/interfaceAiEngine.py @@ -0,0 +1,115 @@ +""" +Centralized AI Engine Interface for intelligent content processing +""" + +from abc import ABC, abstractmethod +from typing import List, Dict, Any, Optional, Union, Tuple +from enum import Enum +from dataclasses import dataclass +from modules.interfaces.interfaceChatModel import ChatDocument, ExtractedContent + + +class AIModelType(Enum): + """Available AI model types""" + OPENAI_GPT4 = "openai_gpt4" + OPENAI_GPT35 = "openai_gpt35" + ANTHROPIC_CLAUDE = "anthropic_claude" + OPENAI_VISION = "openai_vision" + ANTHROPIC_VISION = "anthropic_vision" + + +class ProcessingStrategy(Enum): + """Content processing strategies""" + SINGLE_CALL = "single_call" # One AI call with full content + DOCUMENT_BY_DOCUMENT = "doc_by_doc" # One call per document, merge results + CHUNKED_PROCESSING = "chunked" # Process in chunks, merge results + SUMMARIZED_CONTENT = "summarized" # Summarize content first, then process + + +class ContentReductionStrategy(Enum): + """Content reduction strategies""" + REDUCE_DOCUMENTS_ONLY = "reduce_docs" # Keep prompt, reduce documents + REDUCE_PROMPT_AND_DOCS = "reduce_both" # Reduce both prompt and documents + SUMMARIZE_DOCUMENTS = "summarize_docs" # Summarize documents to key points + EXTRACT_KEY_INFO = "extract_key" # Extract only relevant information + + +@dataclass +class AIRequest: + """Standardized AI request structure""" + prompt: str + documents: List[ChatDocument] + context: Optional[str] = None + preferred_model: Optional[AIModelType] = None + max_tokens: Optional[int] = None + temperature: Optional[float] = None + processing_strategy: Optional[ProcessingStrategy] = None + reduction_strategy: Optional[ContentReductionStrategy] = None + metadata: Optional[Dict[str, Any]] = None + + +@dataclass +class AIResponse: + """Standardized AI response structure""" + success: bool + content: str + model_used: AIModelType + processing_strategy: ProcessingStrategy + tokens_used: Optional[int] = None + processing_time: Optional[float] = None + error: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + + +@dataclass +class ModelCapabilities: + """AI model capabilities and limits""" + max_tokens: int + max_input_tokens: int + supports_vision: bool + supports_function_calling: bool + cost_per_1k_tokens: float + processing_speed: str # "fast", "medium", "slow" + + +class AIEngine(ABC): + """Abstract AI Engine interface""" + + @abstractmethod + async def process_request(self, request: AIRequest) -> AIResponse: + """Process an AI request with intelligent content management""" + pass + + @abstractmethod + def get_model_capabilities(self, model: AIModelType) -> ModelCapabilities: + """Get capabilities and limits for a specific model""" + pass + + @abstractmethod + async def 
estimate_token_usage(self, request: AIRequest) -> int: + """Estimate token usage for a request""" + pass + + +class ContentReducer(ABC): + """Abstract content reduction interface""" + + @abstractmethod + async def reduce_content( + self, + documents: List[ChatDocument], + prompt: str, + strategy: ContentReductionStrategy, + target_reduction: float = 0.5 + ) -> Tuple[List[ChatDocument], str]: + """Reduce content size while preserving important information""" + pass + + @abstractmethod + async def summarize_document( + self, + document: ChatDocument, + focus_prompt: str + ) -> ChatDocument: + """Create a summary of a document focused on specific aspects""" + pass diff --git a/modules/interfaces/interface_web_model.py b/modules/interfaces/interfaceWebModel.py similarity index 69% rename from modules/interfaces/interface_web_model.py rename to modules/interfaces/interfaceWebModel.py index bb4a82e0..26a16560 100644 --- a/modules/interfaces/interface_web_model.py +++ b/modules/interfaces/interfaceWebModel.py @@ -4,6 +4,23 @@ from abc import ABC, abstractmethod from modules.interfaces.interfaceChatModel import ActionDocument, ActionResult from pydantic import BaseModel, Field, HttpUrl from typing import List +from modules.shared.configuration import APP_CONFIG + + +# Configuration loading functions +def get_web_search_max_query_length() -> int: + """Get maximum query length from configuration""" + return int(APP_CONFIG.get("Web_Search_MAX_QUERY_LENGTH", "400")) + + +def get_web_search_max_results() -> int: + """Get maximum search results from configuration""" + return int(APP_CONFIG.get("Web_Search_MAX_RESULTS", "20")) + + +def get_web_search_min_results() -> int: + """Get minimum search results from configuration""" + return int(APP_CONFIG.get("Web_Search_MIN_RESULTS", "1")) # --- Web search --- @@ -12,8 +29,8 @@ from typing import List class WebSearchRequest(BaseModel): - query: str = Field(min_length=1, max_length=400) - max_results: int = Field(ge=1, le=20) + query: str = Field(min_length=1, max_length=get_web_search_max_query_length()) + max_results: int = Field(ge=get_web_search_min_results(), le=get_web_search_max_results()) class WebSearchResultItem(BaseModel): @@ -26,7 +43,7 @@ class WebSearchResultItem(BaseModel): class WebSearchDocumentData(BaseModel): """Complete search results document""" - query: str = Field(min_length=1, max_length=400) + query: str = Field(min_length=1, max_length=get_web_search_max_query_length()) results: List[WebSearchResultItem] total_count: int @@ -89,8 +106,8 @@ class WebCrawlBase(ABC): class WebScrapeRequest(BaseModel): - query: str = Field(min_length=1, max_length=400) - max_results: int = Field(ge=1, le=20) + query: str = Field(min_length=1, max_length=get_web_search_max_query_length()) + max_results: int = Field(ge=get_web_search_min_results(), le=get_web_search_max_results()) class WebScrapeResultItem(BaseModel): @@ -103,7 +120,7 @@ class WebScrapeResultItem(BaseModel): class WebScrapeDocumentData(BaseModel): """Complete scrape results document""" - query: str = Field(min_length=1, max_length=400) + query: str = Field(min_length=1, max_length=get_web_search_max_query_length()) results: List[WebScrapeResultItem] total_count: int diff --git a/modules/interfaces/interfaceWebObjects.py b/modules/interfaces/interfaceWebObjects.py new file mode 100644 index 00000000..bdd1fd53 --- /dev/null +++ b/modules/interfaces/interfaceWebObjects.py @@ -0,0 +1,118 @@ +from typing import Optional +import json +import csv +import io +from 
modules.interfaces.interfaceWebModel import ( + WebCrawlActionResult, + WebSearchActionResult, + WebSearchRequest, + WebCrawlRequest, + WebScrapeActionResult, + WebScrapeRequest, + WebCrawlDocumentData, + WebScrapeDocumentData, + WebSearchDocumentData, +) + +from dataclasses import dataclass +from modules.connectors.connectorWebTavily import ConnectorTavily +from modules.interfaces.interfaceChatModel import ActionDocument + + +@dataclass(slots=True) +class WebInterface: + connectorWebTavily: ConnectorTavily + + def __post_init__(self) -> None: + if self.connectorWebTavily is None: + raise TypeError( + "connectorWebTavily must be provided. " + "Use `await WebInterface.create()` or pass a ConnectorTavily." + ) + + @classmethod + async def create(cls) -> "WebInterface": + connectorWebTavily = await ConnectorTavily.create() + + return WebInterface(connectorWebTavily=connectorWebTavily) + + async def search( + self, web_search_request: WebSearchRequest + ) -> WebSearchActionResult: + # NOTE: Add connectors here + return await self.connectorWebTavily.search_urls(web_search_request) + + async def crawl(self, web_crawl_request: WebCrawlRequest) -> WebCrawlActionResult: + # NOTE: Add connectors here + return await self.connectorWebTavily.crawl_urls(web_crawl_request) + + async def scrape( + self, web_scrape_request: WebScrapeRequest + ) -> WebScrapeActionResult: + # NOTE: Add connectors here + return await self.connectorWebTavily.scrape(web_scrape_request) + + def convert_web_result_to_json(self, web_result) -> str: + """Convert WebCrawlActionResult or WebScrapeActionResult to proper JSON format""" + if not web_result.success or not web_result.documents: + return json.dumps({"success": web_result.success, "error": web_result.error}) + + # Extract the document data and convert to dict + document_data = web_result.documents[0].documentData + + # Convert Pydantic model to dict + result_dict = { + "success": web_result.success, + "results": [ + { + "url": str(result.url), + "content": result.content + } + for result in document_data.results + ], + "total_count": document_data.total_count + } + + # Add type-specific fields + if hasattr(document_data, 'urls'): + # WebCrawlDocumentData has urls field + result_dict["urls"] = [str(url) for url in document_data.urls] + elif hasattr(document_data, 'query'): + # WebScrapeDocumentData has query field + result_dict["query"] = document_data.query + + return json.dumps(result_dict, indent=2, ensure_ascii=False) + + def convert_web_search_result_to_csv(self, web_search_result: WebSearchActionResult) -> str: + """Convert WebSearchActionResult to CSV format with url and title columns""" + if not web_search_result.success or not web_search_result.documents: + return "" + + output = io.StringIO() + writer = csv.writer(output, delimiter=';') + + # Write header + writer.writerow(['url', 'title']) + + # Write data rows + document_data = web_search_result.documents[0].documentData + for result in document_data.results: + writer.writerow([str(result.url), result.title]) + + return output.getvalue() + + def create_json_action_document(self, json_content: str, document_name: str) -> ActionDocument: + """Create an ActionDocument with JSON content""" + return ActionDocument( + documentName=document_name, + documentData=json_content, + mimeType="application/json" + ) + + def create_csv_action_document(self, csv_content: str, document_name: str) -> ActionDocument: + """Create an ActionDocument with CSV content""" + return ActionDocument( + documentName=document_name, + 
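+            # The CSV payload uses ';' as its delimiter
+            # (see convert_web_search_result_to_csv above).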
documentData=csv_content, + mimeType="text/csv" + ) \ No newline at end of file diff --git a/modules/interfaces/interface_web_objects.py b/modules/interfaces/interface_web_objects.py deleted file mode 100644 index f348f0bd..00000000 --- a/modules/interfaces/interface_web_objects.py +++ /dev/null @@ -1,46 +0,0 @@ -from typing import Optional -from modules.interfaces.interface_web_model import ( - WebCrawlActionResult, - WebSearchActionResult, - WebSearchRequest, - WebCrawlRequest, - WebScrapeActionResult, - WebScrapeRequest, -) - -from dataclasses import dataclass -from modules.connectors.connector_tavily import ConnectorTavily - - -@dataclass(slots=True) -class WebInterface: - connector_tavily: ConnectorTavily - - def __post_init__(self) -> None: - if self.connector_tavily is None: - raise TypeError( - "connector_tavily must be provided. " - "Use `await WebInterface.create()` or pass a ConnectorTavily." - ) - - @classmethod - async def create(cls) -> "WebInterface": - connector_tavily = await ConnectorTavily.create() - - return WebInterface(connector_tavily=connector_tavily) - - async def search( - self, web_search_request: WebSearchRequest - ) -> WebSearchActionResult: - # NOTE: Add connectors here - return await self.connector_tavily.search_urls(web_search_request) - - async def crawl(self, web_crawl_request: WebCrawlRequest) -> WebCrawlActionResult: - # NOTE: Add connectors here - return await self.connector_tavily.crawl_urls(web_crawl_request) - - async def scrape( - self, web_scrape_request: WebScrapeRequest - ) -> WebScrapeActionResult: - # NOTE: Add connectors here - return await self.connector_tavily.scrape(web_scrape_request) diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py index d3535916..96c597db 100644 --- a/modules/methods/methodWeb.py +++ b/modules/methods/methodWeb.py @@ -1,817 +1,284 @@ -""" -Web operations method module. -Handles web scraping, crawling, and search operations. -""" - import logging -import requests -import json -import re -import copy -from typing import Dict, Any, List, Optional -from datetime import datetime, UTC -from urllib.parse import urlparse, urljoin -import time -import random -from bs4 import BeautifulSoup -import os - -# Selenium imports for JavaScript-heavy pages -from selenium import webdriver -from selenium.webdriver.chrome.options import Options -from selenium.common.exceptions import WebDriverException -from selenium.webdriver.common.by import By -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC - +import csv +import io +from typing import Any, Dict from modules.chat.methodBase import MethodBase, action -from modules.interfaces.interfaceChatModel import ActionResult -from modules.shared.configuration import APP_CONFIG -from modules.shared.timezoneUtils import get_utc_timestamp +from modules.interfaces.interfaceChatModel import ActionResult, ActionDocument +from modules.interfaces.interfaceWebObjects import WebInterface +from modules.interfaces.interfaceWebModel import ( + WebSearchRequest, + WebCrawlRequest, + WebScrapeRequest, +) + logger = logging.getLogger(__name__) + class MethodWeb(MethodBase): - """ - Web method implementation for web operations. - - web.search: Uses Google SerpAPI to find relevant URLs for a query. Returns only search result metadata (title, URL, snippet). Does NOT fetch or extract page content. - - web.crawl: Fetches and extracts main content from a list of URLs, either provided directly or via referenced documents. 
Uses a headless browser for JavaScript-heavy pages. - """ - + """Web method implementation for web operations.""" + def __init__(self, serviceCenter: Any): super().__init__(serviceCenter) self.name = "web" - self.description = "Handle web operations like search and crawling" - self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "") - self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google") - self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto") - self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5")) - self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" - self.timeout = 30 - - def _format_timestamp_for_filename(self) -> str: - """Format current timestamp as YYYYMMDD-hhmmss for filenames.""" - return datetime.now(UTC).strftime("%Y%m%d-%H%M%S") - - def _readUrl(self, url: str) -> BeautifulSoup: - """Read a URL and return a BeautifulSoup parser for the content with enhanced error handling""" - if not url or not url.startswith(('http://', 'https://')): - logger.error(f"Invalid URL: {url}") - return None - - # Enhanced headers to mimic real browser - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', - 'Accept-Language': 'en-US,en;q=0.9,de;q=0.8', - 'Accept-Encoding': 'gzip, deflate, br', - 'DNT': '1', - 'Connection': 'keep-alive', - 'Upgrade-Insecure-Requests': '1', - 'Sec-Fetch-Dest': 'document', - 'Sec-Fetch-Mode': 'navigate', - 'Sec-Fetch-Site': 'none', - 'Cache-Control': 'max-age=0' - } - - try: - # Use session for better connection handling - session = requests.Session() - session.headers.update(headers) - - # Initial request with allow_redirects - response = session.get(url, timeout=self.timeout, allow_redirects=True) - - # Handle various status codes - if response.status_code == 200: - # Success - parse content - logger.debug(f"Successfully read URL: {url}") - return BeautifulSoup(response.text, 'html.parser') - - elif response.status_code == 202: - # Accepted - retry with backoff - logger.info(f"Status 202 for {url}, retrying with backoff...") - backoff_times = [1.0, 2.0, 5.0, 10.0] - - for wait_time in backoff_times: - time.sleep(wait_time) - retry_response = session.get(url, timeout=self.timeout, allow_redirects=True) - - if retry_response.status_code == 200: - logger.debug(f"Successfully read URL after retry: {url}") - return BeautifulSoup(retry_response.text, 'html.parser') - elif retry_response.status_code != 202: - break - - logger.warning(f"Failed to read URL after retries: {url}") - return None - - elif response.status_code in [301, 302, 307, 308]: - # Redirect - should be handled by allow_redirects=True - logger.warning(f"Unexpected redirect status {response.status_code} for {url}") - return None - - elif response.status_code == 403: - # Forbidden - try with different user agent - logger.warning(f"403 Forbidden for {url}, trying with different user agent...") - headers['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' - session.headers.update(headers) - - retry_response = session.get(url, timeout=self.timeout, allow_redirects=True) - if retry_response.status_code == 200: - logger.debug(f"Successfully read URL with different user agent: {url}") - return 
BeautifulSoup(retry_response.text, 'html.parser') - else: - logger.error(f"Still getting {retry_response.status_code} for {url}") - return None - - elif response.status_code == 429: - # Rate limited - wait and retry - logger.warning(f"Rate limited for {url}, waiting 30 seconds...") - time.sleep(30) - retry_response = session.get(url, timeout=self.timeout, allow_redirects=True) - if retry_response.status_code == 200: - logger.debug(f"Successfully read URL after rate limit: {url}") - return BeautifulSoup(retry_response.text, 'html.parser') - else: - logger.error(f"Still getting {retry_response.status_code} after rate limit wait for {url}") - return None - - else: - # Other error status codes - logger.error(f"HTTP {response.status_code} for {url}") - return None - - except requests.exceptions.Timeout: - logger.error(f"Timeout reading URL: {url}") - return None - except requests.exceptions.ConnectionError: - logger.error(f"Connection error reading URL: {url}") - return None - except requests.exceptions.RequestException as e: - logger.error(f"Request error reading URL {url}: {str(e)}") - return None - except Exception as e: - logger.error(f"Unexpected error reading URL {url}: {str(e)}") - return None - - def _extractTitle(self, soup: BeautifulSoup, url: str) -> str: - """Extract the title from a webpage""" - if not soup: - return f"Error with {url}" - - # Extract title from title tag - title_tag = soup.find('title') - title = title_tag.text.strip() if title_tag else "No title" - - # Alternative: Also look for h1 tags if title tag is missing - if title == "No title": - h1_tag = soup.find('h1') - if h1_tag: - title = h1_tag.text.strip() - - return title - - def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 50000) -> str: - """Extract the main content from an HTML page with enhanced content detection""" - if not soup: - return "" - - # Try to find main content elements in priority order with more selectors - main_content = None - content_selectors = [ - 'main', - 'article', - '#content', - '.content', - '#main', - '.main', - '.post-content', - '.entry-content', - '.article-content', - '.page-content', - '[role="main"]', - '.container', - '.wrapper' - ] - - for selector in content_selectors: - content = soup.select_one(selector) - if content: - main_content = content - logger.debug(f"Found main content using selector: {selector}") - break - - # If no main content found, use the body - if not main_content: - main_content = soup.find('body') or soup - logger.debug("Using body as main content") - - # Safely copy the main_content element - if main_content is None: - return "" - try: - content_copy = copy.copy(main_content) - except Exception: - content_copy = main_content - - # Remove elements that don't contribute to main content (less aggressive) - elements_to_remove = [ - 'script', 'style', 'noscript', - 'nav', 'footer', 'header', 'aside', - '.sidebar', '#sidebar', '.comments', '#comments', - '.advertisement', '.ads', '.ad', '.banner', - 'iframe', '.social-share', '.share-buttons', - '.breadcrumb', '.breadcrumbs', '.pagination', - '.related-posts', '.related-articles', - '.newsletter', '.subscribe', '.signup', - '.cookie-notice', '.privacy-notice', - '.popup', '.modal', '.overlay' - ] - - for selector in elements_to_remove: - for element in content_copy.select(selector): - element.extract() - - # Extract text content with better formatting - text_content = content_copy.get_text(separator='\n', strip=True) - - # Clean up the text - lines = text_content.split('\n') - cleaned_lines = 
[] - - for line in lines: - line = line.strip() - if line and len(line) > 10: # Only keep meaningful lines - cleaned_lines.append(line) - - # Join lines with proper spacing - cleaned_content = '\n\n'.join(cleaned_lines) - - # If content is too short, try alternative extraction - if len(cleaned_content) < 500: - logger.debug("Content too short, trying alternative extraction...") - - # Try to extract from all paragraphs - paragraphs = soup.find_all(['p', 'div', 'section']) - alt_content = [] - - for p in paragraphs: - text = p.get_text(strip=True) - if text and len(text) > 20: # Only meaningful paragraphs - alt_content.append(text) - - if alt_content: - cleaned_content = '\n\n'.join(alt_content[:20]) # Limit to first 20 paragraphs - - # Limit to max_chars but preserve complete sentences - if len(cleaned_content) > max_chars: - # Try to cut at a sentence boundary - sentences = cleaned_content.split('. ') - truncated_content = "" - - for sentence in sentences: - if len(truncated_content + sentence) < max_chars: - truncated_content += sentence + ". " - else: - break - - cleaned_content = truncated_content.strip() - - logger.debug(f"Extracted {len(cleaned_content)} characters of content") - return cleaned_content - - def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]: - """Check basic accessibility features""" - issues = [] - warnings = [] - - # Check for alt text on images - images_without_alt = soup.find_all('img', alt='') - if images_without_alt: - issues.append(f"Found {len(images_without_alt)} images without alt text") - - # Check for proper heading structure - headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']) - if not headings: - warnings.append("No headings found - poor document structure") - - # Check for form labels - forms = soup.find_all('form') - for form in forms: - inputs = form.find_all('input') - for input_elem in inputs: - if input_elem.get('type') not in ['submit', 'button', 'hidden']: - if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}): - warnings.append("Form input without proper label") - - return { - "status": "warning" if warnings else "pass", - "issues": issues, - "warnings": warnings - } - - def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]: - """Check basic SEO features""" - issues = [] - warnings = [] - - # Check for title tag - title = soup.find('title') - if not title: - issues.append("Missing title tag") - elif len(title.get_text()) < 10: - warnings.append("Title tag is too short") - elif len(title.get_text()) > 60: - warnings.append("Title tag is too long") - - # Check for meta description - meta_desc = soup.find('meta', attrs={'name': 'description'}) - if not meta_desc: - warnings.append("Missing meta description") - elif meta_desc.get('content'): - if len(meta_desc.get('content')) < 50: - warnings.append("Meta description is too short") - elif len(meta_desc.get('content')) > 160: - warnings.append("Meta description is too long") - - # Check for h1 tag - h1_tags = soup.find_all('h1') - if not h1_tags: - warnings.append("No H1 tag found") - elif len(h1_tags) > 1: - warnings.append("Multiple H1 tags found") - - return { - "status": "warning" if warnings else "pass", - "issues": issues, - "warnings": warnings - } - - def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]: - """Check basic performance indicators""" - warnings = [] - - # Count images - images = soup.find_all('img') - if len(images) > 20: - warnings.append(f"Many images found ({len(images)}) - may 
impact loading speed") - - # Check for external resources - external_scripts = soup.find_all('script', src=True) - external_styles = soup.find_all('link', rel='stylesheet') - - if len(external_scripts) > 10: - warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed") - - if len(external_styles) > 5: - warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed") - - return { - "status": "warning" if warnings else "pass", - "warnings": warnings, - "metrics": { - "images": len(images), - "external_scripts": len(external_scripts), - "external_styles": len(external_styles) - } - } - - def _detectJavaScriptRendering(self, soup: BeautifulSoup) -> bool: - """Detect if a page likely requires JavaScript rendering""" - if not soup: - return False - - # Check for common indicators of JavaScript-rendered content - indicators = [ - # Angular, React, Vue indicators - soup.find('div', {'ng-app': True}), - soup.find('div', {'id': 'root'}), - soup.find('div', {'id': 'app'}), - soup.find('div', {'id': 'react-root'}), - - # SPA indicators - soup.find('div', {'id': 'spa-root'}), - soup.find('div', {'class': 'spa-container'}), - - # Modern framework indicators - soup.find('div', {'data-reactroot': True}), - soup.find('div', {'data-ng-controller': True}), - - # Empty content with scripts - len(soup.get_text(strip=True)) < 100 and len(soup.find_all('script')) > 2 - ] - - return any(indicators) - - def _extractMetaInformation(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]: - """Extract meta information from the page""" - meta_info = { - "url": url, - "title": self._extractTitle(soup, url), - "description": "", - "keywords": "", - "author": "", - "language": "", - "robots": "", - "viewport": "", - "charset": "", - "canonical": "" - } - - # Extract meta tags - meta_tags = soup.find_all('meta') - for meta in meta_tags: - name = meta.get('name', '').lower() - property = meta.get('property', '').lower() - content = meta.get('content', '') - - if name == 'description' or property == 'og:description': - meta_info['description'] = content - elif name == 'keywords': - meta_info['keywords'] = content - elif name == 'author': - meta_info['author'] = content - elif name == 'language': - meta_info['language'] = content - elif name == 'robots': - meta_info['robots'] = content - elif name == 'viewport': - meta_info['viewport'] = content - elif property == 'og:title': - meta_info['title'] = content - elif property == 'og:url': - meta_info['canonical'] = content - - # Extract charset - charset_meta = soup.find('meta', charset=True) - if charset_meta: - meta_info['charset'] = charset_meta.get('charset', '') - - # Extract canonical URL - canonical_link = soup.find('link', rel='canonical') - if canonical_link: - meta_info['canonical'] = canonical_link.get('href', '') - - return meta_info - - def _getAlternativeApproaches(self, url: str, requires_js: bool, content_length: int) -> List[str]: - """Get alternative approaches for sites that are difficult to crawl""" - approaches = [] - - if requires_js: - approaches.extend([ - "Site requires JavaScript rendering - consider using a headless browser", - "Try accessing the site's API endpoints directly", - "Look for RSS feeds or sitemaps", - "Check if the site has a mobile version that's easier to parse" - ]) - - if content_length < 100: - approaches.extend([ - "Site may have anti-bot protection - try with different user agents", - "Check if the site requires authentication", - "Look for alternative URLs (www 
vs non-www, http vs https)", - "Try accessing the site's robots.txt for crawling guidelines" - ]) - - # Add general suggestions - approaches.extend([ - "Use the web.search action to find alternative sources", - "Try the web.scrape action with specific CSS selectors", - "Check if the site has a public API or data export" - ]) - - return approaches + self.description = "Web search, crawling, and scraping operations using Tavily" @action async def search(self, parameters: Dict[str, Any]) -> ActionResult: - """ - Perform a web search and output a .txt file with a plain list of URLs (one per line). - + """Perform a web search and outputs a csv file with a list of found URLs + + Each result contains columns "url" and "title". + Parameters: query (str): Search query to perform maxResults (int, optional): Maximum number of results (default: 10) - filter (str, optional): Filter criteria for search results + """ + + try: + # Prepare request data + web_search_request = WebSearchRequest( + query=parameters.get("query"), + max_results=parameters.get("maxResults", 10), + ) + + # Perform request + web_interface = await WebInterface.create() + web_search_result = await web_interface.search(web_search_request) + + # Convert search results to CSV format + if web_search_result.success and web_search_result.documents: + csv_content = web_interface.convert_web_search_result_to_csv(web_search_result) + + # Create CSV document + csv_document = web_interface.create_csv_action_document( + csv_content, + f"web_search_results.csv" + ) + + return ActionResult( + success=True, + documents=[csv_document] + ) + else: + return web_search_result + + except Exception as e: + return ActionResult(success=False, error=str(e)) + + + + def _read_csv_with_urls(self, csv_content: str) -> list: + """Read CSV content and extract URLs from url,title or title,url format (both ; and , delimiters)""" + urls = [] + + # Try both semicolon and comma delimiters + for delimiter in [';', ',']: + try: + reader = csv.DictReader(io.StringIO(csv_content), delimiter=delimiter) + for row in reader: + # Look for url column (case insensitive) + url = None + for key in row.keys(): + if key.lower() == 'url': + url = row[key].strip() + break + + if url and (url.startswith('http://') or url.startswith('https://')): + urls.append(url) + + # If we found URLs with this delimiter, return them + if urls: + return urls + + except Exception: + # Try next delimiter + continue + + # If no valid CSV found, try simple text parsing as fallback + lines = csv_content.split('\n') + for line in lines: + line = line.strip() + if line and (line.startswith('http://') or line.startswith('https://')): + urls.append(line) + + return urls + + @action + async def crawl(self, parameters: Dict[str, Any]) -> ActionResult: + """Crawls a list of URLs and extracts information from them. + + Parameters: + documentList (str): Document list reference containing URL lists from search results expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ + try: + document_list = parameters.get("documentList") + + if not document_list: + return ActionResult( + success=False, error="No document list reference provided." 
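+                    # NOTE: a documentList reference is required (rather than a single
+                    # "document" parameter) so callers can pass either one document or
+                    # a whole list -- see the design notes in notes/changelog.txt.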
+ ) + + # Resolve document list reference to ChatDocument objects + chat_documents = self.service.getChatDocumentsFromDocumentList(document_list) + + if not chat_documents: + return ActionResult( + success=False, + error=f"No documents found for reference: {document_list}", + ) + + # Extract URLs from all documents and combine them + all_urls = [] + import json + import re + + for i, doc in enumerate(chat_documents): + logger.info(f"Processing document {i+1}/{len(chat_documents)}: {doc.fileName}") + + # Get file data using the service center + file_data = self.service.getFileData(doc.fileId) + if not file_data: + logger.warning(f"Could not retrieve file data for document: {doc.fileName}") + continue + + content = file_data.decode("utf-8") + + # Try to parse as CSV first (for new CSV format) + if doc.fileName.lower().endswith('.csv') or 'csv' in doc.mimeType.lower(): + logger.info(f"Processing CSV file: {doc.fileName}") + doc_urls = self._read_csv_with_urls(content) + else: + # Parse JSON to extract URLs from search results + try: + # The document structure from WebSearchActionResult + search_data = json.loads(content) + + # Extract URLs from the search results structure + doc_urls = [] + if isinstance(search_data, dict): + # Handle the document structure: documentData contains the actual search results + doc_data = search_data.get("documentData", search_data) + if "results" in doc_data and isinstance(doc_data["results"], list): + doc_urls = [ + result["url"] + for result in doc_data["results"] + if isinstance(result, dict) and "url" in result + ] + elif "urls" in doc_data and isinstance(doc_data["urls"], list): + # Fallback: if URLs are stored directly in a 'urls' field + doc_urls = [url for url in doc_data["urls"] if isinstance(url, str)] + + # Fallback: try to parse as plain text with regex (for backward compatibility) + if not doc_urls: + logger.warning( + f"Could not extract URLs from JSON structure in {doc.fileName}, trying plain text parsing" + ) + doc_urls = re.split(r"[\n,;]+", content) + doc_urls = [ + u.strip() + for u in doc_urls + if u.strip() + and ( + u.strip().startswith("http://") + or u.strip().startswith("https://") + ) + ] + + except json.JSONDecodeError: + # Fallback to plain text parsing if JSON parsing fails + logger.warning(f"Document {doc.fileName} is not valid JSON, trying plain text parsing") + doc_urls = re.split(r"[\n,;]+", content) + doc_urls = [ + u.strip() + for u in doc_urls + if u.strip() + and ( + u.strip().startswith("http://") + or u.strip().startswith("https://") + ) + ] + + if doc_urls: + all_urls.extend(doc_urls) + logger.info(f"Extracted {len(doc_urls)} URLs from {doc.fileName}") + else: + logger.warning(f"No valid URLs found in document: {doc.fileName}") + + if not all_urls: + return ActionResult( + success=False, error="No valid URLs found in any of the documents." 
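+                    # Reached only when CSV parsing, JSON parsing, and the plain-text
+                    # fallback above all failed to yield any http(s) URLs.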
+ ) + + # Remove duplicates while preserving order + unique_urls = list(dict.fromkeys(all_urls)) + logger.info(f"Extracted {len(unique_urls)} unique URLs from {len(chat_documents)} documents") + + # Prepare request data + web_crawl_request = WebCrawlRequest(urls=unique_urls) + + # Perform request + web_interface = await WebInterface.create() + web_crawl_result = await web_interface.crawl(web_crawl_request) + + # Convert to proper JSON format + if web_crawl_result.success: + json_content = web_interface.convert_web_result_to_json(web_crawl_result) + json_document = web_interface.create_json_action_document( + json_content, + f"web_crawl_results.json" + ) + return ActionResult( + success=True, + documents=[json_document] + ) + else: + return web_crawl_result + + except Exception as e: + logger.error(f"Error in crawl method: {str(e)}") + return ActionResult(success=False, error=str(e)) + + @action + async def scrape(self, parameters: Dict[str, Any]) -> ActionResult: + """Scrapes web content by searching for URLs and then extracting their content. + + Combines search and crawl operations in one step. + + Parameters: + query (str): Search query to perform + maxResults (int, optional): Maximum number of results (default: 10) + """ try: query = parameters.get("query") max_results = parameters.get("maxResults", 10) - filter_param = parameters.get("filter") - expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) - + if not query: - return ActionResult.isFailure(error="Search query is required") - - if not self.srcApikey: - return ActionResult.isFailure(error="SerpAPI key not configured") - - userLanguage = "en" - if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'): - userLanguage = self.service.user.language - - params = { - "engine": self.srcEngine, - "q": query, - "api_key": self.srcApikey, - "num": min(max_results, self.maxResults), - "hl": userLanguage - } - - if filter_param: - params["filter"] = filter_param - - response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout) - response.raise_for_status() - search_results = response.json() - results = [] - - if "organic_results" in search_results: - results = search_results["organic_results"][:max_results] - - # Assume 'results' is a list of dicts with 'url' keys - urls = [item['url'] for item in results if 'url' in item and isinstance(item['url'], str)] - url_list_str = "\n".join(urls) - - # Determine output format based on expected formats - output_extension = ".txt" # Default - output_mime_type = "text/plain" # Default - - if expectedDocumentFormats and len(expectedDocumentFormats) > 0: - # Use the first expected format - expected_format = expectedDocumentFormats[0] - output_extension = expected_format.get("extension", ".txt") - output_mime_type = expected_format.get("mimeType", "text/plain") - logger.info(f"Using expected format: {output_extension} ({output_mime_type})") - else: - logger.info("No expected format specified, using default .txt format") - - # Create result data - result_data = { - "query": query, - "maxResults": max_results, - "filter": filter_param, - "totalResults": len(urls), - "urls": urls, - "urlList": url_list_str, - "timestamp": get_utc_timestamp() - } - - return ActionResult( - success=True, - documents=[ - { - "documentName": f"web_search_{self._format_timestamp_for_filename()}{output_extension}", - "documentData": result_data, - "mimeType": output_mime_type - } - ] - ) - - except Exception as e: - logger.error(f"Error searching web: {str(e)}") - 
return ActionResult( - success=False, - error=str(e) + return ActionResult(success=False, error="Search query is required") + + # Prepare request data + web_scrape_request = WebScrapeRequest( + query=query, + max_results=max_results, ) - def _selenium_extract_content(self, url: str) -> Optional[str]: - """Use Selenium to fetch and extract main content from a JS-heavy page.""" - options = Options() - options.headless = True - options.add_argument('--no-sandbox') - options.add_argument('--disable-dev-shm-usage') - options.add_argument(f'user-agent={self.user_agent}') - try: - driver = webdriver.Chrome(options=options) - driver.set_page_load_timeout(self.timeout) - driver.get(url) - # Wait for body to load - WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body"))) - html = driver.page_source - driver.quit() - soup = BeautifulSoup(html, 'html.parser') - return self._extractMainContent(soup) - except WebDriverException as e: - logger.warning(f"Selenium failed for {url}: {str(e)}") - return None - except Exception as e: - logger.warning(f"Selenium error for {url}: {str(e)}") - return None + # Perform request + web_interface = await WebInterface.create() + web_scrape_result = await web_interface.scrape(web_scrape_request) - @action - async def crawl(self, parameters: Dict[str, Any]) -> ActionResult: - """ - Crawl a list of URLs provided in a document (.txt) with URLs separated by newline, comma, or semicolon. - - Parameters: - document (str): Document containing URL list - expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description - """ - try: - document = parameters.get("document") - expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) - - if not document: - return ActionResult.isFailure(error="No document with URL list provided.") - - # Read the document content - with open(document, "r", encoding="utf-8") as f: - content = f.read() - - # Split URLs by newline, comma, or semicolon - import re - urls = re.split(r'[\n,;]+', content) - urls = [u.strip() for u in urls if u.strip()] - - if not urls: - return ActionResult.isFailure(error="No valid URLs provided in the document.") - - crawl_results = [] - for url in urls: - try: - logger.info(f"Crawling URL: {url}") - # Try Selenium first - content = self._selenium_extract_content(url) - if not content: - # Fallback to requests/BeautifulSoup - soup = self._readUrl(url) - content = self._extractMainContent(soup) - - title = self._extractTitle(BeautifulSoup(content, 'html.parser'), url) if content else "No title" - meta_info = {"url": url, "title": title} - content_length = len(content) if content else 0 - - crawl_results.append({ - "url": url, - "title": title, - "content": content, - "content_length": content_length, - "meta_info": meta_info, - "timestamp": get_utc_timestamp() - }) - logger.info(f"Successfully crawled {url} - extracted {content_length} characters") - - except Exception as e: - logger.error(f"Error crawling web page {url}: {str(e)}") - crawl_results.append({ - "error": str(e), - "url": url, - "suggestions": [ - "Check if the URL is accessible", - "Try with a different user agent", - "Verify the site doesn't block automated access" - ] - }) - - # Determine output format based on expected formats - output_extension = ".json" # Default - output_mime_type = "application/json" # Default - - if expectedDocumentFormats and len(expectedDocumentFormats) > 0: - # Use the first expected format - expected_format = expectedDocumentFormats[0] - 
output_extension = expected_format.get("extension", ".json") - output_mime_type = expected_format.get("mimeType", "application/json") - logger.info(f"Using expected format: {output_extension} ({output_mime_type})") - else: - logger.info("No expected format specified, using default .json format") - - result_data = { - "urls": urls, - "maxDepth": 1, # Simplified crawl - "includeImages": False, - "followLinks": True, - "crawlResults": crawl_results, - "summary": { - "total_urls": len(urls), - "successful_crawls": len([r for r in crawl_results if "error" not in r]), - "failed_crawls": len([r for r in crawl_results if "error" in r]), - "total_content_chars": sum([r.get("content_length", 0) for r in crawl_results if "content_length" in r]) - }, - "timestamp": get_utc_timestamp() - } - - return ActionResult( - success=True, - documents=[ - { - "documentName": f"web_crawl_{self._format_timestamp_for_filename()}{output_extension}", - "documentData": result_data, - "mimeType": output_mime_type - } - ] - ) - - except Exception as e: - logger.error(f"Error crawling web pages: {str(e)}") - return ActionResult( - success=False, - error=str(e) - ) - - @action - async def scrape(self, parameters: Dict[str, Any]) -> ActionResult: - """ - Scrape specific data from web pages - - Parameters: - url (str): URL to scrape - selectors (Dict[str, str]): CSS selectors for data extraction - format (str, optional): Output format (default: "json") - expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description - """ - try: - url = parameters.get("url") - selectors = parameters.get("selectors") - format = parameters.get("format", "json") - expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) - - if not url or not selectors: - return ActionResult( - success=False, - error="URL and selectors are required" + # Convert to proper JSON format + if web_scrape_result.success: + json_content = web_interface.convert_web_result_to_json(web_scrape_result) + json_document = web_interface.create_json_action_document( + json_content, + f"web_scrape_results.json" ) - - # Read the URL - soup = self._readUrl(url) - if not soup: return ActionResult( - success=False, - error="Failed to read URL" + success=True, + documents=[json_document] ) - - extracted_content = {} - - if selectors: - # Extract content using provided selectors - for selector_name, selector in selectors.items(): - elements = soup.select(selector) - if elements: - if format == "text": - extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements] - elif format == "html": - extracted_content[selector_name] = [str(elem) for elem in elements] - else: - extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements] - else: - extracted_content[selector_name] = [] else: - # Auto-extract common elements - extracted_content = { - "title": self._extractTitle(soup, url), - "main_content": self._extractMainContent(soup), - "headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])], - "links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))], - "images": [img.get('src') for img in soup.find_all('img', src=True)] - } - - scrape_result = { - "url": url, - "selectors": selectors, - "format": format, - "content": extracted_content, - "timestamp": get_utc_timestamp() - } - - # Create result data - result_data = { - "url": url, - "selectors": selectors, - "format": format, - "scrapedData": scrape_result, - 
"timestamp": get_utc_timestamp() - } - - # Determine output format based on expected formats - output_extension = f".{format}" # Default to format parameter - output_mime_type = "application/json" # Default - - if expectedDocumentFormats and len(expectedDocumentFormats) > 0: - # Use the first expected format - expected_format = expectedDocumentFormats[0] - output_extension = expected_format.get("extension", f".{format}") - output_mime_type = expected_format.get("mimeType", "application/json") - logger.info(f"Using expected format: {output_extension} ({output_mime_type})") - else: - logger.info(f"No expected format specified, using format parameter: {format}") - - return ActionResult( - success=True, - documents=[ - { - "documentName": f"web_scrape_{self._format_timestamp_for_filename()}{output_extension}", - "documentData": result_data, - "mimeType": output_mime_type - } - ] - ) - + return web_scrape_result + except Exception as e: - logger.error(f"Error scraping web page: {str(e)}") - return ActionResult( - success=False, - error=str(e) - ) - + return ActionResult(success=False, error=str(e)) diff --git a/modules/methods/method_web.py b/modules/methods/method_web.py deleted file mode 100644 index ccb0f185..00000000 --- a/modules/methods/method_web.py +++ /dev/null @@ -1,197 +0,0 @@ -import logging -from typing import Any, Dict -from modules.chat.methodBase import MethodBase, action -from modules.interfaces.interfaceChatModel import ActionResult -from modules.interfaces.interface_web_objects import WebInterface -from modules.interfaces.interface_web_model import ( - WebSearchRequest, - WebCrawlRequest, - WebScrapeRequest, -) - - -logger = logging.getLogger(__name__) - - -class MethodWeb(MethodBase): - """Web method implementation for web operations.""" - - def __init__(self, serviceCenter: Any): - super().__init__(serviceCenter) - self.name = "web" - self.description = "Web search, crawling, and scraping operations using Tavily" - - @action - async def search(self, parameters: Dict[str, Any]) -> ActionResult: - """Perform a web search and outputs a .json file with a list of found URLs. - - Each result contains "title" and "url". - - Parameters: - query (str): Search query to perform - maxResults (int, optional): Maximum number of results (default: 10) - """ - # TODO: Fix docstrings - do we need that format for parsing? - - try: - # Prepare request data - web_search_request = WebSearchRequest( - query=parameters.get("query"), - max_results=parameters.get("maxResults", 10), - ) - - # Perform request - web_interface = await WebInterface.create() - web_search_result = await web_interface.search(web_search_request) - - return web_search_result - - except Exception as e: - return ActionResult(success=False, error=str(e)) - - @action - async def crawl(self, parameters: Dict[str, Any]) -> ActionResult: - """Crawls a list of URLs and extracts information from them. - - Parameters: - document (str): Document reference containing URL list from search results - expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description - """ - try: - document_ref = parameters.get("document") - - if not document_ref: - return ActionResult( - success=False, error="No document reference provided." 
- ) - - # Resolve document reference to ChatDocument objects - chat_documents = self.service.getChatDocumentsFromDocumentList( - [document_ref] - ) - - if not chat_documents: - return ActionResult( - success=False, - error=f"No documents found for reference: {document_ref}", - ) - - # Get the first document (search results) - search_doc = chat_documents[0] - - # Get file data using the service center - file_data = self.service.getFileData(search_doc.fileId) - if not file_data: - return ActionResult( - success=False, error="Could not retrieve file data for document" - ) - - content = file_data.decode("utf-8") - - # Parse JSON to extract URLs from search results - import json - - try: - # The document structure from WebSearchActionResult - search_data = json.loads(content) - - # Extract URLs from the search results structure - urls = [] - if isinstance(search_data, dict): - # Handle the document structure: documentData contains the actual search results - doc_data = search_data.get("documentData", search_data) - if "results" in doc_data and isinstance(doc_data["results"], list): - urls = [ - result["url"] - for result in doc_data["results"] - if isinstance(result, dict) and "url" in result - ] - elif "urls" in doc_data and isinstance(doc_data["urls"], list): - # Fallback: if URLs are stored directly in a 'urls' field - urls = [url for url in doc_data["urls"] if isinstance(url, str)] - - # Fallback: try to parse as plain text with regex (for backward compatibility) - if not urls: - logger.warning( - "Could not extract URLs from JSON structure, trying plain text parsing" - ) - import re - - urls = re.split(r"[\n,;]+", content) - urls = [ - u.strip() - for u in urls - if u.strip() - and ( - u.strip().startswith("http://") - or u.strip().startswith("https://") - ) - ] - - except json.JSONDecodeError: - # Fallback to plain text parsing if JSON parsing fails - logger.warning("Document is not valid JSON, trying plain text parsing") - import re - - urls = re.split(r"[\n,;]+", content) - urls = [ - u.strip() - for u in urls - if u.strip() - and ( - u.strip().startswith("http://") - or u.strip().startswith("https://") - ) - ] - - if not urls: - return ActionResult( - success=False, error="No valid URLs found in the document." - ) - - logger.info(f"Extracted {len(urls)} URLs from document: {urls}") - - # Prepare request data - web_crawl_request = WebCrawlRequest(urls=urls) - - # Perform request - web_interface = await WebInterface.create() - web_crawl_result = await web_interface.crawl(web_crawl_request) - - return web_crawl_result - - except Exception as e: - logger.error(f"Error in crawl method: {str(e)}") - return ActionResult(success=False, error=str(e)) - - @action - async def scrape(self, parameters: Dict[str, Any]) -> ActionResult: - """Scrapes web content by searching for URLs and then extracting their content. - - Combines search and crawl operations in one step. 
- - Parameters: - query (str): Search query to perform - maxResults (int, optional): Maximum number of results (default: 10) - """ - try: - query = parameters.get("query") - max_results = parameters.get("maxResults", 10) - - if not query: - return ActionResult(success=False, error="Search query is required") - - # Prepare request data - web_scrape_request = WebScrapeRequest( - query=query, - max_results=max_results, - ) - - # Perform request - web_interface = await WebInterface.create() - web_scrape_result = await web_interface.scrape(web_scrape_request) - - return web_scrape_result - - except Exception as e: - return ActionResult(success=False, error=str(e)) diff --git a/modules/methods/web/web_search/web_search_base.py b/modules/methods/web/web_search/web_search_base.py deleted file mode 100644 index d655bfd3..00000000 --- a/modules/methods/web/web_search/web_search_base.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Base class for web search classes.""" - -from abc import ABC, abstractmethod -from modules.interfaces.interfaceChatModel import ActionDocument, ActionResult - - -from pydantic import BaseModel, Field -from typing import List - - -class WebSearchRequest(BaseModel): - query: str - max_results: int - - -class WebSearchDocumentData(BaseModel): - title: str - url: str - - -class WebSearchActionDocument(ActionDocument): - documentData: List[WebSearchDocumentData] - - -class WebSearchActionResult(ActionResult): - documents: List[WebSearchActionDocument] = Field(default_factory=list) - - -class WebSearchBase(ABC): - @abstractmethod - async def __call__(self, request: WebSearchRequest) -> WebSearchActionResult: ... diff --git a/modules/methods/web/web_search/web_search_tavily.py b/modules/methods/web/web_search/web_search_tavily.py deleted file mode 100644 index dcbea35c..00000000 --- a/modules/methods/web/web_search/web_search_tavily.py +++ /dev/null @@ -1,70 +0,0 @@ -"""Tavily web search class.""" - -import os -from dataclasses import dataclass -from web_search_base import ( - WebSearchBase, - WebSearchRequest, - WebSearchActionResult, - WebSearchActionDocument, - WebSearchDocumentData, -) - -# from modules.interfaces.interfaceChatModel import ActionResult, ActionDocument -from tavily import AsyncTavilyClient -from modules.shared.timezoneUtils import get_utc_timestamp - - -@dataclass -class WebSearchTavily(WebSearchBase): - client: AsyncTavilyClient = None - - @classmethod - async def create(cls): - return cls(client=AsyncTavilyClient(api_key=os.getenv("TAVILY_API_KEY"))) - - async def __call__(self, request: WebSearchRequest) -> WebSearchActionResult: - """Handles the web search request.""" - # Step 1: Search - try: - search_results = await self._search(request.query, request.max_results) - except Exception as e: - return WebSearchActionResult(success=False, error=str(e)) - - # Step 2: Build ActionResult - try: - result = self._build_action_result(search_results) - except Exception as e: - return WebSearchActionResult(success=False, error=str(e)) - - return result - - async def _search(self, query: str, max_results: int) -> WebSearchActionResult: - """Calls the Tavily API to perform a web search.""" - # Make sure max_results is within the allowed range - if max_results < 0 or max_results > 20: - raise ValueError("max_results must be between 0 and 20") - - # Perform actual API call - response = await self.client.search(query=query, max_results=max_results) - return response["results"] - - def _build_action_result(self, search_results: list) -> WebSearchActionResult: - """Builds the 
ActionResult from the search results."""
-        documents = []
-        for result in search_results:
-            document_name = f"web_search_{get_utc_timestamp()}.txt"
-            document_data = WebSearchDocumentData(
-                title=result["title"], url=result["url"]
-            )
-            mime_type = "text/plain"
-            doc = WebSearchActionDocument(
-                documentName=document_name,
-                documentData=document_data,
-                mimeType=mime_type,
-            )
-            documents.append(doc)
-
-        return WebSearchActionResult(
-            success=True, documents=documents, resultLabel="web_search_results"
-        )
diff --git a/notes/changelog.txt b/notes/changelog.txt
index 5e017239..fca700df 100644
--- a/notes/changelog.txt
+++ b/notes/changelog.txt
@@ -19,7 +19,13 @@ TODO
 - check zusammenfassung von 10 dokumenten >10 MB
 - test case bewerbung
-
+# DOCUMENTATION
+Design principles
+- UI: Module classes for data management (CRUD tables & forms --> formGeneric)
+- Basic: All timestamps to be timezone aware; error-handling routines (Fehlerabfangroutinen) throughout
+- Backend: All external components to attach over connectorXxx --> interfaceXxx --> our codebase
+- all model definitions in interfaceXxxModel
+- action functions for ai: why to use documentList and not just document as input parameter? --> to have full flexibility to pass either a list of documents or a documentList reference
 
 ********************
diff --git a/requirements.txt b/requirements.txt
index 75bd81b6..783db728 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -72,3 +72,22 @@ chardet>=4.0.0 # For encoding detection
 ## Testing Dependencies
 pytest>=8.0.0
 pytest-asyncio>=0.21.0
+
+## Missing Dependencies for IPython and other tools
+decorator>=5.0.0
+jedi>=0.16
+matplotlib-inline>=0.1.0
+stack-data>=0.1.0
+traitlets>=5.0.0
+docutils>=0.13.1
+markdown-it-py>=2.2.0
+keyring>=15.1
+pkginfo>=1.8.1
+
+## Missing Dependencies for Panel 1.3.8
+bleach>=4.0.0
+bokeh>=3.2.0,<3.4.0
+linkify-it-py>=1.0.0
+mdit-py-plugins>=0.3.0
+pyviz-comms>=2.0.0
+xyzservices>=2021.09.1
\ No newline at end of file
diff --git a/test_ai_calls.md b/test_ai_calls.md
new file mode 100644
index 00000000..75ed3310
--- /dev/null
+++ b/test_ai_calls.md
@@ -0,0 +1,235 @@
+# AI Call Functions Test and Content Size Analysis
+
+## Overview
+This file documents the ServiceCenter AI functions that risk sending oversized content to the AI models,
+along with their usage patterns and potential size issues.
+
+## High-Risk AI Functions
+
+### 1. summarizeChat() -> callAiTextBasic()
+**Location**: gateway/modules/chat/handling/promptFactory.py:122
+**Risk Level**: MEDIUM
+**Content**: Entire workflow message history
+**Usage**:
+```python
+messageSummary = await service.summarizeChat(context.workflow.messages) if context.workflow else ""
+```
+**Potential Issues**:
+- Long conversations can generate very large summaries
+- Includes all previous messages in workflow
+- No size limits or truncation
+
+### 2. callAiTextAdvanced() -> interfaceAiCalls.callAiTextAdvanced()
+**Risk Level**: HIGH
+**Multiple Usage Points**:
+
+#### A. Task Planning (handlingTasks.py:116)
+```python
+prompt = await self.service.callAiTextAdvanced(task_planning_prompt)
+```
+**Content**: User input + document context + connection context + previous results
+**Risk**: VERY HIGH - includes all available documents and context
+
+#### B. Action Definition (handlingTasks.py:388)
+```python
+prompt = await self.service.callAiTextAdvanced(action_prompt)
+```
+**Content**: Task context + available documents + connections + previous results
+**Risk**: HIGH - comprehensive context for action planning
+
+#### C. 
Result Review (handlingTasks.py:894) +```python +response = await self.service.callAiTextAdvanced(prompt) +``` +**Content**: Action results + success criteria + context +**Risk**: MEDIUM-HIGH - depends on result size + +#### D. Email Composition (methodOutlook.py:1609) +```python +composed_email = await self.service.interfaceAiCalls.callAiTextAdvanced(ai_prompt) +``` +**Content**: Document content + email requirements +**Risk**: MEDIUM - depends on document size + +#### E. AI Processing (methodAi.py:175) +```python +result = await self.service.callAiTextAdvanced(enhanced_prompt, context) +``` +**Content**: User prompt + extracted document content +**Risk**: HIGH - includes full document content + +### 3. callAiTextBasic() -> interfaceAiCalls.callAiTextBasic() +**Risk Level**: MEDIUM +**Multiple Usage Points**: + +#### A. Document Format Conversion (methodDocument.py:429) +```python +formatted_content = await self.service.callAiTextBasic(ai_prompt, content) +``` +**Content**: Document content + format requirements +**Risk**: MEDIUM - depends on document size + +#### B. HTML Report Generation (methodDocument.py:642) +```python +aiReport = await self.service.callAiTextBasic(aiPrompt, combinedContent) +``` +**Content**: Combined content from multiple documents +**Risk**: HIGH - combines multiple documents + +#### C. AI Processing Fallback (methodAi.py:177) +```python +result = await self.service.callAiTextBasic(enhanced_prompt, context) +``` +**Content**: User prompt + document context +**Risk**: MEDIUM - includes document content + +#### D. Document Content Processing (documentExtraction.py:1459) +```python +processedContent = await self._serviceCenter.callAiTextBasic(aiPrompt, contentToProcess) +``` +**Content**: Document chunks + AI prompt +**Risk**: MEDIUM - processes document chunks + +### 4. extractContentFromDocument() -> documentProcessor.processFileData() +**Risk Level**: HIGH +**Multiple Usage Points**: + +#### A. Document Content Extraction (methodDocument.py:74) +```python +extracted_content = await self.service.extractContentFromDocument( + prompt=aiPrompt, + document=chatDocument +) +``` +**Content**: Full document + extraction prompt +**Risk**: HIGH - processes entire documents + +#### B. HTML Report Generation (methodDocument.py:581) +```python +extracted_content = await self.service.extractContentFromDocument( + prompt="Extract readable text content for HTML report generation", + document=doc +) +``` +**Content**: Full document content +**Risk**: HIGH - processes documents for reports + +#### C. Email Composition (methodOutlook.py:1510) +```python +extracted_content = await self.service.extractContentFromDocument( + prompt="Extract readable text content for email composition", + document=doc +) +``` +**Content**: Full document content +**Risk**: HIGH - processes documents for emails + +#### D. AI Processing (methodAi.py:94) +```python +extracted_content = await self.service.extractContentFromDocument( + prompt=extraction_prompt.strip(), + document=doc +) +``` +**Content**: Full document content +**Risk**: HIGH - processes documents for AI analysis + +## Risk Assessment Summary + +### CRITICAL RISK (Immediate Attention Required) +1. **Task Planning** (handlingTasks.py:116) - Entire workflow context +2. **Action Definition** (handlingTasks.py:388) - Comprehensive context +3. **Document Processing** (all extractContentFromDocument calls) - Full documents +4. **AI Method Processing** (methodAi.py:175) - Document content + context +5. 
**Report Generation** (methodDocument.py:642) - Multiple documents combined + +### HIGH RISK (Monitor Closely) +1. **Chat Summarization** (promptFactory.py:122) - Message history +2. **Document Format Conversion** (methodDocument.py:429) - Single documents +3. **Email Composition** (methodOutlook.py:1609) - Document content + +## Potential Issues + +### Content Size Problems +- Large documents (PDFs, Word docs, Excel files) can exceed AI model limits +- Combined document content in reports can be massive +- Long conversation histories in chat summarization +- Full workflow context in task planning + +### Performance Issues +- Timeout errors for large content +- Memory issues with large document processing +- API rate limiting with large requests +- Cost implications for large AI calls + +### Error Scenarios +- OpenAI API 400 errors (content too large) +- Timeout errors (processing too slow) +- Memory exhaustion (large document processing) +- Incomplete processing (truncated content) + +## Recommended Solutions + +### 1. Content Size Limits +- Implement maximum content size checks before AI calls +- Truncate large content with appropriate warnings +- Split large documents into chunks + +### 2. Content Filtering +- Remove unnecessary context from prompts +- Filter out large binary content +- Use document summaries instead of full content + +### 3. Chunking Strategy +- Process large documents in smaller chunks +- Implement progressive processing +- Use streaming for large responses + +### 4. Caching and Optimization +- Cache processed document content +- Reuse extracted content across operations +- Implement smart content selection + +### 5. Error Handling +- Graceful degradation for oversized content +- Fallback strategies for failed AI calls +- User notifications for content size issues + +## Test Scenarios + +### Test Case 1: Large Document Processing +- Upload a 10MB PDF document +- Try to extract content for AI processing +- Monitor for size limit errors + +### Test Case 2: Multiple Document Reports +- Upload 5+ large documents +- Generate HTML report +- Check for combined content size issues + +### Test Case 3: Long Conversation History +- Create workflow with 50+ messages +- Test chat summarization +- Monitor for context size limits + +### Test Case 4: Task Planning with Large Context +- Create workflow with many documents +- Test task planning functionality +- Check for prompt size limits + +## Monitoring Recommendations + +1. **Log Content Sizes**: Track the size of content sent to AI functions +2. **Monitor API Errors**: Watch for 400 errors indicating content too large +3. **Performance Metrics**: Track processing times for large content +4. **User Feedback**: Monitor for incomplete or failed operations +5. **Cost Tracking**: Monitor AI API costs for large requests + +## Implementation Priority + +1. **Immediate**: Add content size checks to extractContentFromDocument +2. **High**: Implement chunking for large document processing +3. **Medium**: Add content filtering to task planning prompts +4. **Low**: Implement caching for processed content + +This analysis should help identify and mitigate the risks of delivering too big content to AI functions. diff --git a/test_ai_fallback.py b/test_ai_fallback.py new file mode 100644 index 00000000..fc320229 --- /dev/null +++ b/test_ai_fallback.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +""" +Test script to verify AI fallback mechanism from Basic to Advanced when context length is exceeded. 
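+
+Expected flow (minimal sketch; assumes the fallback is wired up inside AiCalls as
+described above):
+
+    ai_calls = AiCalls()
+    result = await ai_calls.callAiTextBasic(prompt, oversized_context)
+    # The Basic call raises ContextLengthExceededException internally and is
+    # retried via callAiTextAdvanced, so the caller still receives a plain string.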
+""" + +import asyncio +import logging +from modules.interfaces.interfaceAiCalls import AiCalls +from modules.connectors.connectorAiOpenai import ContextLengthExceededException + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +async def test_context_length_fallback(): + """Test the fallback mechanism when context length is exceeded""" + + # Create AI calls instance + ai_calls = AiCalls() + + # Create a very large context that would exceed OpenAI's context limit + large_context = "This is a test context. " * 10000 # Create a large context + prompt = "Please summarize this context in one sentence." + + logger.info("Testing AI Basic with large context (should trigger fallback)...") + + try: + # This should trigger the context length exceeded error and fallback to Advanced + result = await ai_calls.callAiTextBasic(prompt, large_context) + logger.info(f"✅ Fallback successful! Result: {result[:100]}...") + return True + except Exception as e: + logger.error(f"❌ Test failed: {str(e)}") + return False + +async def test_direct_context_length_exception(): + """Test that the ContextLengthExceededException is properly raised""" + + from modules.connectors.connectorAiOpenai import AiOpenai + + logger.info("Testing direct ContextLengthExceededException...") + + try: + # Create OpenAI connector + openai_connector = AiOpenai() + + # Create messages that would exceed context length + large_messages = [ + {"role": "user", "content": "Test message. " * 50000} # Very large message + ] + + # This should raise ContextLengthExceededException + await openai_connector.callAiBasic(large_messages) + logger.error("❌ Expected ContextLengthExceededException but none was raised") + return False + + except ContextLengthExceededException as e: + logger.info(f"✅ ContextLengthExceededException properly raised: {str(e)}") + return True + except Exception as e: + logger.error(f"❌ Unexpected exception: {str(e)}") + return False + +async def main(): + """Run all tests""" + logger.info("Starting AI fallback mechanism tests...") + + tests = [ + ("Context Length Fallback", test_context_length_fallback), + ("Direct Exception Test", test_direct_context_length_exception), + ] + + results = [] + for test_name, test_func in tests: + logger.info(f"\n--- Running {test_name} ---") + try: + result = await test_func() + results.append((test_name, result)) + except Exception as e: + logger.error(f"Test {test_name} crashed: {str(e)}") + results.append((test_name, False)) + + # Summary + logger.info("\n" + "="*50) + logger.info("TEST SUMMARY") + logger.info("="*50) + + passed = 0 + for test_name, result in results: + status = "✅ PASSED" if result else "❌ FAILED" + logger.info(f"{test_name}: {status}") + if result: + passed += 1 + + logger.info(f"\nTotal: {passed}/{len(results)} tests passed") + + if passed == len(results): + logger.info("🎉 All tests passed! Fallback mechanism is working correctly.") + else: + logger.warning("⚠️ Some tests failed. Please check the implementation.") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/test_methodWeb.py b/test_methodWeb.py new file mode 100644 index 00000000..a4578418 --- /dev/null +++ b/test_methodWeb.py @@ -0,0 +1,658 @@ +#!/usr/bin/env python3 +""" +Test script for MethodWeb class. +Tests all web actions: search, crawl, and scrape with various parameter sets. 
+ +Features: +- Tests web search functionality with different queries +- Tests web crawling with URL lists +- Tests web scraping (search + crawl combined) +- Detailed logging and progress tracking +- Error handling and validation testing +- Configuration validation + +Usage: +- Interactive mode: python test_methodWeb.py +- Automated mode: python test_methodWeb.py --auto +- Verbose mode: python test_methodWeb.py --verbose +""" + +import os +import asyncio +import logging +import sys +import argparse +import json +from pathlib import Path +from typing import List, Optional, Dict, Any +from datetime import datetime, UTC + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Filter out specific unwanted log messages +class LogFilter(logging.Filter): + """Filter to hide specific unwanted log messages.""" + + def filter(self, record): + # Hide HTTP request info messages + if "HTTP Request:" in record.getMessage() and "POST https://api.tavily.com" in record.getMessage(): + return False + + # Hide HTTP response info messages + if "HTTP/1.1 200 OK" in record.getMessage(): + return False + + return True + +# Apply the filter to the root logger +root_logger = logging.getLogger() +root_logger.addFilter(LogFilter()) + +def check_dependencies(): + """Check if required dependencies are available.""" + missing_deps = [] + + # Check for required dependencies + try: + import tavily + logger.info("tavily-python is available") + except ImportError: + missing_deps.append("tavily-python") + logger.error("tavily-python is missing") + + try: + import httpx + logger.info("httpx is available") + except ImportError: + missing_deps.append("httpx") + logger.error("httpx is missing") + + if missing_deps: + logger.error("\n" + "="*60) + logger.error("MISSING DEPENDENCIES DETECTED!") + logger.error("="*60) + logger.error("The following packages are required but not installed:") + for dep in missing_deps: + logger.error(f" - {dep}") + logger.error("\nTo install all dependencies, run:") + logger.error("pip install -r requirements.txt") + logger.error("="*60) + return False + + logger.info("All required dependencies are available!") + return True + +def check_module_imports(): + """Check if we can import the required modules.""" + try: + # Add the gateway directory to the path so we can import our modules + sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..')) + + from modules.methods.methodWeb import MethodWeb + from modules.chat.serviceCenter import ServiceCenter + from modules.interfaces.interfaceAppModel import User, UserConnection, UserPrivilege, AuthAuthority + from modules.interfaces.interfaceChatModel import ChatWorkflow, TaskItem, TaskStatus + from modules.shared.configuration import APP_CONFIG + + logger.info("All required modules imported successfully") + return True + except ImportError as e: + logger.error(f"Failed to import required modules: {e}") + logger.error("Make sure you're running this script from the gateway directory") + return False + except Exception as e: + logger.error(f"Unexpected error importing modules: {e}") + return False + +def check_configuration(): + """Check if required configuration is available.""" + try: + from modules.shared.configuration import APP_CONFIG + + # Check Tavily API key + tavily_api_key = APP_CONFIG.get("Connector_WebTavily_API_KEY") + if not tavily_api_key or tavily_api_key == "your_tavily_api_key_here": + logger.error("Tavily API key not 
configured!") + logger.error("Please set Connector_WebTavily_API_KEY in config.ini") + return False + + logger.info("Tavily API key is configured") + + # Check other web configuration + web_configs = [ + "Web_Search_MAX_QUERY_LENGTH", + "Web_Search_MAX_RESULTS", + "Web_Search_MIN_RESULTS", + "Web_Crawl_TIMEOUT", + "Web_Crawl_MAX_RETRIES", + "Web_Crawl_RETRY_DELAY" + ] + + for config_key in web_configs: + value = APP_CONFIG.get(config_key) + if value: + logger.info(f"Configuration {config_key}: {value}") + else: + logger.warning(f"Configuration {config_key} not set, using default") + + return True + + except Exception as e: + logger.error(f"Failed to check configuration: {e}") + return False + +def create_mock_service_center(): + """Create a proper ServiceCenter for testing purposes.""" + try: + from modules.chat.serviceCenter import ServiceCenter + from modules.interfaces.interfaceAppModel import User, UserPrivilege, AuthAuthority + from modules.interfaces.interfaceChatModel import ChatWorkflow, TaskItem, TaskStatus + from modules.interfaces.interfaceChatModel import ChatLog, ChatMessage, ChatStat + + # Create proper user with all required fields + mock_user = User( + id="test_user_web_001", + username="testuser_web", + email="testweb@example.com", + fullName="Test Web User", + language="en", + enabled=True, + privilege=UserPrivilege.USER, + authenticationAuthority=AuthAuthority.LOCAL, + mandateId="test_mandate_web_001" + ) + + # Create proper workflow with all required fields + current_time = datetime.now(UTC).timestamp() + mock_workflow = ChatWorkflow( + id="test_workflow_web_001", + mandateId="test_mandate_web_001", + status="active", + name="Test Web Method Workflow", + currentRound=1, + lastActivity=current_time, + startedAt=current_time, + logs=[], + messages=[], + stats=None, + tasks=[] + ) + + # Create service center + service_center = ServiceCenter(mock_user, mock_workflow) + logger.info("ServiceCenter created successfully for web testing") + return service_center + + except Exception as e: + logger.error(f"Failed to create ServiceCenter: {e}") + return None + +class MethodWebTester: + """Test class for MethodWeb functionality.""" + + def __init__(self): + """Initialize the tester.""" + self.method_web = None + self.service_center = None + + # Test results tracking + self.test_results = [] + + # Test parameter sets + self.test_queries = [ + "Python programming tutorial", + "FastAPI documentation", + "machine learning basics", + "web scraping best practices" + ] + + self.test_urls = [ + "https://docs.python.org/3/tutorial/", + "https://fastapi.tiangolo.com/", + "https://scikit-learn.org/stable/", + "https://requests.readthedocs.io/en/latest/" + ] + + def initialize_method_web(self): + """Initialize the MethodWeb instance with a proper ServiceCenter.""" + try: + # First create the service center + self.service_center = create_mock_service_center() + if not self.service_center: + logger.error("Failed to create ServiceCenter!") + return False + + # Now create MethodWeb with the service center + from modules.methods.methodWeb import MethodWeb + self.method_web = MethodWeb(self.service_center) + logger.info("MethodWeb initialized successfully with ServiceCenter") + return True + except Exception as e: + logger.error(f"Failed to initialize MethodWeb: {e}") + return False + + async def test_search_action(self, query: str, max_results: int = 5) -> Dict[str, Any]: + """Test the search action with given parameters.""" + logger.info(f"Testing search action with query: '{query}', max_results: 
{max_results}") + + try: + parameters = { + "query": query, + "maxResults": max_results + } + + result = await self.method_web.search(parameters) + + test_result = { + "action": "search", + "query": query, + "max_results": max_results, + "success": result.success, + "error": result.error if not result.success else None, + "documents_count": len(result.documents) if result.documents else 0, + "result_label": result.resultLabel if hasattr(result, 'resultLabel') else None + } + + if result.success: + logger.info(f"✅ Search successful: {test_result['documents_count']} documents returned") + if result.documents: + for i, doc in enumerate(result.documents): + logger.info(f" Document {i+1}: {doc.documentName}") + if hasattr(doc, 'documentData') and hasattr(doc.documentData, 'results'): + logger.info(f" Results count: {len(doc.documentData.results)}") + else: + logger.error(f"❌ Search failed: {result.error}") + + return test_result + + except Exception as e: + error_msg = str(e) + logger.error(f"❌ Search action exception: {error_msg}") + return { + "action": "search", + "query": query, + "max_results": max_results, + "success": False, + "error": f"Exception: {error_msg}", + "documents_count": 0, + "result_label": None + } + + async def test_crawl_action(self, urls: List[str]) -> Dict[str, Any]: + """Test the crawl action with given URLs.""" + logger.info(f"Testing crawl action with {len(urls)} URLs") + + try: + # Monkeypatch the service to return a mock document and file data + class _MockDoc: + def __init__(self, fileId: str, fileName: str = "mock_search_results.json"): + self.fileId = fileId + self.fileName = fileName + + def _mock_get_docs(_doc_ids): + return [_MockDoc(fileId="mock_file_id", fileName="mock_search_results.json")] + + # Build minimal JSON structure expected by methodWeb.crawl + mock_payload = { + "documentData": { + "results": [{"url": u} for u in urls] + } + } + + def _mock_get_file_data(_file_id): + return json.dumps(mock_payload).encode("utf-8") + + # Apply monkeypatches to the method's service + self.method_web.service.getChatDocumentsFromDocumentList = _mock_get_docs + self.method_web.service.getFileData = _mock_get_file_data + + # Use any string as the document list reference; service is mocked + parameters = {"documentList": "mock_document_list_ref"} + + result = await self.method_web.crawl(parameters) + + test_result = { + "action": "crawl", + "urls_count": len(urls), + "success": result.success, + "error": result.error if not result.success else None, + "documents_count": len(result.documents) if result.documents else 0, + "result_label": result.resultLabel if hasattr(result, 'resultLabel') else None + } + + if result.success: + logger.info(f"✅ Crawl successful: {test_result['documents_count']} documents returned") + if result.documents: + for i, doc in enumerate(result.documents): + logger.info(f" Document {i+1}: {doc.documentName}") + else: + logger.error(f"❌ Crawl failed: {result.error}") + + return test_result + + except Exception as e: + error_msg = str(e) + logger.error(f"❌ Crawl action exception: {error_msg}") + return { + "action": "crawl", + "urls_count": len(urls), + "success": False, + "error": f"Exception: {error_msg}", + "documents_count": 0, + "result_label": None + } + + async def test_scrape_action(self, query: str, max_results: int = 3) -> Dict[str, Any]: + """Test the scrape action (search + crawl combined) with given parameters.""" + logger.info(f"Testing scrape action with query: '{query}', max_results: {max_results}") + + try: + parameters = { + 
"query": query, + "maxResults": max_results + } + + result = await self.method_web.scrape(parameters) + + test_result = { + "action": "scrape", + "query": query, + "max_results": max_results, + "success": result.success, + "error": result.error if not result.success else None, + "documents_count": len(result.documents) if result.documents else 0, + "result_label": result.resultLabel if hasattr(result, 'resultLabel') else None + } + + if result.success: + logger.info(f"✅ Scrape successful: {test_result['documents_count']} documents returned") + if result.documents: + for i, doc in enumerate(result.documents): + logger.info(f" Document {i+1}: {doc.documentName}") + if hasattr(doc, 'documentData') and hasattr(doc.documentData, 'results'): + logger.info(f" Results count: {len(doc.documentData.results)}") + else: + logger.error(f"❌ Scrape failed: {result.error}") + + return test_result + + except Exception as e: + error_msg = str(e) + logger.error(f"❌ Scrape action exception: {error_msg}") + return { + "action": "scrape", + "query": query, + "max_results": max_results, + "success": False, + "error": f"Exception: {error_msg}", + "documents_count": 0, + "result_label": None + } + + async def test_parameter_validation(self) -> List[Dict[str, Any]]: + """Test parameter validation with invalid inputs.""" + logger.info("Testing parameter validation with invalid inputs") + + validation_tests = [] + + # Test 1: Empty query + logger.info("Test 1: Empty query") + result = await self.test_search_action("", 5) + # For validation tests, we expect the request to fail with validation error + if not result["success"] and "validation error" in result.get("error", "").lower(): + result["success"] = True # Mark as successful validation test + result["validation_test"] = True + result["expected_behavior"] = "Correctly rejected empty query" + logger.info("✅ Validation test PASSED: Empty query correctly rejected") + validation_tests.append(result) + + # Test 2: Query too long (over 400 characters) + long_query = "a" * 500 + logger.info("Test 2: Query too long") + result = await self.test_search_action(long_query, 5) + if not result["success"] and "validation error" in result.get("error", "").lower(): + result["success"] = True # Mark as successful validation test + result["validation_test"] = True + result["expected_behavior"] = "Correctly rejected overly long query" + logger.info("✅ Validation test PASSED: Long query correctly rejected") + validation_tests.append(result) + + # Test 3: Max results too high + logger.info("Test 3: Max results too high") + result = await self.test_search_action("test", 25) + if not result["success"] and "validation error" in result.get("error", "").lower(): + result["success"] = True # Mark as successful validation test + result["validation_test"] = True + result["expected_behavior"] = "Correctly rejected excessive max results" + logger.info("✅ Validation test PASSED: High max results correctly rejected") + validation_tests.append(result) + + # Test 4: Max results too low + logger.info("Test 4: Max results too low") + result = await self.test_search_action("test", 0) + if not result["success"] and "validation error" in result.get("error", "").lower(): + result["success"] = True # Mark as successful validation test + result["validation_test"] = True + result["expected_behavior"] = "Correctly rejected zero max results" + logger.info("✅ Validation test PASSED: Zero max results correctly rejected") + validation_tests.append(result) + + return validation_tests + + async def 
run_all_tests(self) -> None: + """Run all web method tests.""" + logger.info("Starting MethodWeb comprehensive tests") + logger.info("=" * 60) + + # Initialize the method + if not self.initialize_method_web(): + logger.error("Cannot proceed without MethodWeb!") + return + + # Test 1: Search actions with different queries + logger.info("\n" + "=" * 60) + logger.info("TEST 1: SEARCH ACTIONS") + logger.info("=" * 60) + + for i, query in enumerate(self.test_queries): + logger.info(f"\nSearch test {i+1}/{len(self.test_queries)}") + result = await self.test_search_action(query, 3) + self.test_results.append(result) + await asyncio.sleep(1) # Rate limiting + + # Test 2: Scrape actions (search + crawl combined) + logger.info("\n" + "=" * 60) + logger.info("TEST 2: SCRAPE ACTIONS") + logger.info("=" * 60) + + scrape_queries = self.test_queries[:2] # Use first 2 queries for scraping + for i, query in enumerate(scrape_queries): + logger.info(f"\nScrape test {i+1}/{len(scrape_queries)}") + result = await self.test_scrape_action(query, 2) + self.test_results.append(result) + await asyncio.sleep(2) # Rate limiting for scraping + + # Test 3: Parameter validation + logger.info("\n" + "=" * 60) + logger.info("TEST 3: PARAMETER VALIDATION") + logger.info("=" * 60) + + validation_results = await self.test_parameter_validation() + self.test_results.extend(validation_results) + + # Test 4: Crawl action (if we have search results) + logger.info("\n" + "=" * 60) + logger.info("TEST 4: CRAWL ACTIONS") + logger.info("=" * 60) + + logger.info("Testing crawl with sample URLs") + result = await self.test_crawl_action(self.test_urls[:2]) + self.test_results.append(result) + + # Print comprehensive summary + self.print_test_summary() + + def print_test_summary(self): + """Print comprehensive test summary.""" + logger.info("\n" + "=" * 80) + logger.info("COMPREHENSIVE TEST SUMMARY") + logger.info("=" * 80) + + total_tests = len(self.test_results) + successful_tests = sum(1 for result in self.test_results if result["success"]) + failed_tests = total_tests - successful_tests + + logger.info(f"Total tests run: {total_tests}") + logger.info(f"Successful: {successful_tests}") + logger.info(f"Failed: {failed_tests}") + logger.info(f"Success rate: {(successful_tests/total_tests)*100:.1f}%") + + # Group results by action type + action_groups = {} + for result in self.test_results: + action = result["action"] + if action not in action_groups: + action_groups[action] = [] + action_groups[action].append(result) + + logger.info("\n" + "-" * 80) + logger.info("RESULTS BY ACTION TYPE:") + logger.info("-" * 80) + + for action, results in action_groups.items(): + action_successful = sum(1 for r in results if r["success"]) + action_total = len(results) + logger.info(f"\n{action.upper()} ACTIONS:") + logger.info(f" Total: {action_total}, Successful: {action_successful}, Failed: {action_total - action_successful}") + + for i, result in enumerate(results): + status_icon = "✅" if result["success"] else "❌" + + # Handle validation tests specially + if result.get("validation_test", False): + logger.info(f" {status_icon} Validation Test {i+1}: {result.get('expected_behavior', 'Validation working correctly')}") + if result.get("error"): + logger.info(f" Validation Error: {result['error']}") + elif action == "search": + logger.info(f" {status_icon} Test {i+1}: '{result['query']}' -> {result['documents_count']} docs") + elif action == "scrape": + logger.info(f" {status_icon} Test {i+1}: '{result['query']}' -> {result['documents_count']} docs") + 
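+                # Crawl tests carry URL counts rather than a query string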
elif action == "crawl": + logger.info(f" {status_icon} Test {i+1}: {result['urls_count']} URLs -> {result['documents_count']} docs") + + if not result["success"] and not result.get("validation_test", False): + logger.info(f" Error: {result['error']}") + + logger.info("\n" + "-" * 80) + logger.info("CONFIGURATION STATUS:") + logger.info("-" * 80) + + try: + from modules.shared.configuration import APP_CONFIG + tavily_key = APP_CONFIG.get("Connector_WebTavily_API_KEY") + if tavily_key and tavily_key != "your_tavily_api_key_here": + logger.info("✅ Tavily API key: Configured") + else: + logger.info("❌ Tavily API key: Not configured") + + web_configs = [ + ("Web_Search_MAX_QUERY_LENGTH", "400"), + ("Web_Search_MAX_RESULTS", "20"), + ("Web_Search_MIN_RESULTS", "1"), + ("Web_Crawl_TIMEOUT", "30"), + ("Web_Crawl_MAX_RETRIES", "3"), + ("Web_Crawl_RETRY_DELAY", "2") + ] + + for config_key, default_value in web_configs: + value = APP_CONFIG.get(config_key, default_value) + logger.info(f"✅ {config_key}: {value}") + + except Exception as e: + logger.error(f"❌ Configuration check failed: {e}") + + logger.info("=" * 80) + +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description='MethodWeb Test Script') + parser.add_argument('--auto', action='store_true', + help='Run tests automatically without user interaction') + parser.add_argument('--verbose', '-v', action='store_true', + help='Enable verbose logging') + parser.add_argument('--quick', action='store_true', + help='Run quick tests with fewer queries') + + return parser.parse_args() + +async def main(): + """Main function to run the tests.""" + # Parse command line arguments + args = parse_arguments() + + # Set logging level based on verbosity + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + else: + logging.getLogger().setLevel(logging.INFO) + + logger.info("MethodWeb Test Script") + logger.info("=" * 50) + + # Check dependencies first + if not check_dependencies(): + logger.error("Please install missing dependencies before running tests.") + return + + # Check module imports + if not check_module_imports(): + logger.error("Cannot import required modules. Please check your setup.") + return + + # Check configuration + if not check_configuration(): + logger.error("Configuration check failed. Please check your config.ini file.") + return + + # Determine test mode + if args.auto: + logger.info("Running in automated mode") + else: + # Interactive mode: ask user for confirmation + print("\n" + "=" * 50) + print("METHODWEB TEST SCRIPT") + print("=" * 50) + print("This script will test the MethodWeb functionality including:") + print("- Web search actions") + print("- Web scraping actions") + print("- Web crawling actions") + print("- Parameter validation") + print("=" * 50) + + try: + choice = input("Do you want to proceed? 
(y/N): ").strip().lower() + if choice not in ['y', 'yes']: + logger.info("Test cancelled by user") + return + except (EOFError, KeyboardInterrupt): + logger.info("Test cancelled by user") + return + + # Create tester and run tests + tester = MethodWebTester() + + # Modify test queries for quick mode + if args.quick: + tester.test_queries = tester.test_queries[:2] # Use only first 2 queries + logger.info("Running in quick mode with reduced test set") + + await tester.run_all_tests() + +if __name__ == "__main__": + # Run the tests + asyncio.run(main()) diff --git a/test_web_csv_functionality.py b/test_web_csv_functionality.py new file mode 100644 index 00000000..7242c497 --- /dev/null +++ b/test_web_csv_functionality.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +""" +Test script for web CSV functionality +Tests both CSV output generation and CSV input reading +""" + +import sys +import os +import asyncio +from typing import Dict, Any + +# Add the gateway directory to the Python path +sys.path.insert(0, os.path.join(os.path.dirname(__file__))) + +from modules.methods.methodWeb import MethodWeb +from modules.interfaces.interfaceWebModel import WebSearchResultItem, WebSearchDocumentData, WebSearchActionDocument, WebSearchActionResult +from pydantic import HttpUrl + + +def create_mock_web_search_result(): + """Create a mock WebSearchActionResult with the provided example data""" + + # Create mock search result items based on the provided example + results = [ + WebSearchResultItem( + title="Switzerland Market Analysis :: Fitch Solutions", + url=HttpUrl("https://www.fitchsolutions.com/bmi/region/switzerland") + ), + WebSearchResultItem( + title="OECD Economic Outlook, Volume 2024 Issue 2: Switzerland", + url=HttpUrl("https://www.oecd.org/en/publications/2024/12/oecd-economic-outlook-volume-2024-issue-2_67bb8fac/full-report/switzerland_605fd31f.html") + ), + WebSearchResultItem( + title="The economic context of Switzerland - International Trade Portal", + url=HttpUrl("https://www.lloydsbanktrade.com/en/market-potential/switzerland/economical-context") + ), + WebSearchResultItem( + title="Switzerland: Country File, Economic Risk Analysis | Coface", + url=HttpUrl("https://www.coface.com/news-economy-and-insights/business-risk-dashboard/country-risk-files/switzerland") + ), + WebSearchResultItem( + title="Swiss Economic Outlook 2025 - Roland Berger", + url=HttpUrl("https://www.rolandberger.com/en/Insights/Publications/Swiss-Economic-Outlook-2025.html") + ) + ] + + # Create document data + document_data = WebSearchDocumentData( + query="current market trends Switzerland business economy 2024 analysis report", + results=results, + total_count=len(results) + ) + + # Create action document + action_document = WebSearchActionDocument( + documentName="test_search_results.json", + documentData=document_data, + mimeType="application/json" + ) + + # Create action result + action_result = WebSearchActionResult( + success=True, + documents=[action_document] + ) + + return action_result + + +def test_csv_output_generation(): + """Test CSV output generation from web search results""" + print("Testing CSV output generation...") + + # Create method instance (without service center for testing) + method = MethodWeb(None) + + # Create mock search result + mock_result = create_mock_web_search_result() + + # Convert to CSV + csv_content = method._convert_search_results_to_csv(mock_result) + + print("Generated CSV content:") + print(csv_content) + print() + + # Verify CSV format + lines = csv_content.strip().split('\n') 
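+    # Expected shape: a "url;title" header row plus one row per mocked result
+    # (5 results from create_mock_web_search_result above).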
+ assert len(lines) == 6, f"Expected 6 lines (header + 5 results), got {len(lines)}" + + # Check header + assert lines[0] == "url;title", f"Expected header 'url;title', got '{lines[0]}'" + + # Check that URLs are present + for i, line in enumerate(lines[1:], 1): + parts = line.split(';') + assert len(parts) == 2, f"Line {i} should have 2 parts separated by ';', got {len(parts)}" + url, title = parts + assert url.startswith('https://'), f"Line {i} URL should start with 'https://', got '{url}'" + assert title, f"Line {i} should have a title, got empty title" + + print("✓ CSV output generation test passed!") + return csv_content + + +def test_csv_input_reading(): + """Test CSV input reading functionality""" + print("Testing CSV input reading...") + + # Create method instance + method = MethodWeb(None) + + # Test semicolon-separated CSV + semicolon_csv = """url;title +https://www.fitchsolutions.com/bmi/region/switzerland;Switzerland Market Analysis :: Fitch Solutions +https://www.oecd.org/en/publications/2024/12/oecd-economic-outlook-volume-2024-issue-2_67bb8fac/full-report/switzerland_605fd31f.html;OECD Economic Outlook, Volume 2024 Issue 2: Switzerland +https://www.lloydsbanktrade.com/en/market-potential/switzerland/economical-context;The economic context of Switzerland - International Trade Portal""" + + urls_semicolon = method._read_csv_with_urls(semicolon_csv) + print(f"Extracted {len(urls_semicolon)} URLs from semicolon CSV:") + for url in urls_semicolon: + print(f" - {url}") + + assert len(urls_semicolon) == 3, f"Expected 3 URLs, got {len(urls_semicolon)}" + assert all(url.startswith('https://') for url in urls_semicolon), "All URLs should start with https://" + + print("✓ Semicolon CSV reading test passed!") + + # Test comma-separated CSV + comma_csv = """url,title +https://www.fitchsolutions.com/bmi/region/switzerland,Switzerland Market Analysis :: Fitch Solutions +https://www.oecd.org/en/publications/2024/12/oecd-economic-outlook-volume-2024-issue-2_67bb8fac/full-report/switzerland_605fd31f.html,OECD Economic Outlook, Volume 2024 Issue 2: Switzerland""" + + urls_comma = method._read_csv_with_urls(comma_csv) + print(f"Extracted {len(urls_comma)} URLs from comma CSV:") + for url in urls_comma: + print(f" - {url}") + + assert len(urls_comma) == 2, f"Expected 2 URLs, got {len(urls_comma)}" + assert all(url.startswith('https://') for url in urls_comma), "All URLs should start with https://" + + print("✓ Comma CSV reading test passed!") + + # Test case-insensitive column names + case_insensitive_csv = """URL;Title +https://example.com/test;Test Title""" + + urls_case = method._read_csv_with_urls(case_insensitive_csv) + assert len(urls_case) == 1, f"Expected 1 URL, got {len(urls_case)}" + assert urls_case[0] == "https://example.com/test", f"Expected 'https://example.com/test', got '{urls_case[0]}'" + + print("✓ Case-insensitive CSV reading test passed!") + + +def test_integration(): + """Test the complete integration: generate CSV and then read it back""" + print("Testing integration: generate CSV and read it back...") + + method = MethodWeb(None) + + # Generate CSV from mock data + mock_result = create_mock_web_search_result() + csv_content = method._convert_search_results_to_csv(mock_result) + + # Read URLs back from the generated CSV + extracted_urls = method._read_csv_with_urls(csv_content) + + print(f"Generated CSV with {len(mock_result.documents[0].documentData.results)} results") + print(f"Extracted {len(extracted_urls)} URLs from generated CSV") + + # Verify we got the same number 
of URLs
+    assert len(extracted_urls) == len(mock_result.documents[0].documentData.results), \
+        f"Expected {len(mock_result.documents[0].documentData.results)} URLs, got {len(extracted_urls)}"
+
+    # Verify URLs match
+    original_urls = [str(result.url) for result in mock_result.documents[0].documentData.results]
+    for i, (original, extracted) in enumerate(zip(original_urls, extracted_urls)):
+        assert original == extracted, f"URL {i} mismatch: expected '{original}', got '{extracted}'"
+
+    print("✓ Integration test passed!")
+
+
+if __name__ == "__main__":
+    print("Running Web CSV Functionality Tests")
+    print("=" * 50)
+
+    try:
+        # Test CSV output generation
+        csv_content = test_csv_output_generation()
+        print()
+
+        # Test CSV input reading
+        test_csv_input_reading()
+        print()
+
+        # Test integration
+        test_integration()
+        print()
+
+        print("=" * 50)
+        print("🎉 All tests passed successfully!")
+
+    except Exception as e:
+        print(f"❌ Test failed: {e}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
diff --git a/tests/connectors/test_connector_tavily.py b/tests/connectors/test_connector_tavily.py
index 81ce64c3..23253cba 100644
--- a/tests/connectors/test_connector_tavily.py
+++ b/tests/connectors/test_connector_tavily.py
@@ -4,12 +4,12 @@ import pytest
 import logging
 
 from modules.interfaces.interfaceChatModel import ActionResult
-from modules.interfaces.interface_web_model import (
+from modules.interfaces.interfaceWebModel import (
     WebSearchRequest,
     WebCrawlRequest,
     WebScrapeRequest,
 )
-from modules.connectors.connector_tavily import ConnectorTavily
+from modules.connectors.connectorWebTavily import ConnectorTavily
 
 logger = logging.getLogger(__name__)
 
@@ -23,10 +23,10 @@ async def test_tavily_connector_search_test_live_api():
     request = WebSearchRequest(query="How old is the Earth?", max_results=5)
 
     # Tavily instance
-    connector_tavily = await ConnectorTavily.create()
+    connectorWebTavily = await ConnectorTavily.create()
 
     # Search test
-    action_result = await connector_tavily.search_urls(request=request)
+    action_result = await connectorWebTavily.search_urls(request=request)
 
     # Check results
     assert isinstance(action_result, ActionResult)
@@ -57,10 +57,10 @@ async def test_tavily_connector_crawl_test_live_api():
     request = WebCrawlRequest(urls=urls)
 
     # Tavily instance
-    connector_tavily = await ConnectorTavily.create()
+    connectorWebTavily = await ConnectorTavily.create()
 
     # Crawl test
-    action_result = await connector_tavily.crawl_urls(request=request)
+    action_result = await connectorWebTavily.crawl_urls(request=request)
 
     # Check results
     assert isinstance(action_result, ActionResult)
@@ -87,10 +87,10 @@ async def test_tavily_connector_scrape_test_live_api():
     request = WebScrapeRequest(query="How old is the Earth?", max_results=3)
 
     # Tavily instance
-    connector_tavily = await ConnectorTavily.create()
+    connectorWebTavily = await ConnectorTavily.create()
 
     # Scrape test
-    action_result = await connector_tavily.scrape(request=request)
+    action_result = await connectorWebTavily.scrape(request=request)
 
     # Check results
     assert isinstance(action_result, ActionResult)
diff --git a/tests/methods/test_method_web.py b/tests/methods/test_method_web.py
index 2d9305a2..27344ab3 100644
--- a/tests/methods/test_method_web.py
+++ b/tests/methods/test_method_web.py
@@ -5,7 +5,7 @@ import logging
 import pytest
 from unittest.mock import patch
 
-from modules.methods.method_web import MethodWeb
+from modules.methods.methodWeb import MethodWeb
 from tests.fixtures.tavily_responses import (
RESPONSE_SEARCH_HOW_OLD_IS_EARTH_NO_ANSWER, RESPONSE_EXTRACT_HOW_OLD_IS_EARTH_NO_ANSWER, @@ -101,14 +101,14 @@ async def test_method_web_crawl_live(): # Mock the service center methods with patch.object(method_web, "service") as mock_service: mock_service.getChatDocumentsFromDocumentList.return_value = [ - type("MockDoc", (), {"fileId": "test-file-id"})() + type("MockDoc", (), {"fileId": "test-file-id", "fileName": "test-search-results.json"})() ] mock_service.getFileData.return_value = json.dumps(search_results_json).encode( "utf-8" ) # Actual request - action_result = await method_web.crawl({"document": "test-document-ref"}) + action_result = await method_web.crawl({"documentList": "test-document-list-ref"}) # Evaluate results assert action_result.success @@ -153,13 +153,13 @@ async def test_method_web_crawl_dummy(): ) as mock_client, ): mock_service.getChatDocumentsFromDocumentList.return_value = [ - type("MockDoc", (), {"fileId": "test-file-id"})() + type("MockDoc", (), {"fileId": "test-file-id", "fileName": "test-search-results.json"})() ] mock_service.getFileData.return_value = json.dumps(search_results_json).encode( "utf-8" ) - action_result = await method_web.crawl({"document": "test-document-ref"}) + action_result = await method_web.crawl({"documentList": "test-document-list-ref"}) mock_client.assert_called_once() # Evaluate results