import logging from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import PromptPlaceholder from modules.datamodels.datamodelChat import ChatDocument from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority from modules.datamodels.datamodelWeb import ( WebResearchRequest, WebResearchActionResult, WebResearchDocumentData, WebResearchActionDocument, WebSearchResultItem, ) from modules.interfaces.interfaceAiObjects import AiObjects logger = logging.getLogger(__name__) # Model registry is now provided by interfaces via AiModels class AiService: """Centralized AI service orchestrating documents, model selection, failover, and web operations. """ def __init__(self, serviceCenter=None) -> None: """Initialize AI service with service center access. Args: serviceCenter: Service center instance for accessing other services """ self.serviceCenter = serviceCenter # Only depend on interfaces self.aiObjects = None # Will be initialized in create() self._extractionService = None # Lazy initialization @property def extractionService(self): """Lazy initialization of extraction service.""" if self._extractionService is None: logger.info("Lazy initializing ExtractionService...") self._extractionService = ExtractionService() return self._extractionService async def _ensureAiObjectsInitialized(self): """Ensure aiObjects is initialized.""" if self.aiObjects is None: logger.info("Lazy initializing AiObjects...") self.aiObjects = await AiObjects.create() logger.info("AiObjects initialization completed") @classmethod async def create(cls, serviceCenter=None) -> "AiService": """Create AiService instance with all connectors initialized.""" logger.info("AiService.create() called") instance = cls(serviceCenter) logger.info("AiService created, about to call AiObjects.create()...") instance.aiObjects = await AiObjects.create() logger.info("AiObjects.create() completed") return instance # AI Image Analysis async def readImage( self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: Optional[AiCallOptions] = None, ) -> str: """Call AI for image analysis using interface.callImage().""" try: return await self.aiObjects.callImage(prompt, imageData, mimeType, options) except Exception as e: logger.error(f"Error in AI image analysis: {str(e)}") return f"Error: {str(e)}" # AI Image Generation async def generateImage( self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: Optional[AiCallOptions] = None, ) -> Dict[str, Any]: """Generate an image using AI using interface.generateImage().""" try: return await self.aiObjects.generateImage(prompt, size, quality, style, options) except Exception as e: logger.error(f"Error in AI image generation: {str(e)}") return {"success": False, "error": str(e)} # Web Research - Using interface functions async def webResearch(self, request: WebResearchRequest) -> WebResearchActionResult: """Perform web research using interface functions.""" try: logger.info(f"WEB RESEARCH STARTED") logger.info(f"User Query: {request.user_prompt}") logger.info(f"Max Results: {request.max_results}, Max Pages: {request.options.max_pages}") # Step 1: Find relevant websites - either provided URLs or AI-determined main URLs logger.info(f"=== STEP 1: INITIAL MAIN URLS LIST ===") if request.urls: # Use provided URLs as initial main URLs websites = request.urls logger.info(f"Using provided URLs ({len(websites)}):") for i, url in enumerate(websites, 1): logger.info(f" {i}. {url}") else: # Use AI to determine main URLs based on user's intention logger.info(f"AI analyzing user intent: '{request.user_prompt}'") # Use AI to generate optimized Tavily search query and search parameters query_optimizer_prompt = f"""You are a search query optimizer. USER QUERY: {request.user_prompt} Your task: Create a search query and parameters for the USER QUERY given. RULES: 1. The search query MUST be related to the user query above 2. Extract key terms from the user query 3. Determine appropriate country/language based on the query context 4. Keep search query short (2-6 words) Return ONLY this JSON format: {{ "user_prompt": "search query based on user query above", "country": "country_code_or_null", "language": "language_code_or_null", "topic": "general|news|academic_or_null", "time_range": "d|w|m|y_or_null", "selection_strategy": "single|multiple|specific_page", "selection_criteria": "what URLs to prioritize", "expected_url_patterns": ["pattern1", "pattern2"], "estimated_result_count": number }}""" # Get AI response for query optimization ai_request = AiCallRequest( prompt=query_optimizer_prompt, options=AiCallOptions() ) ai_response_obj = await self.aiObjects.call(ai_request) ai_response = ai_response_obj.content logger.debug(f"AI query optimizer response: {ai_response}") # Parse AI response to extract search query import json try: # Clean the response by removing markdown code blocks cleaned_response = ai_response.strip() if cleaned_response.startswith('```json'): cleaned_response = cleaned_response[7:] # Remove ```json if cleaned_response.endswith('```'): cleaned_response = cleaned_response[:-3] # Remove ``` cleaned_response = cleaned_response.strip() query_data = json.loads(cleaned_response) search_query = query_data.get("user_prompt", request.user_prompt) ai_country = query_data.get("country") ai_language = query_data.get("language") ai_topic = query_data.get("topic") ai_time_range = query_data.get("time_range") selection_strategy = query_data.get("selection_strategy", "multiple") selection_criteria = query_data.get("selection_criteria", "relevant URLs") expected_patterns = query_data.get("expected_url_patterns", []) estimated_count = query_data.get("estimated_result_count", request.max_results) logger.info(f"AI optimized search query: '{search_query}'") logger.info(f"Selection strategy: {selection_strategy}") logger.info(f"Selection criteria: {selection_criteria}") logger.info(f"Expected URL patterns: {expected_patterns}") logger.info(f"Estimated result count: {estimated_count}") except json.JSONDecodeError: logger.warning("Failed to parse AI response as JSON, using original query") search_query = request.user_prompt ai_country = None ai_language = None ai_topic = None ai_time_range = None selection_strategy = "multiple" # Perform the web search with AI-determined parameters search_kwargs = { "query": search_query, "max_results": request.max_results, "search_depth": request.options.search_depth, "auto_parameters": False # Use explicit parameters } # Add parameters only if they have valid values if ai_country and ai_country not in ['null', '', 'none', 'undefined']: search_kwargs["country"] = ai_country elif request.options.country and request.options.country not in ['null', '', 'none', 'undefined']: search_kwargs["country"] = request.options.country if ai_language and ai_language not in ['null', '', 'none', 'undefined']: search_kwargs["language"] = ai_language elif request.options.language and request.options.language not in ['null', '', 'none', 'undefined']: search_kwargs["language"] = request.options.language if ai_topic and ai_topic in ['general', 'news', 'academic']: search_kwargs["topic"] = ai_topic elif request.options.topic and request.options.topic in ['general', 'news', 'academic']: search_kwargs["topic"] = request.options.topic if ai_time_range and ai_time_range in ['d', 'w', 'm', 'y']: search_kwargs["time_range"] = ai_time_range elif request.options.time_range and request.options.time_range in ['d', 'w', 'm', 'y']: search_kwargs["time_range"] = request.options.time_range # Log the parameters being used logger.info(f"Search parameters: country={search_kwargs.get('country', 'not_set')}, language={search_kwargs.get('language', 'not_set')}, topic={search_kwargs.get('topic', 'not_set')}, time_range={search_kwargs.get('time_range', 'not_set')}") search_results = await self.aiObjects.search_websites(**search_kwargs) logger.debug(f"Web search returned {len(search_results)} results:") for i, result in enumerate(search_results, 1): logger.debug(f" {i}. {result.url} - {result.title}") # Deduplicate while preserving order seen = set() search_urls = [] for r in search_results: u = str(r.url) if u not in seen: seen.add(u) search_urls.append(u) if not search_urls: logger.error("No relevant websites found") return WebResearchActionResult(success=False, error="No relevant websites found") # Now use AI to determine the main URLs based on user's intention logger.info(f"AI selecting main URLs from {len(search_urls)} search results based on user intent") # Create a prompt for AI to identify main URLs based on user's intention ai_prompt = f""" Select the most relevant URLs from these search results: {chr(10).join([f"{i+1}. {url}" for i, url in enumerate(search_urls)])} Return only the URLs that are most relevant for the user's query. One URL per line. """ # Create AI call request ai_request = AiCallRequest( prompt=ai_prompt, options=AiCallOptions() ) ai_response_obj = await self.aiObjects.call(ai_request) ai_response = ai_response_obj.content logger.debug(f"AI response for main URL selection: {ai_response}") # Parse AI response to extract URLs websites = [] for line in ai_response.strip().split('\n'): line = line.strip() if line and ('http://' in line or 'https://' in line): # Extract URL from the line for word in line.split(): if word.startswith('http://') or word.startswith('https://'): websites.append(word.rstrip('.,;')) break if not websites: logger.warning("AI did not identify any main URLs, using first few search results") websites = search_urls[:3] # Fallback to first 3 search results # Deduplicate while preserving order seen = set() unique_websites = [] for url in websites: if url not in seen: seen.add(url) unique_websites.append(url) websites = unique_websites logger.info(f"AI selected {len(websites)} main URLs (after deduplication):") for i, url in enumerate(websites, 1): logger.info(f" {i}. {url}") # Step 2: Smart website selection using AI interface logger.info(f"=== STEP 2: FILTERED URL LIST BY USER PROMPT'S INTENTION ===") logger.info(f"AI analyzing {len(websites)} URLs for relevance to: '{request.user_prompt}'") selectedWebsites, aiResponse = await self.aiObjects.selectRelevantWebsites(websites, request.user_prompt) logger.debug(f"AI Response: {aiResponse}") logger.debug(f"AI selected {len(selectedWebsites)} most relevant URLs:") for i, url in enumerate(selectedWebsites, 1): logger.debug(f" {i}. {url}") # Show which were filtered out filtered_out = [url for url in websites if url not in selectedWebsites] if filtered_out: logger.debug(f"Filtered out {len(filtered_out)} less relevant URLs:") for i, url in enumerate(filtered_out, 1): logger.debug(f" {i}. {url}") # Step 3+4+5: Recursive crawling with configurable depth logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING (DEPTH {request.options.pages_search_depth}) ===") logger.info(f"Starting recursive crawl of {len(selectedWebsites)} main websites...") logger.info(f"Search depth: {request.options.pages_search_depth} levels") logger.info(f"DEBUG: request.options.pages_search_depth = {request.options.pages_search_depth}") # Use recursive crawling with URL index to avoid duplicates allContent = await self.aiObjects.crawlRecursively( urls=selectedWebsites, max_depth=request.options.pages_search_depth, extract_depth=request.options.extract_depth, max_per_domain=10 ) if not allContent: logger.error("Could not extract content from any websites") return WebResearchActionResult(success=False, error="Could not extract content from any websites") logger.info(f"=== WEB RESEARCH COMPLETED ===") logger.info(f"Successfully crawled {len(allContent)} URLs total") logger.info(f"Crawl depth: {request.options.pages_search_depth} levels") # Create simple result with raw content sources = [WebSearchResultItem(title=url, url=url) for url in selectedWebsites] # Get all additional links (all URLs except main ones) additional_links = [url for url in allContent.keys() if url not in selectedWebsites] # Combine all content into a single result combinedContent = "" for url, content in allContent.items(): combinedContent += f"\n\n=== {url} ===\n{content}\n" documentData = WebResearchDocumentData( user_prompt=request.user_prompt, websites_analyzed=len(allContent), additional_links_found=len(additional_links), analysis_result=combinedContent, # Raw content, no analysis sources=sources, additional_links=additional_links, individual_content=allContent, # Individual URL -> content mapping debug_info={ "crawl_depth": request.options.pages_search_depth, "total_urls_crawled": len(allContent), "main_urls": len(selectedWebsites), "additional_urls": len(additional_links) } ) document = WebResearchActionDocument( documentName=f"web_research_{request.user_prompt[:50]}.json", documentData=documentData, mimeType="application/json" ) return WebResearchActionResult( success=True, documents=[document], resultLabel="web_research_results" ) except Exception as e: logger.error(f"Error in web research: {str(e)}") return WebResearchActionResult(success=False, error=str(e)) async def _processDocumentsForAi( self, documents: List[ChatDocument], operationType: str, compressDocuments: bool, processIndividually: bool, userPrompt: str, options: Optional[AiCallOptions] = None ) -> str: if not documents: return "" # Calculate model-derived size limits maxContextBytes = self._calculateMaxContextBytes(options) # Build extraction options with model-derived limits extractionOptions: Dict[str, Any] = { "prompt": f"Extract content that supports the user's request: '{userPrompt}'. Focus on information relevant to: {operationType}", "operationType": operationType, "processDocumentsIndividually": processIndividually, "maxSize": maxContextBytes, "chunkAllowed": not options.compressContext if options else True, "textChunkSize": int(maxContextBytes * 0.3), # 30% of max for text chunks "imageChunkSize": int(maxContextBytes * 0.5), # 50% of max for image chunks "imageMaxPixels": 1024 * 1024, # 1MP default "imageQuality": 85, "mergeStrategy": { "groupBy": "typeGroup", "orderBy": "id", "mergeType": "concatenate" }, } processedContents: List[str] = [] try: # Use new ChatDocument-based API logger.info(f"=== PROCESSING {len(documents)} DOCUMENTS FOR AI ===") for i, doc in enumerate(documents): logger.info(f"Document {i}: {doc.fileName} (MIME: {doc.mimeType})") extractionResult = self.extractionService.extractContent(documents, extractionOptions) logger.info(f"Extraction completed: {len(extractionResult)} results") async def _partsToText(parts, documentName: str, documentType: str, logger_ref) -> str: lines: List[str] = [] logger_ref.debug(f"Processing {len(parts)} content parts for {documentName}") for p in parts: logger_ref.debug(f" Part: {p.typeGroup} ({p.mimeType}) - {len(p.data) if p.data else 0} chars") if p.typeGroup in ("text", "table", "structure") and p.data and isinstance(p.data, str): lines.append(p.data) elif p.typeGroup == "image" and p.data: # Use AI to extract text from image with user prompt logger_ref.debug(f" Processing image with AI using user prompt...") try: imageResult = await self.aiObjects.callImage( prompt=userPrompt, imageData=p.data, mimeType=p.mimeType ) lines.append(f"[Image Analysis]: {imageResult}") logger_ref.debug(f" AI image analysis completed: {len(imageResult)} chars") except Exception as e: logger_ref.warning(f" AI image processing failed: {e}") lines.append(f"[Image Analysis Failed]: {str(e)}") return "\n\n".join(lines) if isinstance(extractionResult, list): for i, ec in enumerate(extractionResult): try: # Get document info for this extraction result doc = documents[i] if i < len(documents) else None docName = doc.fileName if doc else f"Document_{i}" docType = doc.mimeType if doc else "unknown" contentText = await _partsToText(ec.parts, docName, docType, logger) logger.debug(f"Document {i} content: {len(contentText)} chars") if compressDocuments and len(contentText.encode("utf-8")) > 10000: originalLength = len(contentText) contentText = await self._compressContent(contentText, 10000, "document") logger.debug(f"Document {i} compressed: {originalLength} -> {len(contentText)} chars") processedContents.append(contentText) except Exception as e: logger.warning(f"Error aggregating extracted content: {str(e)}") processedContents.append("[Error aggregating content]") else: # Fallback: no content contentText = "" if compressDocuments and len(contentText.encode("utf-8")) > 10000: contentText = await self._compressContent(contentText, 10000, "document") processedContents.append(contentText) except Exception as e: logger.warning(f"Error during extraction: {str(e)}") processedContents.append("[Error during extraction]") # Build JSON structure ONLY when adding to AI prompt import json documentsJson = [] for i, content in enumerate(processedContents): doc = documents[i] if i < len(documents) else None docName = doc.fileName if doc else f"Document_{i}" docType = doc.mimeType if doc else "unknown" documentData = { "documentName": docName, "documentType": docType, "content": content } documentsJson.append(documentData) finalContext = json.dumps({ "documents": documentsJson, "totalDocuments": len(documentsJson) }, indent=2, ensure_ascii=False) logger.debug(f"=== FINAL CONTEXT ===") logger.debug(f"Total context: {len(finalContext)} chars") logger.debug(f"Documents: {len(documentsJson)}") return finalContext def _calculateMaxContextBytes(self, options: Optional[AiCallOptions]) -> int: """Calculate maximum context bytes based on model capabilities and options.""" if options and options.maxContextBytes: return options.maxContextBytes # Default model capabilities (this should be enhanced with actual model registry) defaultMaxTokens = 4000 safetyMargin = options.safetyMargin if options else 0.1 # Calculate bytes (4 chars per token estimation) maxContextBytes = int(defaultMaxTokens * (1 - safetyMargin) * 4) return maxContextBytes async def _processDocumentsPerChunk( self, documents: List[ChatDocument], prompt: str, options: Optional[AiCallOptions] = None ) -> str: """ Process documents with per-chunk AI calls and merge results. Args: documents: List of ChatDocument objects to process prompt: AI prompt for processing options: AI call options Returns: Merged AI results as string """ if not documents: return "" # Get model capabilities for size calculation model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options) # Build extraction options for chunking extractionOptions: Dict[str, Any] = { "prompt": prompt, "operationType": options.operationType if options else "general", "processDocumentsIndividually": True, # Process each document separately "maxSize": model_capabilities["maxContextBytes"], "chunkAllowed": True, "textChunkSize": model_capabilities["textChunkSize"], "imageChunkSize": model_capabilities["imageChunkSize"], "imageMaxPixels": 1024 * 1024, "imageQuality": 85, "mergeStrategy": { "groupBy": "typeGroup", "orderBy": "id", "mergeType": "concatenate" }, } logger.debug(f"Per-chunk extraction options: {extractionOptions}") try: # Extract content with chunking extractionResult = self.extractionService.extractContent(documents, extractionOptions) if not isinstance(extractionResult, list): return "[Error: No extraction results]" # Prepare debug directory TODO TO REMOVE import os from datetime import datetime debug_root = "./test-chat/extraction" ts = datetime.now().strftime("%Y%m%d-%H%M%S") debug_dir = os.path.join(debug_root, f"per_chunk_{ts}") try: os.makedirs(debug_dir, exist_ok=True) except Exception: pass # Process each chunk with AI aiResults: List[str] = [] for ec in extractionResult: for part in ec.parts: if part.typeGroup == "image": # Process image with AI try: # Safety check for part.data if not hasattr(part, 'data') or part.data is None: logger.warning(f"Skipping image chunk with no data") continue aiResult = await self.readImage( prompt=prompt, imageData=part.data, mimeType=part.mimeType, options=options ) aiResults.append(aiResult) except Exception as e: logger.warning(f"Error processing image chunk: {str(e)}") aiResults.append(f"[Error processing image: {str(e)}]") elif part.typeGroup in ("text", "table", "structure"): # Process text content with AI try: # Safety check for part.data if not hasattr(part, 'data') or part.data is None: logger.warning(f"Skipping chunk with no data") continue logger.info(f"=== PROCESSING CHUNK {len(aiResults) + 1} ===") logger.info(f"Chunk size: {len(part.data)} chars") logger.info(f"Chunk preview: {part.data[:200]}...") # Dump input chunk try: idx = len(aiResults) + 1 fpath = os.path.join(debug_dir, f"chunk_{idx:03d}_input.txt") with open(fpath, "w", encoding="utf-8") as f: f.write(str(part.data)) except Exception: pass # Create AI call request for this chunk request = AiCallRequest( prompt=prompt, context=part.data, options=options ) # Make the call using AiObjects response = await self.aiObjects.call(request) aiResults.append(response.content) logger.info(f"Chunk {len(aiResults)} processed: {len(response.content)} chars response") # Dump AI response try: idx = len(aiResults) fpath = os.path.join(debug_dir, f"chunk_{idx:03d}_response.txt") with open(fpath, "w", encoding="utf-8") as f: f.write(str(response.content)) except Exception: pass except Exception as e: logger.warning(f"Error processing text chunk: {str(e)}") aiResults.append(f"[Error processing text: {str(e)}]") # Merge AI results using ExtractionService from modules.datamodels.datamodelExtraction import MergeStrategy mergeStrategy = MergeStrategy( groupBy="typeGroup", orderBy="id", mergeType="concatenate", chunkSeparator="\n\n---\n\n" ) mergedContent = self.extractionService.mergeAiResults( extractionResult, aiResults, mergeStrategy ) # Extract only AI-generated text from merged content resultText = "" for part in mergedContent.parts: if ( part.typeGroup in ("text", "table", "structure") and part.data and getattr(part, "metadata", {}).get("aiResult", False) ): resultText += part.data + "\n\n" # Dump merged output try: fpath = os.path.join(debug_dir, "merged_output.txt") with open(fpath, "w", encoding="utf-8") as f: f.write(resultText.strip()) except Exception: pass return resultText.strip() except Exception as e: logger.error(f"Error in per-chunk processing: {str(e)}") return f"[Error in per-chunk processing: {str(e)}]" async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str: if len(content.encode("utf-8")) <= targetSize: return content try: compressionPrompt = f""" Komprimiere den folgenden {contentType} auf maximal {targetSize} Zeichen, behalte aber alle wichtigen Informationen bei: {content} Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen. """ # Service must not call connectors directly; use simple truncation fallback here data = content.encode("utf-8") return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]" except Exception as e: logger.warning(f"AI compression failed, using truncation: {str(e)}") return content[:targetSize] + "... [truncated]" # ===== DYNAMIC GENERIC AI CALLS IMPLEMENTATION ===== async def callAi( self, prompt: str, documents: Optional[List[ChatDocument]] = None, placeholders: Optional[List[PromptPlaceholder]] = None, options: Optional[AiCallOptions] = None ) -> str: """ Unified AI call interface that automatically routes to appropriate handler. Args: prompt: The main prompt for the AI call documents: Optional list of documents to process placeholders: Optional list of placeholder replacements for planning calls options: AI call configuration options Returns: AI response as string Raises: Exception: If all available models fail """ # Ensure aiObjects is initialized await self._ensureAiObjectsInitialized() if options is None: options = AiCallOptions() # Normalize placeholders from List[PromptPlaceholder] placeholders_dict: Dict[str, str] = {} placeholders_meta: Dict[str, bool] = {} if placeholders: placeholders_dict = {p.label: p.content for p in placeholders} placeholders_meta = {p.label: bool(getattr(p, 'summaryAllowed', False)) for p in placeholders} # Auto-determine call type based on documents and operation type call_type = self._determineCallType(documents, options.operationType) options.callType = call_type if call_type == "planning": return await self._callAiPlanning(prompt, placeholders_dict, placeholders_meta, options) else: # Set processDocumentsIndividually from the legacy parameter if not set in options if options.processDocumentsIndividually is None and documents: options.processDocumentsIndividually = False # Default to batch processing return await self._callAiText(prompt, documents, options) def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str: """ Determine call type based on documents and operation type. Criteria: no documents AND (operationType is "generate_plan" or "analyse_content") -> planning """ has_documents = documents is not None and len(documents) > 0 is_planning_operation = operation_type in [OperationType.GENERATE_PLAN, OperationType.ANALYSE_CONTENT] if not has_documents and is_planning_operation: return "planning" else: return "text" async def _callAiPlanning( self, prompt: str, placeholders: Optional[Dict[str, str]], placeholdersMeta: Optional[Dict[str, bool]], options: AiCallOptions ) -> str: """ Handle planning calls with placeholder system and selective summarization. """ # Ensure aiObjects is initialized await self._ensureAiObjectsInitialized() # Build full prompt with placeholders; if too large, summarize summaryAllowed placeholders proportionally effective_placeholders = placeholders or {} full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) if options.compressPrompt and placeholdersMeta: # Determine model capacity try: caps = self._getModelCapabilitiesForContent(full_prompt, None, options) max_bytes = caps.get("maxContextBytes", len(full_prompt.encode("utf-8"))) except Exception: max_bytes = len(full_prompt.encode("utf-8")) current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes: # Compute total bytes contributed by allowed placeholders (approximate by content length) allowed_labels = [l for l, allow in placeholdersMeta.items() if allow] allowed_sizes = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed = sum(allowed_sizes.values()) overage = current_bytes - max_bytes if total_allowed > 0 and overage > 0: # Target total for allowed after reduction target_allowed = max(total_allowed - overage, 0) # Global ratio to apply across allowed placeholders ratio = target_allowed / total_allowed if total_allowed > 0 else 1.0 ratio = max(0.0, min(1.0, ratio)) reduced: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) # Reduce by proportional ratio on characters (fallback if empty) reduction_factor = ratio if old_len > 0 else 1.0 reduced[label] = self._reduceText(content, reduction_factor) else: reduced[label] = content effective_placeholders = reduced full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # If still slightly over, perform a second-pass fine adjustment with updated ratio current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes and total_allowed > 0: overage2 = current_bytes - max_bytes # Recompute allowed sizes after first reduction allowed_sizes2 = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed2 = sum(allowed_sizes2.values()) if total_allowed2 > 0 and overage2 > 0: target_allowed2 = max(total_allowed2 - overage2, 0) ratio2 = target_allowed2 / total_allowed2 ratio2 = max(0.0, min(1.0, ratio2)) reduced2: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) reduction_factor = ratio2 if old_len > 0 else 1.0 reduced2[label] = self._reduceText(content, reduction_factor) else: reduced2[label] = content effective_placeholders = reduced2 full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # Make AI call using AiObjects (let it handle model selection) request = AiCallRequest( prompt=full_prompt, context="", # Context is already included in the prompt options=options ) response = await self.aiObjects.call(request) try: logger.debug(f"AI model selected (planning): {getattr(response, 'modelName', 'unknown')}") except Exception: pass return response.content async def _callAiText( self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions ) -> str: """ Handle text calls with document processing through ExtractionService. """ # Ensure aiObjects is initialized await self._ensureAiObjectsInitialized() # Determine processing strategy based on options if options.processDocumentsIndividually and documents: # Use per-chunk processing for individual document processing return await self._processDocumentsPerChunk(documents, prompt, options) # Check if we need chunking - if so, use per-chunk processing if documents and not options.compressContext: # Get model capabilities to check if chunking will be needed model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options) total_doc_size = sum(doc.fileSize or 0 for doc in documents) if total_doc_size > model_capabilities["maxContextBytes"]: logger.info(f"Document size ({total_doc_size}) exceeds model capacity ({model_capabilities['maxContextBytes']}), using per-chunk processing") return await self._processDocumentsPerChunk(documents, prompt, options) # Extract and process documents using ExtractionService context = "" if documents: logger.info(f"=== EXTRACTING CONTENT FROM {len(documents)} DOCUMENTS ===") # Get model capabilities for size calculation model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options) # Use new ChatDocument-based API extraction_options = { "prompt": prompt, "operationType": options.operationType, "processDocumentsIndividually": options.processDocumentsIndividually, "maxSize": options.maxContextBytes or model_capabilities["maxContextBytes"], "chunkAllowed": not options.compressContext, "textChunkSize": model_capabilities["textChunkSize"], "imageChunkSize": model_capabilities["imageChunkSize"], "imageMaxPixels": 1024 * 1024, "imageQuality": 85, "mergeStrategy": {"groupBy": "typeGroup", "orderBy": "id", "mergeType": "concatenate"} } logger.debug(f"Extraction options: {extraction_options}") extracted_content = self.extractionService.extractContent( documents=documents, options=extraction_options ) logger.info(f"Extraction completed: {len(extracted_content)} documents") # Build context from list of extracted content if isinstance(extracted_content, list): context_parts = [] chunk_count = 0 for ec in extracted_content: for p in ec.parts: if p.typeGroup in ["text", "table", "structure"] and p.data: if p.metadata.get("chunk", False): chunk_count += 1 context_parts.append(p.data) elif p.typeGroup == "image" and p.data: # Process image with AI using user prompt try: imageResult = await self.aiObjects.callImage( prompt=prompt, imageData=p.data, mimeType=p.mimeType ) context_parts.append(f"[Image Analysis]: {imageResult}") except Exception as e: logger.warning(f"AI image processing failed: {e}") context_parts.append(f"[Image Analysis Failed]: {str(e)}") if chunk_count > 0: logger.debug(f"=== PROCESSING CHUNKED CONTENT ===") logger.debug(f"Total chunks: {chunk_count}") logger.debug(f"Total context parts: {len(context_parts)}") context = "\n\n---\n\n".join(context_parts) else: context = "" # Check size and reduce if needed full_prompt = prompt + "\n\n" + context if context else prompt logger.debug(f"AI call: {len(full_prompt)} chars (prompt: {len(prompt)}, context: {len(context)})") # Use AiObjects to select the best model and make the call try: # Create AI call request request = AiCallRequest( prompt=full_prompt, context="", # Context is already included in the prompt options=options ) # Make the call using AiObjects (which handles model selection) response = await self.aiObjects.call(request) try: logger.debug(f"AI model selected (text): {getattr(response, 'modelName', 'unknown')}") except Exception: pass logger.debug(f"=== AI RESPONSE ===") logger.debug(f"Response length: {len(response.content)} chars") logger.debug(f"Response preview: {response.content[:200]}...") return response.content except Exception as e: logger.error(f"AI call failed: {e}") raise Exception(f"AI call failed: {e}") def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]: """ Get model capabilities for content processing, including appropriate size limits for chunking. """ # Estimate total content size prompt_size = len(prompt.encode('utf-8')) document_size = 0 if documents: # Rough estimate of document content size for doc in documents: document_size += doc.fileSize or 0 total_size = prompt_size + document_size # Use AiObjects to select the best model for this content size # We'll simulate the model selection by checking available models from modules.interfaces.interfaceAiObjects import aiModels # Find the best model for this content size and operation best_model = None best_context_length = 0 for model_name, model_info in aiModels.items(): context_length = model_info.get("contextLength", 0) # Skip models with no context length or too small for content if context_length == 0: continue # Check if model supports the operation type capabilities = model_info.get("capabilities", []) if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities: continue elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities: continue elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities: continue elif "text_generation" not in capabilities: continue # Prefer models that can handle the content without chunking, but allow chunking if needed if context_length >= total_size * 0.8: # 80% of content size if context_length > best_context_length: best_model = model_info best_context_length = context_length elif best_model is None: # Fallback to largest available model if context_length > best_context_length: best_model = model_info best_context_length = context_length # Fallback to a reasonable default if no model found if best_model is None: best_model = { "contextLength": 128000, # GPT-4o default "llmName": "gpt-4o" } # Calculate appropriate sizes # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters) context_length_bytes = int(best_model["contextLength"] * 4) max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}") logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes") logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes") return { "maxContextBytes": max_context_bytes, "textChunkSize": text_chunk_size, "imageChunkSize": image_chunk_size } def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: """ Get models capable of handling the specific operation with capability filtering. """ # Use the actual AI objects model selection instead of hardcoded default if hasattr(self, 'aiObjects') and self.aiObjects: # Let AiObjects handle the model selection return [] else: # Fallback to default model if AiObjects not available default_model = ModelCapabilities( name="default", maxTokens=4000, capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], costPerToken=0.001, processingTime=1.0, isAvailable=True ) return [default_model] def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: """ Build full prompt by replacing placeholders with their content. Uses the new {{KEY:placeholder}} format. """ if not placeholders: return prompt full_prompt = prompt for placeholder, content in placeholders.items(): # Replace both old format {{placeholder}} and new format {{KEY:placeholder}} full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content) full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content) return full_prompt def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool: """ Check if text exceeds model token limit with safety margin. """ # Simple character-based estimation (4 chars per token) estimated_tokens = len(text) // 4 max_tokens = int(model.maxTokens * (1 - safety_margin)) return estimated_tokens > max_tokens def _reducePlanningPrompt( self, full_prompt: str, placeholders: Optional[Dict[str, str]], model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce planning prompt size by summarizing placeholders while preserving prompt structure. """ if not placeholders: return self._reduceText(full_prompt, 0.7) # Reduce placeholders while preserving prompt reduced_placeholders = {} for placeholder, content in placeholders.items(): if len(content) > 1000: # Only reduce long content reduction_factor = 0.7 reduced_content = self._reduceText(content, reduction_factor) reduced_placeholders[placeholder] = reduced_content else: reduced_placeholders[placeholder] = content return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) def _reduceTextPrompt( self, prompt: str, context: str, model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce text prompt size using typeGroup-aware chunking and merging. """ max_size = int(model.maxTokens * (1 - options.safetyMargin)) if options.compressPrompt: # Reduce both prompt and context target_size = max_size current_size = len(prompt) + len(context) reduction_factor = (target_size * 0.7) / current_size if reduction_factor < 1.0: prompt = self._reduceText(prompt, reduction_factor) context = self._reduceText(context, reduction_factor) else: # Only reduce context, preserve prompt integrity max_context_size = max_size - len(prompt) if len(context) > max_context_size: reduction_factor = max_context_size / len(context) context = self._reduceText(context, reduction_factor) return prompt + "\n\n" + context if context else prompt def _extractTextFromContentParts(self, extracted_content) -> str: """ Extract text content from ExtractionService ContentPart objects. """ if not extracted_content or not hasattr(extracted_content, 'parts'): return "" text_parts = [] for part in extracted_content.parts: if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']: if hasattr(part, 'data') and part.data: text_parts.append(part.data) return "\n\n".join(text_parts) def _reduceText(self, text: str, reduction_factor: float) -> str: """ Reduce text size by the specified factor. """ if reduction_factor >= 1.0: return text target_length = int(len(text) * reduction_factor) return text[:target_length] + "... [reduced]"