""" Smart AI Engine with intelligent content management and model selection """ import logging import asyncio from typing import List, Dict, Any, Optional, Tuple from modules.interfaces.interfaceAiEngine import ( AIEngine, AIRequest, AIResponse, AIModelType, ProcessingStrategy, ContentReductionStrategy, ModelCapabilities, ContentReducer ) from modules.interfaces.interfaceChatModel import ChatDocument from modules.interfaces.interfaceAiCalls import AiCalls from modules.chat.documents.documentExtraction import DocumentExtraction from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) class SmartAIEngine(AIEngine): """Smart AI Engine with automatic content management and model selection""" def __init__(self, service_center=None): self.service_center = service_center self.ai_calls = AiCalls() self.document_processor = DocumentExtraction(service_center) self.content_reducer = SmartContentReducer(service_center) # Model capabilities mapping self.model_capabilities = { AIModelType.OPENAI_GPT4: ModelCapabilities( max_tokens=8192, max_input_tokens=128000, supports_vision=False, supports_function_calling=True, cost_per_1k_tokens=0.03, processing_speed="medium" ), AIModelType.OPENAI_GPT35: ModelCapabilities( max_tokens=4096, max_input_tokens=16384, supports_vision=False, supports_function_calling=True, cost_per_1k_tokens=0.002, processing_speed="fast" ), AIModelType.ANTHROPIC_CLAUDE: ModelCapabilities( max_tokens=4096, max_input_tokens=200000, supports_vision=False, supports_function_calling=False, cost_per_1k_tokens=0.015, processing_speed="medium" ), AIModelType.OPENAI_VISION: ModelCapabilities( max_tokens=4096, max_input_tokens=128000, supports_vision=True, supports_function_calling=False, cost_per_1k_tokens=0.01, processing_speed="slow" ) } # Processing strategy preferences self.strategy_preferences = { "task_planning": ProcessingStrategy.SINGLE_CALL, "action_definition": ProcessingStrategy.SINGLE_CALL, "document_extraction": ProcessingStrategy.DOCUMENT_BY_DOCUMENT, "report_generation": ProcessingStrategy.CHUNKED_PROCESSING, "email_composition": ProcessingStrategy.SINGLE_CALL, "chat_summarization": ProcessingStrategy.SUMMARIZED_CONTENT } async def process_request(self, request: AIRequest) -> AIResponse: """Process AI request with intelligent content management""" try: # Step 1: Determine optimal processing strategy strategy = self._determine_processing_strategy(request) request.processing_strategy = strategy # Step 2: Estimate token usage estimated_tokens = await self.estimate_token_usage(request) # Step 3: Select appropriate model model = self._select_optimal_model(request, estimated_tokens) # Step 4: Process with selected strategy if strategy == ProcessingStrategy.SINGLE_CALL: return await self._process_single_call(request, model) elif strategy == ProcessingStrategy.DOCUMENT_BY_DOCUMENT: return await self._process_document_by_document(request, model) elif strategy == ProcessingStrategy.CHUNKED_PROCESSING: return await self._process_chunked(request, model) elif strategy == ProcessingStrategy.SUMMARIZED_CONTENT: return await self._process_with_summarization(request, model) else: raise ValueError(f"Unknown processing strategy: {strategy}") except Exception as e: logger.error(f"Error processing AI request: {str(e)}") return AIResponse( success=False, content="", model_used=AIModelType.OPENAI_GPT35, processing_strategy=ProcessingStrategy.SINGLE_CALL, error=str(e) ) def _determine_processing_strategy(self, request: AIRequest) -> ProcessingStrategy: """Determine the best processing strategy based on request characteristics""" # Use explicit strategy if provided if request.processing_strategy: return request.processing_strategy # Determine based on metadata or content characteristics metadata = request.metadata or {} operation_type = metadata.get("operation_type", "general") # Check if we have a preference for this operation type if operation_type in self.strategy_preferences: return self.strategy_preferences[operation_type] # Auto-determine based on content characteristics num_documents = len(request.documents) prompt_length = len(request.prompt) if num_documents == 0: return ProcessingStrategy.SINGLE_CALL elif num_documents == 1: return ProcessingStrategy.SINGLE_CALL elif num_documents <= 3 and prompt_length < 1000: return ProcessingStrategy.SINGLE_CALL elif num_documents > 5: return ProcessingStrategy.DOCUMENT_BY_DOCUMENT else: return ProcessingStrategy.CHUNKED_PROCESSING def _select_optimal_model(self, request: AIRequest, estimated_tokens: int) -> AIModelType: """Select the optimal AI model based on request characteristics""" # Use preferred model if specified and suitable if request.preferred_model: capabilities = self.get_model_capabilities(request.preferred_model) if estimated_tokens <= capabilities.max_input_tokens: return request.preferred_model # Select model based on requirements metadata = request.metadata or {} requires_vision = metadata.get("requires_vision", False) requires_function_calling = metadata.get("requires_function_calling", False) # Filter models by requirements suitable_models = [] for model, capabilities in self.model_capabilities.items(): if estimated_tokens <= capabilities.max_input_tokens: if requires_vision and not capabilities.supports_vision: continue if requires_function_calling and not capabilities.supports_function_calling: continue suitable_models.append((model, capabilities)) if not suitable_models: # If no model can handle the full content, use the one with highest capacity best_model = max(self.model_capabilities.items(), key=lambda x: x[1].max_input_tokens) logger.warning(f"No model can handle {estimated_tokens} tokens, using {best_model[0]}") return best_model[0] # Select based on cost and speed preferences # For now, prefer Claude for large content, GPT-4 for complex tasks, GPT-3.5 for simple tasks if estimated_tokens > 50000: return AIModelType.ANTHROPIC_CLAUDE elif metadata.get("complex_task", False): return AIModelType.OPENAI_GPT4 else: return AIModelType.OPENAI_GPT35 async def _process_single_call(self, request: AIRequest, model: AIModelType) -> AIResponse: """Process request with a single AI call""" try: # Prepare content content = await self._prepare_content_for_single_call(request) # Make AI call if model in [AIModelType.OPENAI_GPT4, AIModelType.OPENAI_GPT35]: response = await self.ai_calls.callAiTextAdvanced(content, request.context) elif model == AIModelType.ANTHROPIC_CLAUDE: response = await self.ai_calls.callAiTextAdvanced(content, request.context) else: raise ValueError(f"Unsupported model for single call: {model}") return AIResponse( success=True, content=response, model_used=model, processing_strategy=ProcessingStrategy.SINGLE_CALL ) except Exception as e: # If single call fails due to size, try with content reduction if "too large" in str(e).lower() or "400" in str(e): return await self._process_with_content_reduction(request, model) else: raise e async def _process_document_by_document(self, request: AIRequest, model: AIModelType) -> AIResponse: """Process each document separately and merge results""" try: results = [] for i, document in enumerate(request.documents): # Create individual request for each document doc_request = AIRequest( prompt=request.prompt, documents=[document], context=request.context, preferred_model=model, metadata=request.metadata ) # Process document doc_response = await self._process_single_call(doc_request, model) if doc_response.success: results.append(f"Document {i+1} ({document.fileName}):\n{doc_response.content}") else: results.append(f"Document {i+1} ({document.fileName}): Error - {doc_response.error}") # Merge results merged_content = "\n\n".join(results) return AIResponse( success=True, content=merged_content, model_used=model, processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT ) except Exception as e: logger.error(f"Error in document-by-document processing: {str(e)}") return AIResponse( success=False, content="", model_used=model, processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT, error=str(e) ) async def _process_chunked(self, request: AIRequest, model: AIModelType) -> AIResponse: """Process content in chunks and merge results""" try: # This would implement chunked processing logic # For now, fall back to document-by-document return await self._process_document_by_document(request, model) except Exception as e: logger.error(f"Error in chunked processing: {str(e)}") return AIResponse( success=False, content="", model_used=model, processing_strategy=ProcessingStrategy.CHUNKED_PROCESSING, error=str(e) ) async def _process_with_summarization(self, request: AIRequest, model: AIModelType) -> AIResponse: """Process with content summarization first""" try: # Summarize documents first summarized_docs = [] for document in request.documents: summary_doc = await self.content_reducer.summarize_document( document, f"Summarize this document for: {request.prompt}" ) summarized_docs.append(summary_doc) # Create new request with summarized documents summary_request = AIRequest( prompt=request.prompt, documents=summarized_docs, context=request.context, preferred_model=model, metadata=request.metadata ) # Process with summarized content return await self._process_single_call(summary_request, model) except Exception as e: logger.error(f"Error in summarization processing: {str(e)}") return AIResponse( success=False, content="", model_used=model, processing_strategy=ProcessingStrategy.SUMMARIZED_CONTENT, error=str(e) ) async def _process_with_content_reduction(self, request: AIRequest, model: AIModelType) -> AIResponse: """Process with automatic content reduction""" try: # Determine reduction strategy strategy = self._determine_reduction_strategy(request) # Reduce content reduced_docs, reduced_prompt = await self.content_reducer.reduce_content( request.documents, request.prompt, strategy, target_reduction=0.5 ) # Create new request with reduced content reduced_request = AIRequest( prompt=reduced_prompt, documents=reduced_docs, context=request.context, preferred_model=model, metadata=request.metadata ) # Try processing with reduced content return await self._process_single_call(reduced_request, model) except Exception as e: logger.error(f"Error in content reduction processing: {str(e)}") return AIResponse( success=False, content="", model_used=model, processing_strategy=ProcessingStrategy.SINGLE_CALL, error=f"Content reduction failed: {str(e)}" ) def _determine_reduction_strategy(self, request: AIRequest) -> ContentReductionStrategy: """Determine the best content reduction strategy""" # Use explicit strategy if provided if request.reduction_strategy: return request.reduction_strategy # Auto-determine based on request characteristics metadata = request.metadata or {} operation_type = metadata.get("operation_type", "general") # Different strategies for different operations if operation_type in ["task_planning", "action_definition"]: # For planning tasks, prompt is crucial return ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY elif operation_type in ["document_extraction", "report_generation"]: # For document processing, documents are crucial return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS else: # Default: reduce both return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS async def _prepare_content_for_single_call(self, request: AIRequest) -> str: """Prepare content for a single AI call""" content_parts = [request.prompt] if request.context: content_parts.append(f"Context: {request.context}") # Add document content for i, document in enumerate(request.documents): try: # Extract document content extracted = await self.service_center.extractContentFromDocument( "Extract all relevant text content", document ) if extracted and extracted.contents: doc_content = "\n".join([item.data for item in extracted.contents]) content_parts.append(f"Document {i+1} ({document.fileName}):\n{doc_content}") else: content_parts.append(f"Document {i+1} ({document.fileName}): [No content extracted]") except Exception as e: logger.warning(f"Could not extract content from document {document.fileName}: {str(e)}") content_parts.append(f"Document {i+1} ({document.fileName}): [Error extracting content]") return "\n\n".join(content_parts) def get_model_capabilities(self, model: AIModelType) -> ModelCapabilities: """Get capabilities for a specific model""" return self.model_capabilities.get(model, self.model_capabilities[AIModelType.OPENAI_GPT35]) async def estimate_token_usage(self, request: AIRequest) -> int: """Estimate token usage for a request""" # Simple estimation: ~4 characters per token prompt_tokens = len(request.prompt) // 4 context_tokens = len(request.context or "") // 4 # Estimate document tokens doc_tokens = 0 for document in request.documents: # Rough estimate based on file size doc_tokens += document.fileSize // 4 return prompt_tokens + context_tokens + doc_tokens class SmartContentReducer(ContentReducer): """Smart content reducer using document extraction engine""" def __init__(self, service_center): self.service_center = service_center self.document_processor = DocumentExtraction(service_center) async def reduce_content( self, documents: List[ChatDocument], prompt: str, strategy: ContentReductionStrategy, target_reduction: float = 0.5 ) -> Tuple[List[ChatDocument], str]: """Reduce content size while preserving important information""" reduced_docs = [] reduced_prompt = prompt # Sort documents by size (largest first) sorted_docs = sorted(documents, key=lambda d: d.fileSize, reverse=True) for document in sorted_docs: try: # Create reduction prompt based on strategy if strategy == ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY: reduction_prompt = f""" Summarize this document to {int(100 * (1 - target_reduction))}% of its original size. Focus on the most important information relevant to: {prompt} Preserve key facts, data, and conclusions. """ elif strategy == ContentReductionStrategy.SUMMARIZE_DOCUMENTS: reduction_prompt = f""" Create a concise summary of this document focusing on: {prompt} Include only the most relevant information. """ else: # REDUCE_PROMPT_AND_DOCS or EXTRACT_KEY_INFO reduction_prompt = f""" Extract only the key information from this document that is relevant to: {prompt} Be very selective and concise. """ # Process document with reduction extracted = await self.service_center.extractContentFromDocument( reduction_prompt, document ) if extracted and extracted.contents: # Create new document with reduced content reduced_content = "\n".join([item.data for item in extracted.contents]) reduced_doc = await self._create_reduced_document(document, reduced_content) reduced_docs.append(reduced_doc) else: # If reduction fails, keep original document reduced_docs.append(document) except Exception as e: logger.warning(f"Could not reduce document {document.fileName}: {str(e)}") reduced_docs.append(document) # Reduce prompt if strategy requires it if strategy in [ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS]: reduced_prompt = self._reduce_prompt(prompt, target_reduction) return reduced_docs, reduced_prompt async def summarize_document( self, document: ChatDocument, focus_prompt: str ) -> ChatDocument: """Create a summary of a document focused on specific aspects""" summary_prompt = f""" Create a comprehensive summary of this document focusing on: {focus_prompt} Include: - Key points and main ideas - Important data and statistics - Conclusions and recommendations - Any relevant details Keep the summary concise but informative. """ try: extracted = await self.service_center.extractContentFromDocument( summary_prompt, document ) if extracted and extracted.contents: summary_content = "\n".join([item.data for item in extracted.contents]) return await self._create_reduced_document(document, summary_content) else: return document except Exception as e: logger.warning(f"Could not summarize document {document.fileName}: {str(e)}") return document async def _create_reduced_document(self, original_doc: ChatDocument, reduced_content: str) -> ChatDocument: """Create a new document with reduced content""" try: # Create new file with reduced content file_id = self.service_center.createFile( f"reduced_{original_doc.fileName}", "text/plain", reduced_content, base64encoded=False ) # Create new document return self.service_center.createDocument( f"reduced_{original_doc.fileName}", "text/plain", reduced_content, base64encoded=False, existing_file_id=file_id ) except Exception as e: logger.error(f"Could not create reduced document: {str(e)}") return original_doc def _reduce_prompt(self, prompt: str, target_reduction: float) -> str: """Reduce prompt size while preserving essential information""" # Simple prompt reduction - keep first and last parts lines = prompt.split('\n') if len(lines) <= 3: return prompt # Keep first 30% and last 20% of lines keep_start = int(len(lines) * 0.3) keep_end = int(len(lines) * 0.2) reduced_lines = lines[:keep_start] + ["... (content reduced) ..."] + lines[-keep_end:] return '\n'.join(reduced_lines)