import logging import asyncio import uuid import base64 from typing import Dict, Any, List, Union, Tuple, Optional from dataclasses import dataclass import time logger = logging.getLogger(__name__) from modules.aicore.aicoreModelRegistry import modelRegistry from modules.aicore.aicoreModelSelector import modelSelector from modules.datamodels.datamodelAi import ( AiModel, AiCallOptions, AiCallRequest, AiCallResponse, OperationTypeEnum, AiModelCall, AiModelResponse, ) from modules.datamodels.datamodelExtraction import ContentPart, MergeStrategy # Dynamic model registry - models are now loaded from connectors via aicore system @dataclass(slots=True) class AiObjects: """Centralized AI interface: dynamically discovers and uses AI models. Includes web functionality.""" def __post_init__(self) -> None: # Auto-discover and register all available connectors self._discoverAndRegisterConnectors() def _discoverAndRegisterConnectors(self): """Auto-discover and register all available AI connectors.""" logger.info("Auto-discovering AI connectors...") # Use the model registry's built-in discovery mechanism discoveredConnectors = modelRegistry.discoverConnectors() # Register each discovered connector for connector in discoveredConnectors: modelRegistry.registerConnector(connector) logger.info(f"Registered connector: {connector.getConnectorType()}") logger.info(f"Total connectors registered: {len(discoveredConnectors)}") logger.info("All AI connectors registered with dynamic model registry") @classmethod async def create(cls) -> "AiObjects": """Create AiObjects instance with auto-discovered connectors.""" # No need to manually create connectors - they're auto-discovered return cls() def _selectModel(self, prompt: str, context: str, options: AiCallOptions) -> str: """Select the best model using dynamic model selection system. Returns displayName (unique identifier).""" # Get available models from the dynamic registry availableModels = modelRegistry.getAvailableModels() if not availableModels: logger.error("No models available in the registry") raise ValueError("No AI models available") # Use the dynamic model selector selectedModel = modelSelector.selectModel(prompt, context, options, availableModels) if not selectedModel: logger.error("No suitable model found for the given criteria") raise ValueError("No suitable AI model found") logger.info(f"Selected model: {selectedModel.name} ({selectedModel.displayName})") return selectedModel.displayName # AI for Extraction, Processing, Generation async def call(self, request: AiCallRequest, progressCallback=None) -> AiCallResponse: """Call AI model for text generation with model-aware chunking.""" # Handle content parts (unified path) if hasattr(request, 'contentParts') and request.contentParts: return await self._callWithContentParts(request, progressCallback) # Handle traditional text/context calls return await self._callWithTextContext(request) async def _callWithTextContext(self, request: AiCallRequest) -> AiCallResponse: """Call AI model for traditional text/context calls with fallback mechanism.""" prompt = request.prompt context = request.context or "" options = request.options # Input bytes will be calculated inside _callWithModel # Generation parameters are handled inside _callWithModel # Get failover models for this operation type availableModels = modelRegistry.getAvailableModels() failoverModelList = modelSelector.getFailoverModelList(prompt, context, options, availableModels) if not failoverModelList: errorMsg = f"No suitable models found for operation {options.operationType}" logger.error(errorMsg) return AiCallResponse( content=errorMsg, modelName="error", priceUsd=0.0, processingTime=0.0, bytesSent=0, bytesReceived=0, errorCount=1 ) # Try each model in failover sequence lastError = None for attempt, model in enumerate(failoverModelList): try: logger.info(f"Attempting AI call with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})") # Call the model directly - no truncation or compression here response = await self._callWithModel(model, prompt, context, options) logger.info(f"✅ AI call successful with model: {model.name}") return response except Exception as e: lastError = e logger.warning(f"❌ AI call failed with model {model.name}: {str(e)}") # If this is not the last model, try the next one if attempt < len(failoverModelList) - 1: logger.info(f"🔄 Trying next failover model...") continue else: # All models failed logger.error(f"💥 All {len(failoverModelList)} models failed for operation {options.operationType}") break # All failover attempts failed - return error response errorMsg = f"All AI models failed for operation {options.operationType}. Last error: {str(lastError)}" logger.error(errorMsg) return AiCallResponse( content=errorMsg, modelName="error", priceUsd=0.0, processingTime=0.0, bytesSent=0, bytesReceived=0, errorCount=1 ) async def _callWithContentParts(self, request: AiCallRequest, progressCallback=None) -> AiCallResponse: """Process content parts with model-aware chunking (unified for single and multiple parts).""" prompt = request.prompt options = request.options contentParts = request.contentParts # Get failover models availableModels = modelRegistry.getAvailableModels() failoverModelList = modelSelector.getFailoverModelList(prompt, "", options, availableModels) if not failoverModelList: return self._createErrorResponse("No suitable models found", 0, 0) # Process each content part allResults = [] for contentPart in contentParts: partResult = await self._processContentPartWithFallback(contentPart, prompt, options, failoverModelList, progressCallback) allResults.append(partResult) # Merge all results mergedContent = self._mergePartResults(allResults) return AiCallResponse( content=mergedContent, modelName="multiple", priceUsd=sum(r.priceUsd for r in allResults), processingTime=sum(r.processingTime for r in allResults), bytesSent=sum(r.bytesSent for r in allResults), bytesReceived=sum(r.bytesReceived for r in allResults), errorCount=sum(r.errorCount for r in allResults) ) async def _processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList, progressCallback=None) -> AiCallResponse: """Process a single content part with model-aware chunking and fallback.""" lastError = None # Check if this is an image - Vision models need special handling isImage = (contentPart.typeGroup == "image") or (contentPart.mimeType and contentPart.mimeType.startswith("image/")) # Determine the correct operation type based on content type # Images should use IMAGE_ANALYSE, not the generic operation type actualOperationType = options.operationType if isImage: actualOperationType = OperationTypeEnum.IMAGE_ANALYSE # Get vision-capable models for images availableModels = modelRegistry.getAvailableModels() visionFailoverList = modelSelector.getFailoverModelList(prompt, "", AiCallOptions(operationType=actualOperationType), availableModels) if visionFailoverList: logger.debug(f"Using {len(visionFailoverList)} vision-capable models for image processing") failoverModelList = visionFailoverList for attempt, model in enumerate(failoverModelList): try: logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})") # Special handling for images with Vision models if isImage and hasattr(model, 'functionCall'): # Call model's functionCall directly (for Vision models this is callAiImage) from modules.datamodels.datamodelAi import AiModelCall, AiCallOptions as AiCallOpts try: # Validate and prepare image data if not contentPart.data: raise ValueError("Image content part has no data") # Ensure mimeType is valid mimeType = contentPart.mimeType or "image/jpeg" if not mimeType.startswith("image/"): raise ValueError(f"Invalid mimeType for image: {mimeType}") # Prepare base64 data if isinstance(contentPart.data, str): # Already base64 encoded - validate it try: base64.b64decode(contentPart.data, validate=True) base64Data = contentPart.data except Exception as e: raise ValueError(f"Invalid base64 data in contentPart: {str(e)}") elif isinstance(contentPart.data, bytes): # Binary data - encode to base64 base64Data = base64.b64encode(contentPart.data).decode('utf-8') else: raise ValueError(f"Unsupported data type for image: {type(contentPart.data)}") # Create data URL imageDataUrl = f"data:{mimeType};base64,{base64Data}" modelCall = AiModelCall( messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt or ""}, { "type": "image_url", "image_url": { "url": imageDataUrl } } ] } ], model=model, options=AiCallOpts(operationType=actualOperationType) ) modelResponse = await model.functionCall(modelCall) if not modelResponse.success: raise ValueError(f"Model call failed: {modelResponse.error}") logger.info(f"✅ Image content part processed successfully with model: {model.name}") # Convert to AiCallResponse format return AiCallResponse( content=modelResponse.content, modelName=model.name, priceUsd=modelResponse.priceUsd if hasattr(modelResponse, 'priceUsd') else 0.0, processingTime=modelResponse.processingTime if hasattr(modelResponse, 'processingTime') else 0.0, bytesSent=0, # Will be calculated elsewhere bytesReceived=0, # Will be calculated elsewhere errorCount=0 ) except Exception as e: # Image processing failed with this model lastError = e logger.warning(f"❌ Image processing failed with model {model.name}: {str(e)}") # If this is not the last model, try the next one if attempt < len(failoverModelList) - 1: logger.info(f"🔄 Trying next fallback model for image processing...") continue else: # All models failed logger.error(f"💥 All {len(failoverModelList)} models failed for image processing") raise # For non-image parts, check if part fits in model context partSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0 modelContextBytes = model.contextLength * 4 # Convert tokens to bytes if partSize <= modelContextBytes: # Part fits - call AI directly response = await self._callWithModel(model, prompt, contentPart.data, options) logger.info(f"✅ Content part processed successfully with model: {model.name}") return response else: # Part too large - chunk it (pass prompt to account for it in chunk size calculation) chunks = await self._chunkContentPart(contentPart, model, options, prompt) if not chunks: raise ValueError(f"Failed to chunk content part for model {model.name}") logger.info(f"Starting to process {len(chunks)} chunks with model {model.name}") # Log progress if callback provided if progressCallback: progressCallback(0.0, f"Starting to process {len(chunks)} chunks") # Process each chunk chunkResults = [] for idx, chunk in enumerate(chunks): chunkNum = idx + 1 logger.info(f"Processing chunk {chunkNum}/{len(chunks)} with model {model.name}") # Calculate and log progress if progressCallback: progress = chunkNum / len(chunks) progressCallback(progress, f"Processing chunk {chunkNum}/{len(chunks)}") try: chunkResponse = await self._callWithModel(model, prompt, chunk['data'], options) chunkResults.append(chunkResponse) logger.info(f"✅ Chunk {chunkNum}/{len(chunks)} processed successfully") # Log completion progress if progressCallback: progressCallback(chunkNum / len(chunks), f"Chunk {chunkNum}/{len(chunks)} processed") except Exception as e: logger.error(f"❌ Error processing chunk {chunkNum}/{len(chunks)}: {str(e)}") raise # Merge chunk results mergedContent = self._mergeChunkResults(chunkResults) totalPrice = sum(r.priceUsd for r in chunkResults) totalTime = sum(r.processingTime for r in chunkResults) totalBytesSent = sum(r.bytesSent for r in chunkResults) totalBytesReceived = sum(r.bytesReceived for r in chunkResults) totalErrors = sum(r.errorCount for r in chunkResults) logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)") return AiCallResponse( content=mergedContent, modelName=model.name, priceUsd=totalPrice, processingTime=totalTime, bytesSent=totalBytesSent, bytesReceived=totalBytesReceived, errorCount=totalErrors ) except Exception as e: lastError = e error_msg = str(e) if str(e) else f"{type(e).__name__}" error_detail = f"❌ Model {model.name} failed for content part: {error_msg}" if hasattr(e, 'detail') and e.detail: error_detail += f" | Detail: {e.detail}" if hasattr(e, 'status_code'): error_detail += f" | Status: {e.status_code}" logger.warning(error_detail, exc_info=True) if attempt < len(failoverModelList) - 1: logger.info(f"🔄 Trying next failover model...") continue else: logger.error(f"💥 All {len(failoverModelList)} models failed for content part") break # All models failed return self._createErrorResponse(f"All models failed: {str(lastError)}", 0, 0) async def _chunkContentPart(self, contentPart, model, options, prompt: str = "") -> List[Dict[str, Any]]: """Chunk a content part based on model capabilities, accounting for prompt, system message overhead, and maxTokens output.""" # Calculate model-specific chunk sizes modelContextTokens = model.contextLength # Total context in tokens modelMaxOutputTokens = model.maxTokens # Maximum output tokens # Reserve tokens for: # 1. Prompt (user message) promptTokens = len(prompt.encode('utf-8')) / 4 if prompt else 0 # 2. System message wrapper ("Context from documents:\n") systemMessageTokens = 10 # ~40 bytes = 10 tokens # 3. Max output tokens (model will reserve space for completion) outputTokens = modelMaxOutputTokens # 4. JSON structure and message overhead (~100 tokens) messageOverheadTokens = 100 # Total reserved tokens = input overhead + output reservation totalReservedTokens = promptTokens + systemMessageTokens + messageOverheadTokens + outputTokens # Available tokens for content = context length - reserved tokens # Use 80% of available for safety margin availableContentTokens = int((modelContextTokens - totalReservedTokens) * 0.8) # Ensure we have at least some space if availableContentTokens < 100: logger.warning(f"Very limited space for content: {availableContentTokens} tokens available. Model: {model.name}, contextLength: {modelContextTokens}, maxTokens: {modelMaxOutputTokens}, prompt: {promptTokens:.0f} tokens") availableContentTokens = max(100, int(modelContextTokens * 0.1)) # Fallback to 10% of context # Convert tokens to bytes (1 token ≈ 4 bytes) availableContentBytes = availableContentTokens * 4 logger.debug(f"Chunking calculation for {model.name}: contextLength={modelContextTokens} tokens, maxTokens={modelMaxOutputTokens} tokens, prompt={promptTokens:.0f} tokens, reserved={totalReservedTokens:.0f} tokens, available={availableContentTokens} tokens ({availableContentBytes} bytes)") # Use 70% of available content bytes for text chunks (conservative) textChunkSize = int(availableContentBytes * 0.7) imageChunkSize = int(availableContentBytes * 0.8) # 80% for image chunks # Build chunking options chunkingOptions = { "textChunkSize": textChunkSize, "imageChunkSize": imageChunkSize, "maxSize": availableContentBytes, "chunkAllowed": True } # Get appropriate chunker from modules.services.serviceExtraction.subRegistry import ChunkerRegistry chunkerRegistry = ChunkerRegistry() chunker = chunkerRegistry.resolve(contentPart.typeGroup) if not chunker: logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}") return [] # Chunk the content part try: chunks = chunker.chunk(contentPart, chunkingOptions) logger.debug(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part") return chunks except Exception as e: logger.error(f"Chunking failed for {contentPart.typeGroup}: {str(e)}") return [] def _mergePartResults(self, partResults: List[AiCallResponse]) -> str: """Merge part results using the existing sophisticated merging system.""" if not partResults: return "" # Convert AiCallResponse results to ContentParts for merging from modules.datamodels.datamodelExtraction import ContentPart from modules.services.serviceExtraction.subUtils import makeId content_parts = [] for i, result in enumerate(partResults): if result.content: content_part = ContentPart( id=str(uuid.uuid4()), parentId=None, label=f"ai_result_{i}", typeGroup="text", # Default to text for AI results mimeType="text/plain", data=result.content, metadata={ "aiResult": True, "modelName": result.modelName, "priceUsd": result.priceUsd, "processingTime": result.processingTime, "bytesSent": result.bytesSent, "bytesReceived": result.bytesReceived } ) content_parts.append(content_part) # Use existing merging system merge_strategy = MergeStrategy( useIntelligentMerging=True, groupBy="typeGroup", orderBy="id", mergeType="concatenate" ) merged_parts = applyMerging(content_parts, merge_strategy) # Convert merged parts back to final string final_content = "\n\n".join([part.data for part in merged_parts]) logger.info(f"Merged {len(partResults)} AI results using existing merging system") return final_content.strip() def _mergeChunkResults(self, chunkResults: List[AiCallResponse]) -> str: """Merge chunk results using the existing sophisticated merging system.""" if not chunkResults: return "" # Convert AiCallResponse results to ContentParts for merging content_parts = [] for i, result in enumerate(chunkResults): if result.content: content_part = ContentPart( id=str(uuid.uuid4()), parentId=None, label=f"chunk_result_{i}", typeGroup="text", # Default to text for AI results mimeType="text/plain", data=result.content, metadata={ "aiResult": True, "chunk": True, "modelName": result.modelName, "priceUsd": result.priceUsd, "processingTime": result.processingTime, "bytesSent": result.bytesSent, "bytesReceived": result.bytesReceived } ) content_parts.append(content_part) # Use existing merging system merge_strategy = MergeStrategy( useIntelligentMerging=True, groupBy="typeGroup", orderBy="id", mergeType="concatenate" ) merged_parts = applyMerging(content_parts, merge_strategy) # Convert merged parts back to final string final_content = "\n\n".join([part.data for part in merged_parts]) logger.info(f"Merged {len(chunkResults)} chunk results using existing merging system") return final_content.strip() def _createErrorResponse(self, errorMsg: str, inputBytes: int, outputBytes: int) -> AiCallResponse: """Create an error response.""" return AiCallResponse( content=errorMsg, modelName="error", priceUsd=0.0, processingTime=0.0, bytesSent=inputBytes, bytesReceived=outputBytes, errorCount=1 ) async def _callWithModel(self, model: AiModel, prompt: str, context: str, options: AiCallOptions = None) -> AiCallResponse: """Call a specific model and return the response.""" # Calculate input bytes from prompt and context inputBytes = len((prompt + context).encode('utf-8')) # Replace placeholder with model's maxTokens value if "" in prompt: if model.maxTokens > 0: tokenLimit = str(model.maxTokens) modelPrompt = prompt.replace("", tokenLimit) logger.debug(f"Replaced with {tokenLimit} for model {model.name}") else: raise ValueError(f"Model {model.name} has invalid maxTokens ({model.maxTokens}). Cannot set token limit.") else: modelPrompt = prompt # Update messages array with replaced content messages = [] if context: messages.append({"role": "system", "content": f"Context from documents:\n{context}"}) messages.append({"role": "user", "content": modelPrompt}) # Start timing startTime = time.time() # Call the model's function directly - completely generic if model.functionCall: # Create standardized call object modelCall = AiModelCall( messages=messages, model=model, options=options or {} ) # Log before calling model logger.debug(f"Calling model {model.name} with {len(messages)} messages, context size: {len(context.encode('utf-8'))} bytes") # Call the model with standardized interface modelResponse = await model.functionCall(modelCall) # Log after successful call logger.debug(f"Model {model.name} returned successfully") # Extract content from standardized response if not modelResponse.success: raise ValueError(f"Model call failed: {modelResponse.error}") content = modelResponse.content else: raise ValueError(f"Model {model.name} has no function call defined") # Calculate timing and output bytes endTime = time.time() processingTime = endTime - startTime outputBytes = len(content.encode("utf-8")) # Calculate price using model's own price calculation method priceUsd = model.calculatePriceUsd(processingTime, inputBytes, outputBytes) return AiCallResponse( content=content, modelName=model.name, priceUsd=priceUsd, processingTime=processingTime, bytesSent=inputBytes, bytesReceived=outputBytes, errorCount=0 ) # Utility methods async def listAvailableModels(self, connectorType: str = None) -> List[Dict[str, Any]]: """List available models, optionally filtered by connector type.""" models = modelRegistry.getAvailableModels() if connectorType: return [model.model_dump() for model in models if model.connectorType == connectorType] return [model.model_dump() for model in models] async def getModelInfo(self, displayName: str) -> Dict[str, Any]: """Get information about a specific model by displayName.""" model = modelRegistry.getModel(displayName) if not model: raise ValueError(f"Model with displayName '{displayName}' not found") return model.model_dump() async def getModelsByTag(self, tag: str) -> List[str]: """Get model displayNames that have a specific tag. Returns displayNames (unique identifiers).""" models = modelRegistry.getModelsByTag(tag) return [model.displayName for model in models] def applyMerging(parts: List[ContentPart], strategy: MergeStrategy) -> List[ContentPart]: """Apply merging strategy to parts with intelligent token-aware merging.""" logger.debug(f"applyMerging called with {len(parts)} parts") # Import merging dependencies from modules.services.serviceExtraction.merging.mergerText import TextMerger from modules.services.serviceExtraction.merging.mergerTable import TableMerger from modules.services.serviceExtraction.merging.mergerDefault import DefaultMerger from modules.services.serviceExtraction.subMerger import IntelligentTokenAwareMerger # Check if intelligent merging is enabled if strategy.useIntelligentMerging: modelCapabilities = strategy.capabilities or {} subMerger = IntelligentTokenAwareMerger(modelCapabilities) # Use intelligent merging for all parts merged = subMerger.mergeChunksIntelligently(parts, strategy.prompt or "") # Calculate and log optimization stats stats = subMerger.calculateOptimizationStats(parts, merged) logger.info(f"🧠 Intelligent merging stats: {stats}") logger.debug(f"Intelligent merging: {stats['original_ai_calls']} → {stats['optimized_ai_calls']} calls ({stats['reduction_percent']}% reduction)") return merged # Fallback to traditional merging textMerger = TextMerger() tableMerger = TableMerger() defaultMerger = DefaultMerger() # Group by typeGroup textParts = [p for p in parts if p.typeGroup == "text"] tableParts = [p for p in parts if p.typeGroup == "table"] structureParts = [p for p in parts if p.typeGroup == "structure"] otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")] logger.debug(f"Grouped - text: {len(textParts)}, table: {len(tableParts)}, structure: {len(structureParts)}, other: {len(otherParts)}") merged: List[ContentPart] = [] if textParts: textMerged = textMerger.merge(textParts, strategy) logger.debug(f"TextMerger merged {len(textParts)} parts into {len(textMerged)} parts") merged.extend(textMerged) if tableParts: tableMerged = tableMerger.merge(tableParts, strategy) logger.debug(f"TableMerger merged {len(tableParts)} parts into {len(tableMerged)} parts") merged.extend(tableMerged) if structureParts: # For now, treat structure like text structureMerged = textMerger.merge(structureParts, strategy) logger.debug(f"StructureMerger merged {len(structureParts)} parts into {len(structureMerged)} parts") merged.extend(structureMerged) if otherParts: otherMerged = defaultMerger.merge(otherParts, strategy) logger.debug(f"DefaultMerger merged {len(otherParts)} parts into {len(otherMerged)} parts") merged.extend(otherMerged) logger.debug(f"applyMerging returning {len(merged)} parts") return merged