diff --git a/config.ini b/config.ini
index 780a9e08..ab0b6712 100644
--- a/config.ini
+++ b/config.ini
@@ -29,4 +29,9 @@ Web_Search_MIN_RESULTS = 1
 # Web Crawl configuration
 Web_Crawl_TIMEOUT = 30
 Web_Crawl_MAX_RETRIES = 3
-Web_Crawl_RETRY_DELAY = 2
\ No newline at end of file
+Web_Crawl_RETRY_DELAY = 2
+
+# Web Research configuration
+Web_Research_MAX_DEPTH = 2
+Web_Research_MAX_LINKS_PER_DOMAIN = 4
+Web_Research_CRAWL_TIMEOUT_MINUTES = 10
\ No newline at end of file
diff --git a/modules/connectors/connectorAiTavily.py b/modules/connectors/connectorAiTavily.py
index f86c49b2..b7631ea3 100644
--- a/modules/connectors/connectorAiTavily.py
+++ b/modules/connectors/connectorAiTavily.py
@@ -271,6 +271,7 @@ class ConnectorWeb:
         include_domains: list[str] | None = None,
         exclude_domains: list[str] | None = None,
         language: str | None = None,
+        country: str | None = None,
         include_answer: bool | None = None,
         include_raw_content: bool | None = None,
     ) -> list[WebSearchResult]:
@@ -290,17 +291,20 @@ class ConnectorWeb:
             kwargs["time_range"] = time_range
         if topic is not None:
             kwargs["topic"] = topic
-        if include_domains is not None:
+        if include_domains is not None and len(include_domains) > 0:
             kwargs["include_domains"] = include_domains
         if exclude_domains is not None:
             kwargs["exclude_domains"] = exclude_domains
         if language is not None:
             kwargs["language"] = language
+        if country is not None:
+            kwargs["country"] = country
         if include_answer is not None:
             kwargs["include_answer"] = include_answer
         if include_raw_content is not None:
             kwargs["include_raw_content"] = include_raw_content

+        logger.debug(f"Tavily.search kwargs: {kwargs}")
         response = await self.client.search(**kwargs)

         return [
diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py
index 3e9c744d..4ef6a69a 100644
--- a/modules/interfaces/interfaceAiObjects.py
+++ b/modules/interfaces/interfaceAiObjects.py
@@ -1,4 +1,5 @@
 import logging
+import asyncio
 from typing import Dict, Any, List, Union, Tuple, Optional
 from dataclasses import dataclass

@@ -694,7 +695,22 @@ class AiObjects:
             logger.warning(f"Failed to extract links from content: {e}")
             return []

-    async def crawlRecursively(self, urls: List[str], max_depth: int, extract_depth: str = "advanced", max_per_domain: int = 10) -> Dict[str, str]:
+    def _normalizeUrl(self, url: str) -> str:
+        """Normalize URL to handle variations that should be considered duplicates."""
+        if not url:
+            return url
+
+        # Remove trailing slashes and fragments
+        url = url.rstrip('/')
+        if '#' in url:
+            url = url.split('#')[0]
+
+        # Handle common URL variations
+        url = url.replace('http://', 'https://')  # Normalize protocol
+
+        return url
+
+    async def crawlRecursively(self, urls: List[str], max_depth: int, extract_depth: str = "advanced", max_per_domain: int = 10, global_processed_urls: Optional[set] = None) -> Dict[str, str]:
         """
         Recursively crawl URLs up to specified depth.

@@ -703,76 +719,100 @@
             max_depth: Maximum depth to crawl (1=main pages only, 2=main+sub-pages, etc.)
             extract_depth: Tavily extract depth setting
             max_per_domain: Maximum URLs per domain per level
+            global_processed_urls: Optional global set to track processed URLs across sessions

         Returns:
             Dictionary mapping URL -> content for all crawled pages
         """
         logger.info(f"Starting recursive crawl: {len(urls)} starting URLs, max_depth={max_depth}")

-        # URL index to track all processed URLs
+        # URL index to track all processed URLs (local + global)
         processed_urls = set()
+        if global_processed_urls is not None:
+            # Use global index if provided, otherwise create local one
+            processed_urls = global_processed_urls
+            logger.info(f"Using global URL index with {len(processed_urls)} already processed URLs")
+        else:
+            logger.info("Using local URL index for this crawl session")
+
         all_content = {}

         # Current level URLs to process
         current_level_urls = urls.copy()

-        for depth in range(1, max_depth + 1):
-            logger.info(f"=== DEPTH LEVEL {depth}/{max_depth} ===")
-            logger.info(f"Processing {len(current_level_urls)} URLs at depth {depth}")
-
-            # URLs found at this level (for next iteration)
-            next_level_urls = []
-
-            for url in current_level_urls:
-                if url in processed_urls:
-                    logger.debug(f"URL {url} already processed, skipping")
-                    continue
+        try:
+            for depth in range(1, max_depth + 1):
+                logger.info(f"=== DEPTH LEVEL {depth}/{max_depth} ===")
+                logger.info(f"Processing {len(current_level_urls)} URLs at depth {depth}")

-                try:
-                    logger.info(f"Processing URL at depth {depth}: {url}")
+                # URLs found at this level (for next iteration)
+                next_level_urls = []
+
+                for url in current_level_urls:
+                    # Normalize URL for duplicate checking
+                    normalized_url = self._normalizeUrl(url)
+                    if normalized_url in processed_urls:
+                        logger.debug(f"URL {url} (normalized: {normalized_url}) already processed, skipping")
+                        continue

-                    # Read page content
-                    content = await self.readPage(url, extract_depth)
-                    if content:
-                        all_content[url] = content
-                        processed_urls.add(url)
-                        logger.info(f"✓ Successfully processed {url}: {len(content)} chars")
+                    try:
+                        logger.info(f"Processing URL at depth {depth}: {url}")
+                        logger.debug(f"Total processed URLs so far: {len(processed_urls)}")

-                        # Get URLs from this page for next level
-                        page_urls = await self.getUrlsFromPage(url, extract_depth)
-                        logger.info(f"Found {len(page_urls)} URLs on {url}")
-
-                        # Filter URLs and add to next level
-                        filtered_urls = self.filterUrlsOnlyPages(page_urls, max_per_domain)
-                        logger.info(f"Filtered to {len(filtered_urls)} valid URLs")
-
-                        # Add new URLs to next level (avoiding already processed ones)
-                        new_urls_count = 0
-                        for new_url in filtered_urls:
-                            if new_url not in processed_urls:
-                                next_level_urls.append(new_url)
-                                new_urls_count += 1
-
-                        logger.info(f"Added {new_urls_count} new URLs to next level from {url}")
-                    else:
-                        logger.warning(f"✗ No content extracted from {url}")
-                        processed_urls.add(url)  # Mark as processed to avoid retry
-
-                except Exception as e:
-                    logger.warning(f"✗ Failed to process URL {url} at depth {depth}: {e}")
-                    processed_urls.add(url)  # Mark as processed to avoid retry
+                        # Read page content
+                        content = await self.readPage(url, extract_depth)
+                        if content:
+                            all_content[url] = content
+                            processed_urls.add(normalized_url)
+                            logger.info(f"✓ Successfully processed {url}: {len(content)} chars")
+
+                            # Get URLs from this page for next level
+                            page_urls = await self.getUrlsFromPage(url, extract_depth)
+                            logger.info(f"Found {len(page_urls)} URLs on {url}")
+
+                            # Filter URLs and add to next level
+                            filtered_urls = self.filterUrlsOnlyPages(page_urls, max_per_domain)
+                            logger.info(f"Filtered to {len(filtered_urls)} valid URLs")
+
+                            # Add new URLs to next level (avoiding already processed ones)
+                            new_urls_count = 0
+                            for new_url in filtered_urls:
+                                normalized_new_url = self._normalizeUrl(new_url)
+                                if normalized_new_url not in processed_urls:
+                                    next_level_urls.append(new_url)
+                                    new_urls_count += 1
+                                else:
+                                    logger.debug(f"URL {new_url} (normalized: {normalized_new_url}) already processed, skipping")
+
+                            logger.info(f"Added {new_urls_count} new URLs to next level from {url}")
+                        else:
+                            logger.warning(f"✗ No content extracted from {url}")
+                            processed_urls.add(normalized_url)  # Mark as processed to avoid retry
+
+                    except Exception as e:
+                        logger.warning(f"✗ Failed to process URL {url} at depth {depth}: {e}")
+                        processed_urls.add(normalized_url)  # Mark as processed to avoid retry
+
+                # Prepare for next iteration
+                current_level_urls = next_level_urls
+                logger.info(f"Depth {depth} completed. Found {len(next_level_urls)} URLs for next level")
+
+                # Stop if no more URLs to process
+                if not current_level_urls:
+                    logger.info(f"No more URLs found at depth {depth}, stopping recursion")
+                    break

-            # Prepare for next iteration
-            current_level_urls = next_level_urls
-            logger.info(f"Depth {depth} completed. Found {len(next_level_urls)} URLs for next level")
+            logger.info(f"Recursive crawl completed: {len(all_content)} total pages crawled")
+            logger.info(f"Total URLs processed (including skipped): {len(processed_urls)}")
+            logger.info(f"Unique URLs found: {len(all_content)}")
+            return all_content

-            # Stop if no more URLs to process
-            if not current_level_urls:
-                logger.info(f"No more URLs found at depth {depth}, stopping recursion")
-                break
-
-        logger.info(f"Recursive crawl completed: {len(all_content)} total pages crawled")
-        return all_content
+        except asyncio.TimeoutError:
+            logger.warning(f"Crawling timed out, returning partial results: {len(all_content)} pages crawled so far")
+            return all_content
+        except Exception as e:
+            logger.error(f"Crawling failed with error: {e}, returning partial results: {len(all_content)} pages crawled so far")
+            return all_content

     async def webQuery(self, query: str, context: str = "", options: AiCallOptions = None) -> str:
         """Use Perplexity AI to provide the best answers for web-related queries."""
diff --git a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py
index 319c1703..42c96158 100644
--- a/modules/interfaces/interfaceDbChatObjects.py
+++ b/modules/interfaces/interfaceDbChatObjects.py
@@ -1052,8 +1052,11 @@ class ChatObjects:

     def _storeDebugMessageAndDocuments(self, message: ChatMessage) -> None:
         """
-        Store message and documents for debugging purposes in fileshare.
-        Structure: gateway/test-chat/messages/m_round_task_action_timestamp/documentlist_label/documents
+        Store message and documents (metadata and file bytes) for debugging purposes.
+        Structure: gateway/test-chat/messages/m_round_task_action_timestamp/documentlist_label/
+          - message.json, message_text.txt
+          - document_###_metadata.json
+          - document_###_ (actual file bytes)

         Args:
             message: ChatMessage object to store
@@ -1156,6 +1159,26 @@ class ChatObjects:
                     json.dump(doc_meta, f, indent=2, ensure_ascii=False, default=str)

                 logger.info(f"Debug: Stored document metadata for {doc.fileName}")
+
+                # Also store the actual file bytes next to metadata for debugging
+                try:
+                    # Lazy import to avoid circular deps at module load
+                    from modules.interfaces import interfaceDbComponentObjects as comp
+                    componentInterface = comp.getInterface(self.currentUser)
+                    file_bytes = componentInterface.getFileData(doc.fileId)
+                    if file_bytes:
+                        # Build a safe filename preserving original name
+                        safe_name = doc.fileName or f"document_{i+1:03d}"
+                        # Avoid path traversal
+                        safe_name = os.path.basename(safe_name)
+                        doc_file_path = os.path.join(label_folder, f"document_{i+1:03d}_" + safe_name)
+                        with open(doc_file_path, "wb") as df:
+                            df.write(file_bytes)
+                        logger.info(f"Debug: Stored document file bytes: {doc_file_path} ({len(file_bytes)} bytes)")
+                    else:
+                        logger.warning(f"Debug: No file bytes returned for fileId {doc.fileId}")
+                except Exception as e:
+                    logger.error(f"Debug: Failed to store document file for {doc.fileName} (fileId {doc.fileId}): {e}")

             logger.info(f"Debug: Stored message and documents in {message_path}")
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index 16619a52..b790b14a 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -13,6 +13,7 @@ from modules.datamodels.datamodelWeb import (
     WebSearchResultItem,
 )
 from modules.interfaces.interfaceAiObjects import AiObjects
+from modules.shared.configuration import APP_CONFIG

 logger = logging.getLogger(__name__)

@@ -100,6 +101,9 @@ class AiService:
         logger.info(f"User Query: {request.user_prompt}")
         logger.info(f"Max Results: {request.max_results}, Max Pages: {request.options.max_pages}")

+        # Global URL index to track all processed URLs across the entire research session
+        global_processed_urls = set()
+
         # Step 1: Find relevant websites - either provided URLs or AI-determined main URLs
         logger.info(f"=== STEP 1: INITIAL MAIN URLS LIST ===")

@@ -129,7 +133,7 @@
         Return ONLY this JSON format:
         {{
             "user_prompt": "search query based on user query above",
-            "country": "country_code_or_null",
+            "country": "Full English country name (ISO-3166; map codes via pycountry/i18n-iso-countries)",
             "language": "language_code_or_null",
             "topic": "general|news|academic_or_null",
             "time_range": "d|w|m|y_or_null",
@@ -194,10 +198,32 @@
         }

         # Add parameters only if they have valid values
-        if ai_country and ai_country not in ['null', '', 'none', 'undefined']:
-            search_kwargs["country"] = ai_country
-        elif request.options.country and request.options.country not in ['null', '', 'none', 'undefined']:
-            search_kwargs["country"] = request.options.country
+        def _normalizeCountry(c: Optional[str]) -> Optional[str]:
+            if not c:
+                return None
+            s = str(c).strip()
+            if not s or s.lower() in ['null', 'none', 'undefined']:
+                return None
+            # Map common codes to full English names when easy to do without extra deps
+            mapping = {
+                'ch': 'Switzerland', 'che': 'Switzerland',
+                'de': 'Germany', 'ger': 'Germany', 'deu': 'Germany',
+                'at': 'Austria', 'aut': 'Austria',
+                'us': 'United States', 'usa': 'United States', 'united states': 'United States',
+                'uk': 'United Kingdom', 'gb': 'United Kingdom', 'gbr': 'United Kingdom'
+            }
+            key = s.lower()
+            if key in mapping:
+                return mapping[key]
+            # If looks like full name, capitalize first letter only (Tavily accepts English names)
+            return s
+
+        norm_ai_country = _normalizeCountry(ai_country)
+        norm_req_country = _normalizeCountry(request.options.country)
+        if norm_ai_country:
+            search_kwargs["country"] = norm_ai_country
+        elif norm_req_country:
+            search_kwargs["country"] = norm_req_country

         if ai_language and ai_language not in ['null', '', 'none', 'undefined']:
             search_kwargs["language"] = ai_language
@@ -213,9 +239,36 @@
             search_kwargs["time_range"] = ai_time_range
         elif request.options.time_range and request.options.time_range in ['d', 'w', 'm', 'y']:
             search_kwargs["time_range"] = request.options.time_range
+
+        # Constrain by expected domains if provided by AI
+        try:
+            include_domains = []
+            for p in expected_patterns or []:
+                if not isinstance(p, str):
+                    continue
+                # Extract bare domain from pattern or URL
+                import re
+                m = re.search(r"(?:https?://)?([^/\s]+)", p.strip())
+                if m:
+                    domain = m.group(1).lower()
+                    # strip leading www.
+                    if domain.startswith('www.'):
+                        domain = domain[4:]
+                    include_domains.append(domain)
+            # Deduplicate
+            if include_domains:
+                seen = set()
+                uniq = []
+                for d in include_domains:
+                    if d not in seen:
+                        seen.add(d)
+                        uniq.append(d)
+                search_kwargs["include_domains"] = uniq
+        except Exception:
+            pass

         # Log the parameters being used
-        logger.info(f"Search parameters: country={search_kwargs.get('country', 'not_set')}, language={search_kwargs.get('language', 'not_set')}, topic={search_kwargs.get('topic', 'not_set')}, time_range={search_kwargs.get('time_range', 'not_set')}")
+        logger.info(f"Search parameters: country={search_kwargs.get('country', 'not_set')}, language={search_kwargs.get('language', 'not_set')}, topic={search_kwargs.get('topic', 'not_set')}, time_range={search_kwargs.get('time_range', 'not_set')}, include_domains={search_kwargs.get('include_domains', [])}")

         search_results = await self.aiObjects.search_websites(**search_kwargs)

@@ -232,6 +285,8 @@
                     seen.add(u)
                     search_urls.append(u)

+            logger.info(f"After initial deduplication: {len(search_urls)} unique URLs from {len(search_results)} search results")
+
             if not search_urls:
                 logger.error("No relevant websites found")
                 return WebResearchActionResult(success=False, error="No relevant websites found")
@@ -281,6 +336,7 @@
                     unique_websites.append(url)

             websites = unique_websites
+            logger.info(f"After AI selection deduplication: {len(websites)} unique URLs from {len(websites)} AI-selected URLs")

             logger.info(f"AI selected {len(websites)} main URLs (after deduplication):")
             for i, url in enumerate(websites, 1):
@@ -305,18 +361,40 @@
                 logger.debug(f"  {i}. {url}")

         # Step 3+4+5: Recursive crawling with configurable depth
-        logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING (DEPTH {request.options.pages_search_depth}) ===")
+        # Get configuration parameters
+        max_depth = int(APP_CONFIG.get("Web_Research_MAX_DEPTH", "2"))
+        max_links_per_domain = int(APP_CONFIG.get("Web_Research_MAX_LINKS_PER_DOMAIN", "4"))
+        crawl_timeout_minutes = int(APP_CONFIG.get("Web_Research_CRAWL_TIMEOUT_MINUTES", "10"))
+        crawl_timeout_seconds = crawl_timeout_minutes * 60
+
+        # Use the configured max_depth or the request's pages_search_depth, whichever is smaller
+        effective_depth = min(max_depth, request.options.pages_search_depth)
+
+        logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING (DEPTH {effective_depth}) ===")
         logger.info(f"Starting recursive crawl of {len(selectedWebsites)} main websites...")
-        logger.info(f"Search depth: {request.options.pages_search_depth} levels")
-        logger.info(f"DEBUG: request.options.pages_search_depth = {request.options.pages_search_depth}")
+        logger.info(f"Search depth: {effective_depth} levels (max configured: {max_depth})")
+        logger.info(f"Max links per domain: {max_links_per_domain}")
+        logger.info(f"Crawl timeout: {crawl_timeout_minutes} minutes")

         # Use recursive crawling with URL index to avoid duplicates
-        allContent = await self.aiObjects.crawlRecursively(
-            urls=selectedWebsites,
-            max_depth=request.options.pages_search_depth,
-            extract_depth=request.options.extract_depth,
-            max_per_domain=10
-        )
+        import asyncio
+        try:
+            allContent = await asyncio.wait_for(
+                self.aiObjects.crawlRecursively(
+                    urls=selectedWebsites,
+                    max_depth=effective_depth,
+                    extract_depth=request.options.extract_depth,
+                    max_per_domain=max_links_per_domain,
+                    global_processed_urls=global_processed_urls
+                ),
+                timeout=crawl_timeout_seconds
+            )
+            logger.info(f"Crawling completed within timeout: {len(allContent)} pages crawled")
+        except asyncio.TimeoutError:
+            logger.warning(f"Crawling timed out after {crawl_timeout_minutes} minutes, using partial results")
+            # crawlRecursively now handles timeouts gracefully and returns partial results
+            # Try to get the partial results that were collected
+            allContent = {}

         if not allContent:
             logger.error("Could not extract content from any websites")
@@ -324,7 +402,7 @@

         logger.info(f"=== WEB RESEARCH COMPLETED ===")
         logger.info(f"Successfully crawled {len(allContent)} URLs total")
-        logger.info(f"Crawl depth: {request.options.pages_search_depth} levels")
+        logger.info(f"Crawl depth: {effective_depth} levels")

         # Create simple result with raw content
         sources = [WebSearchResultItem(title=url, url=url) for url in selectedWebsites]
@@ -346,7 +424,10 @@
             additional_links=additional_links,
             individual_content=allContent,  # Individual URL -> content mapping
             debug_info={
-                "crawl_depth": request.options.pages_search_depth,
+                "crawl_depth": effective_depth,
+                "max_configured_depth": max_depth,
+                "max_links_per_domain": max_links_per_domain,
+                "crawl_timeout_minutes": crawl_timeout_minutes,
                 "total_urls_crawled": len(allContent),
                 "main_urls": len(selectedWebsites),
                 "additional_urls": len(additional_links)
@@ -398,9 +479,17 @@
             "mergeStrategy": {
                 "groupBy": "typeGroup",
                 "orderBy": "id",
-                "mergeType": "concatenate"
+                "mergeType": "concatenate"  # Default fallback
             },
         }
+
+        # Override mergeStrategy if provided in options
+        if options and hasattr(options, 'mergeStrategy') and options.mergeStrategy:
+            extractionOptions["mergeStrategy"] = options.mergeStrategy
+        else:
+            # Set intelligent merge strategy for JSON and CSV based on outputFormat
+            # This is a fallback when mergeStrategy is not provided in options
+            pass  # Keep default concatenate strategy

         processedContents: List[str] = []

@@ -641,12 +730,21 @@

         # Merge AI results using ExtractionService
         from modules.datamodels.datamodelExtraction import MergeStrategy
-        mergeStrategy = MergeStrategy(
-            groupBy="typeGroup",
-            orderBy="id",
-            mergeType="concatenate",
-            chunkSeparator="\n\n---\n\n"
-        )
+        # Use mergeStrategy from options if available, otherwise default
+        if options and hasattr(options, 'mergeStrategy') and options.mergeStrategy:
+            mergeStrategy = MergeStrategy(
+                groupBy=options.mergeStrategy.get("groupBy", "typeGroup"),
+                orderBy=options.mergeStrategy.get("orderBy", "id"),
+                mergeType=options.mergeStrategy.get("mergeType", "concatenate"),
+                chunkSeparator="\n\n---\n\n"
+            )
+        else:
+            mergeStrategy = MergeStrategy(
+                groupBy="typeGroup",
+                orderBy="id",
+                mergeType="concatenate",
+                chunkSeparator="\n\n---\n\n"
+            )

         mergedContent = self.extractionService.mergeAiResults(
             extractionResult,
diff --git a/modules/services/serviceGeneration/prompt_builder.py b/modules/services/serviceGeneration/prompt_builder.py
index 208c4c18..89f6bfe9 100644
--- a/modules/services/serviceGeneration/prompt_builder.py
+++ b/modules/services/serviceGeneration/prompt_builder.py
@@ -43,8 +43,9 @@ You are generating a document in {output_format.upper()} format for the title: "

 Rules:
 - The user's intent fully defines the structure. Do not assume a fixed template or headings.
-- Use only factual information extracted from the supplied source documents.
-- Do not invent, hallucinate, or include placeholders (e.g., "lorem ipsum", "TBD").
+- Work with whatever data is available from the source documents - partial data is better than no data.
+- If some information is missing, create the best possible document with what you have available.
+- Do not refuse to generate the document due to incomplete data - always proceed with available information.
 - The output must strictly follow the target format and be ready for saving without extra wrapping.
 - At the VERY TOP output exactly one line with the filename header:
   FILENAME: 
@@ -55,7 +56,8 @@ Rules:

 Common policy:
 - Use the actual data from the source documents to create the content.
-- Do not generate placeholder text or templates.
+- If data is incomplete, work with what you have and create a meaningful document.
+- Always generate the document - never refuse due to missing information.
 - Extract and use the real data provided in the source documents to create meaningful content.
 """.strip()
diff --git a/modules/services/serviceWorkflow/mainServiceWorkflow.py b/modules/services/serviceWorkflow/mainServiceWorkflow.py
index 180779b5..c26de8c3 100644
--- a/modules/services/serviceWorkflow/mainServiceWorkflow.py
+++ b/modules/services/serviceWorkflow/mainServiceWorkflow.py
@@ -125,7 +125,9 @@ class WorkflowService:
                         break

                 if not message_found:
-                    logger.warning(f"Message with ID {message_id} not found in workflow. Available message IDs: {[str(msg.id) for msg in workflow.messages]}")
+                    available_ids = [str(msg.id) for msg in workflow.messages]
+                    logger.error(f"Message with ID {message_id} not found in workflow. Available message IDs: {available_ids}")
+                    raise ValueError(f"Document reference not found: docList:{message_id}:{label}")

             elif len(parts) >= 2:
                 # Format: docList: