diff --git a/modules/aicore/aicoreModelRegistry.py b/modules/aicore/aicoreModelRegistry.py index 8d6c5ac6..a3666114 100644 --- a/modules/aicore/aicoreModelRegistry.py +++ b/modules/aicore/aicoreModelRegistry.py @@ -48,7 +48,7 @@ class ModelRegistry: try: # Import the module - module = importlib.import_module(f'modules.connectors.{moduleName}') + module = importlib.import_module(f'modules.aicore.{moduleName}') # Find connector classes (classes that inherit from BaseConnectorAi) for attrName in dir(module): diff --git a/modules/aicore/aicoreModelSelectionConfig.py b/modules/aicore/aicoreModelSelectionConfig.py deleted file mode 100644 index 476dc527..00000000 --- a/modules/aicore/aicoreModelSelectionConfig.py +++ /dev/null @@ -1,158 +0,0 @@ -""" -Configuration for dynamic model selection rules. -This makes model selection configurable rather than hardcoded. -""" - -from typing import Dict, List, Any -from modules.datamodels.datamodelAi import OperationTypeEnum, ModelCapabilitiesEnum, PriorityEnum, SelectionRule - - -class ModelSelectionConfig: - """Configuration for model selection rules.""" - - def __init__(self): - self.rules = self._loadDefaultRules() - self.fallbackModels = self._loadFallbackModels() - - def _loadDefaultRules(self) -> List[SelectionRule]: - """Load default selection rules.""" - return [ - # High quality for planning and analysis - SelectionRule( - name="highQualityAnalysis", - condition="Planning or analysis operations requiring high quality", - weight=10.0, - operationTypes=[OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE], - priority=PriorityEnum.QUALITY, - capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.REASONING, ModelCapabilitiesEnum.ANALYSIS], - minQualityRating=8 - ), - - # Fast processing for basic operations - SelectionRule( - name="fastBasicProcessing", - condition="Basic operations requiring speed", - weight=8.0, - operationTypes=[OperationTypeEnum.GENERAL], - priority=PriorityEnum.SPEED, - capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.CHAT], - minQualityRating=5 - ), - - # Cost-effective for high-volume operations - SelectionRule( - name="costEffectiveProcessing", - condition="High-volume operations where cost matters", - weight=7.0, - operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.GENERATE], - priority=PriorityEnum.COST, - capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION], - maxCost=0.01 # $0.01 per 1k tokens - ), - - # Image analysis specific - SelectionRule( - name="imageAnalyse", - condition="Image analysis operations", - weight=10.0, - operationTypes=[OperationTypeEnum.IMAGE_ANALYSE], - priority=PriorityEnum.QUALITY, - capabilities=[ModelCapabilitiesEnum.VISION, ModelCapabilitiesEnum.MULTIMODAL], - minQualityRating=8 - ), - - # Web research specific - SelectionRule( - name="webResearch", - condition="Web research operations", - weight=9.0, - operationTypes=[OperationTypeEnum.WEB_RESEARCH], - priority=PriorityEnum.BALANCED, - capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.ANALYSIS, ModelCapabilitiesEnum.WEB_SEARCH], - minQualityRating=7 - ), - - # Large context requirements - SelectionRule( - name="largeContext", - condition="Operations requiring large context", - weight=8.0, - operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.ANALYSE], - priority=PriorityEnum.BALANCED, - capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION], - minContextLength=100000 # 100k tokens - ) - ] - - def _loadFallbackModels(self) -> Dict[str, Dict[str, Any]]: - 
"""Load fallback model selection criteria.""" - return { - OperationTypeEnum.GENERAL: { - "priorityOrder": ["speed", "quality", "cost"], - "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.CHAT], - "minQualityRating": 5, - "maxCostPer1k": 0.01 - }, - OperationTypeEnum.IMAGE_ANALYSE: { - "priorityOrder": ["quality", "speed"], - "operationTypes": [ModelCapabilitiesEnum.VISION, ModelCapabilitiesEnum.MULTIMODAL], - "minQualityRating": 8, - "maxCostPer1k": 0.1 - }, - OperationTypeEnum.IMAGE_GENERATE: { - "priorityOrder": ["quality", "speed"], - "operationTypes": [ModelCapabilitiesEnum.IMAGE_GENERATE, ModelCapabilitiesEnum.ART, ModelCapabilitiesEnum.VISUAL_CREATION], - "minQualityRating": 8, - "maxCostPer1k": 0.1 - }, - OperationTypeEnum.WEB_RESEARCH: { - "priorityOrder": ["quality", "speed", "cost"], - "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.ANALYSIS], - "preferredTags": [ModelCapabilitiesEnum.WEB_SEARCH], - "minQualityRating": 7, - "maxCostPer1k": 0.02 - }, - OperationTypeEnum.PLAN: { - "priorityOrder": ["quality", "speed"], - "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.REASONING, ModelCapabilitiesEnum.ANALYSIS], - "preferredTags": [PriorityEnum.QUALITY], - "minQualityRating": 8, - "maxCostPer1k": 0.1 - }, - OperationTypeEnum.ANALYSE: { - "priorityOrder": ["quality", "speed"], - "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.ANALYSIS, ModelCapabilitiesEnum.REASONING], - "preferredTags": [PriorityEnum.QUALITY], - "minQualityRating": 8, - "maxCostPer1k": 0.1 - } - } - - def getRulesForOperation(self, operationType: str) -> List[SelectionRule]: - """Get rules that apply to a specific operation type.""" - return [rule for rule in self.rules if operationType in rule.operationTypes] - - def getFallbackCriteria(self, operationType: str) -> Dict[str, Any]: - """Get fallback selection criteria for a specific operation type.""" - return self.fallbackModels.get(operationType, self.fallbackModels[OperationTypeEnum.GENERAL]) - - def addRule(self, rule: SelectionRule): - """Add a new selection rule.""" - self.rules.append(rule) - - def removeRule(self, ruleName: str): - """Remove a selection rule by name.""" - self.rules = [rule for rule in self.rules if rule.name != ruleName] - - def updateRule(self, ruleName: str, **kwargs): - """Update an existing rule.""" - for rule in self.rules: - if rule.name == ruleName: - for key, value in kwargs.items(): - if hasattr(rule, key): - setattr(rule, key, value) - break - - -# Global configuration instance -model_selection_config = ModelSelectionConfig() diff --git a/modules/aicore/aicoreModelSelector.py b/modules/aicore/aicoreModelSelector.py index 4f3de674..76674ed8 100644 --- a/modules/aicore/aicoreModelSelector.py +++ b/modules/aicore/aicoreModelSelector.py @@ -1,20 +1,20 @@ """ -Dynamic model selector using configurable rules and scoring. +Simplified model selection based on model properties and priority-based sorting. +No complex rules needed - just filter by properties and sort by priority! 
""" import logging -from typing import List, Optional, Dict, Any, Tuple -from modules.datamodels.datamodelAi import AiModel, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, ModelCapabilitiesEnum -from modules.aicore.aicoreModelSelectionConfig import model_selection_config +from typing import List, Dict, Any, Optional +from modules.datamodels.datamodelAi import AiModel, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum +# Configure logger logger = logging.getLogger(__name__) - class ModelSelector: - """Dynamic model selector using configurable rules.""" + """Simple model selector based on properties and priority-based sorting.""" def __init__(self): - self.config = model_selection_config + logger.info("ModelSelector initialized with simplified approach") def selectModel(self, prompt: str, @@ -22,270 +22,7 @@ class ModelSelector: options: AiCallOptions, availableModels: List[AiModel]) -> Optional[AiModel]: """ - Select the best model based on configurable rules and scoring. - - Args: - prompt: User prompt - context: Context data - options: AI call options - availableModels: List of available models to choose from - - Returns: - Selected model or None if no suitable model found - """ - if not availableModels: - logger.warning("No models available for selection") - return None - - logger.info(f"Selecting model for operation: {options.operationType}, priority: {options.priority}") - - # Calculate input size - inputSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8")) - - # Get applicable rules - rules = self.config.getRulesForOperation(options.operationType) - logger.debug(f"Found {len(rules)} applicable rules for {options.operationType}") - - # Score each model - scoredModels = [] - for model in availableModels: - if not model.isAvailable: - continue - - score = self._calculateModelScore(model, inputSize, options, rules) - if score > 0: # Only consider models with positive scores - scoredModels.append((model, score)) - logger.debug(f"Model {model.name}: score={score:.2f}") - - if not scoredModels: - logger.warning("No models passed the selection criteria, trying fallback criteria") - # Try fallback criteria - fallbackCriteria = self.getFallbackCriteria(options.operationType) - return self._selectWithFallbackCriteria(availableModels, fallbackCriteria, inputSize, options) - - # Sort by score (highest first) - scoredModels.sort(key=lambda x: x[1], reverse=True) - - selectedModel = scoredModels[0][0] - selectedScore = scoredModels[0][1] - - logger.info(f"Selected model: {selectedModel.name} (score: {selectedScore:.2f})") - - # Log selection details - self._logSelectionDetails(selectedModel, inputSize, options) - - return selectedModel - - def _calculateModelScore(self, - model: AiModel, - inputSize: int, - options: AiCallOptions, - rules: List) -> float: - """Calculate score for a model based on rules and criteria.""" - score = 0.0 - - # Check basic requirements - if not self._meetsBasicRequirements(model, inputSize, options): - return 0.0 - - # Apply rules - for rule in rules: - ruleScore = self._applyRule(model, inputSize, options, rule) - score += ruleScore * rule.weight - - # Apply priority-based scoring - priorityScore = self._applyPriorityScoring(model, options) - score += priorityScore - - # Apply processing mode scoring - modeScore = self._applyProcessingModeScoring(model, options) - score += modeScore - - # Apply cost constraints - if not self._meetsCostConstraints(model, inputSize, options): - score *= 0.1 # Heavily penalize but don't 
eliminate - - return max(0.0, score) - - def _meetsBasicRequirements(self, model: AiModel, inputSize: int, options: AiCallOptions) -> bool: - """Check if model meets basic requirements.""" - # Context length check - if model.contextLength > 0 and inputSize > model.contextLength * 0.8: - logger.debug(f"Model {model.name} rejected: input too large ({inputSize} > {model.contextLength * 0.8})") - return False - - # Required operation types check - if options.operationTypes: - if not all(opType in model.operationTypes for opType in options.operationTypes): - logger.debug(f"Model {model.name} rejected: missing required operation types") - return False - - # Capabilities check - if options.capabilities: - if not all(cap in model.capabilities for cap in options.capabilities): - logger.debug(f"Model {model.name} rejected: missing required capabilities") - return False - - # Avoid operation types check - for rule in self.config.getRulesForOperation(options.operationType): - if any(opType in model.operationTypes for opType in rule.avoidOperationTypes): - logger.debug(f"Model {model.name} rejected: has avoid operation types") - return False - - return True - - def _applyRule(self, model: AiModel, inputSize: int, options: AiCallOptions, rule) -> float: - """Apply a specific rule to calculate score contribution.""" - score = 0.0 - - # Required operation types match - if all(opType in model.operationTypes for opType in rule.operationTypes): - score += 1.0 - - # Preferred capabilities match - preferredMatches = sum(1 for cap in rule.preferredCapabilities if cap in model.capabilities) - if rule.preferredCapabilities: - score += (preferredMatches / len(rule.preferredCapabilities)) * 0.5 - - # Quality rating check - if rule.minQualityRating and model.qualityRating >= rule.minQualityRating: - score += 0.3 - - # Context length check - if rule.minContextLength and model.contextLength >= rule.minContextLength: - score += 0.2 - - return score - - def _applyPriorityScoring(self, model: AiModel, options: AiCallOptions) -> float: - """Apply priority-based scoring.""" - if options.priority == PriorityEnum.SPEED: - return model.speedRating * 0.1 - elif options.priority == PriorityEnum.QUALITY: - return model.qualityRating * 0.1 - elif options.priority == PriorityEnum.COST: - # Lower cost = higher score - costScore = max(0, 1.0 - (model.costPer1kTokensInput * 1000)) - return costScore * 0.1 - else: # BALANCED - return (model.qualityRating + model.speedRating) * 0.05 - - def _applyProcessingModeScoring(self, model: AiModel, options: AiCallOptions) -> float: - """Apply processing mode scoring.""" - if options.processingMode == ProcessingModeEnum.DETAILED: - if model.priority == PriorityEnum.QUALITY: - return 0.2 - elif options.processingMode == ProcessingModeEnum.BASIC: - if model.priority == PriorityEnum.SPEED: - return 0.2 - - return 0.0 - - def _meetsCostConstraints(self, model: AiModel, inputSize: int, options: AiCallOptions) -> bool: - """Check if model meets cost constraints.""" - if options.maxCost is None: - return True - - # Estimate cost - estimatedTokens = inputSize / 4 - estimatedCost = (estimatedTokens / 1000) * model.costPer1kTokensInput - - return estimatedCost <= options.maxCost - - def _logSelectionDetails(self, model: AiModel, inputSize: int, options: AiCallOptions): - """Log detailed selection information.""" - logger.info(f"Model Selection Details:") - logger.info(f" Selected: {model.displayName} ({model.name})") - logger.info(f" Connector: {model.connectorType}") - logger.info(f" Operation: 
{options.operationType}") - logger.info(f" Priority: {options.priority}") - logger.info(f" Processing Mode: {options.processingMode}") - logger.info(f" Input Size: {inputSize} bytes") - logger.info(f" Context Length: {model.contextLength}") - logger.info(f" Max Tokens: {model.maxTokens}") - logger.info(f" Quality Rating: {model.qualityRating}/10") - logger.info(f" Speed Rating: {model.speedRating}/10") - logger.info(f" Cost: ${model.costPer1kTokensInput:.4f}/1k tokens") - logger.info(f" Capabilities: {', '.join(model.capabilities)}") - logger.info(f" Priority: {model.priority}") - - def getFallbackCriteria(self, operationType: str) -> Dict[str, Any]: - """Get fallback selection criteria for an operation type.""" - return self.config.getFallbackCriteria(operationType) - - def _selectWithFallbackCriteria(self, - availableModels: List[AiModel], - fallbackCriteria: Dict[str, Any], - inputSize: int, - options: AiCallOptions) -> Optional[AiModel]: - """Select model using fallback criteria when normal selection fails.""" - logger.info("Using fallback criteria for model selection") - - # Filter models by fallback criteria - candidates = [] - for model in availableModels: - if not model.isAvailable: - continue - - # Check required operation types - if fallbackCriteria.get("operationTypes"): - if not all(opType in model.operationTypes for opType in fallbackCriteria["operationTypes"]): - continue - - # Check quality rating - if fallbackCriteria.get("minQualityRating"): - if model.qualityRating < fallbackCriteria["minQualityRating"]: - continue - - # Check cost - if fallbackCriteria.get("maxCostPer1k"): - if model.costPer1kTokensInput > fallbackCriteria["maxCostPer1k"]: - continue - - # Check context length - if model.contextLength > 0 and inputSize > model.contextLength * 0.8: - continue - - candidates.append(model) - - if not candidates: - logger.error("No models available even with fallback criteria") - return None - - # Sort by priority order from fallback criteria - priorityOrder = fallbackCriteria.get("priorityOrder", ["quality", "speed", "cost"]) - - def _getPriorityScore(model: AiModel) -> float: - score = 0.0 - for i, priority in enumerate(priorityOrder): - weight = len(priorityOrder) - i # Higher weight for earlier priorities - if priority == "quality": - score += model.qualityRating * weight - elif priority == "speed": - score += model.speedRating * weight - elif priority == "cost": - # Lower cost = higher score - score += (1.0 - model.costPer1kTokensInput * 1000) * weight - return score - - candidates.sort(key=_getPriorityScore, reverse=True) - selectedModel = candidates[0] - - logger.info(f"Fallback selection: {selectedModel.name} (score: {_getPriorityScore(selectedModel):.2f})") - return selectedModel - - def getFallbackModels(self, - prompt: str, - context: str, - options: AiCallOptions, - availableModels: List[AiModel]) -> List[AiModel]: - """ - Get prioritized list of models for fallback sequence. - - Steps: - 1. Filter models by capability requirements - 2. Rate models by business requirements (priority, processing mode) - 3. Sort by rating (descending), then by cost (ascending) + Select the best model using simple filtering and priority-based sorting. 
Args: prompt: User prompt @@ -294,93 +31,195 @@ class ModelSelector: availableModels: List of available models Returns: - Prioritized list of models for fallback sequence + Best model for the request, or None if no suitable model found """ - if not availableModels: - logger.warning("No models available for fallback selection") + try: + # Get failover models (which includes all filtering and sorting) + failoverModelList = self.getFailoverModelList(prompt, context, options, availableModels) + + if not failoverModelList: + logger.warning("No suitable models found for the request") + return None + + selectedModel = failoverModelList[0] # First model is the best one + logger.info(f"Selected model: {selectedModel.name} (quality: {selectedModel.qualityRating}, cost: ${selectedModel.costPer1kTokensInput:.4f})") + return selectedModel + + except Exception as e: + logger.error(f"Error selecting model: {str(e)}") + return None + + def getFailoverModelList(self, + prompt: str, + context: str, + options: AiCallOptions, + availableModels: List[AiModel]) -> List[AiModel]: + """ + Get prioritized list of models using scoring-based ranking. + + Args: + prompt: User prompt + context: Context data + options: AI call options + availableModels: List of available models + + Returns: + List of models sorted by score (descending) + """ + try: + promptSize = len(prompt.encode("utf-8")) + contextSize = len(context.encode("utf-8")) + totalSize = promptSize + contextSize + + # Step 1: Filter by operation type (MUST match) + operationFiltered = [m for m in availableModels if options.operationType in m.operationTypes] + logger.debug(f"After operation type filtering: {len(operationFiltered)} models") + + # Step 2: Filter by prompt size (MUST be <= 80% of context size) + promptFiltered = [m for m in operationFiltered if m.contextLength == 0 or promptSize <= m.contextLength * 0.8] + logger.debug(f"After prompt size filtering: {len(promptFiltered)} models") + + # Step 3: Calculate scores for each model + scoredModels = [] + for model in promptFiltered: + score = self._calculateModelScore(model, promptSize, contextSize, totalSize, options) + scoredModels.append((model, score)) + logger.debug(f"Model {model.name}: score={score:.3f}") + + # Step 4: Sort by score (descending) + scoredModels.sort(key=lambda x: x[1], reverse=True) + sortedModels = [model for model, score in scoredModels] + + logger.debug(f"Final sorted models: {len(sortedModels)} models") + return sortedModels + + except Exception as e: + logger.error(f"Error getting failover models: {str(e)}") return [] - logger.info(f"Building fallback sequence for operation: {options.operationType}, priority: {options.priority}") + def _calculateModelScore(self, model: AiModel, promptSize: int, contextSize: int, totalSize: int, options: AiCallOptions) -> float: + """ + Calculate a score for a model based on how well it fulfills the criteria. 
- # Step 1: Filter by capability requirements - capableModels = self._filterByCapabilities(availableModels, options) - logger.info(f"Step 1 - Capable models: {[m.name for m in capableModels]}") - - if not capableModels: - logger.warning("No models meet capability requirements") - return [] - - # Step 2: Rate models by business requirements - ratedModels = self._rateModelsByBusinessRequirements(capableModels, prompt, context, options) - logger.info(f"Step 2 - Rated models: {[(m.name, rating) for m, rating in ratedModels]}") - - # Step 3: Sort by rating (descending), then by cost (ascending) - sortedModels = self._sortModelsByRatingAndCost(ratedModels) - logger.info(f"Step 3 - Sorted fallback sequence: {[m.name for m in sortedModels]}") - - return sortedModels - - def _filterByCapabilities(self, models: List[AiModel], options: AiCallOptions) -> List[AiModel]: - """Filter models by required capabilities.""" - capableModels = [] - - for model in models: - if not model.isAvailable: - continue + Args: + model: The model to score + promptSize: Size of the prompt in bytes + contextSize: Size of the context in bytes + totalSize: Total size (prompt + context) in bytes + options: AI call options - # Check if model supports required capabilities - if options.capabilities: - if not all(cap in model.capabilities for cap in options.capabilities): - logger.debug(f"Model {model.name} missing required capabilities: {options.capabilities}") - continue - - # Check operation type compatibility - if not self._meetsBasicRequirements(model, options): - logger.debug(f"Model {model.name} doesn't meet basic requirements") - continue - - capableModels.append(model) + Returns: + Score for the model (higher is better) + """ + score = 0.0 - return capableModels - - def _rateModelsByBusinessRequirements(self, - models: List[AiModel], - prompt: str, - context: str, - options: AiCallOptions) -> List[Tuple[AiModel, float]]: - """Rate models based on business requirements (priority, processing mode).""" - ratedModels = [] - inputSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8")) + # 1. Prompt + Context size rating + if model.contextLength > 0: + modelMaxSize = model.contextLength * 0.8 # 80% of model context length + if totalSize <= modelMaxSize: + # Within limits: rating = (prompt+contextsize) / (80% modelsize) + score += totalSize / modelMaxSize + else: + # Exceeds limits: rating = modelsize / (prompt+contextsize) (ensures minimum chunks) + score += modelMaxSize / totalSize + else: + # No context length limit + score += 1.0 - for model in models: - # Base score from model selection logic - baseScore = self._calculateModelScore(model, inputSize, options, []) - - # Apply priority-based scoring - priorityScore = self._applyPriorityScoring(model, options) - - # Apply processing mode scoring - processingScore = self._applyProcessingModeScoring(model, options) - - # Combine scores - totalScore = baseScore + priorityScore + processingScore - - ratedModels.append((model, totalScore)) - logger.debug(f"Model {model.name}: base={baseScore:.2f}, priority={priorityScore:.2f}, processing={processingScore:.2f}, total={totalScore:.2f}") + # 2. 
Processing Mode rating + if hasattr(options, 'processingMode') and options.processingMode: + score += self._getProcessingModeRating(model.processingMode, options.processingMode) + else: + score += 1.0 # No preference - return ratedModels - - def _sortModelsByRatingAndCost(self, ratedModels: List[Tuple[AiModel, float]]) -> List[AiModel]: - """Sort models by rating (descending), then by cost (ascending).""" - def sortKey(item): - model, rating = item - # Primary sort: rating (descending) - # Secondary sort: cost (ascending) - return (-rating, model.costPer1kTokensInput) + # 3. Priority rating + if hasattr(options, 'priority') and options.priority: + score += self._getPriorityRating(model, options.priority) + else: + score += 1.0 # No preference - sortedItems = sorted(ratedModels, key=sortKey) - return [model for model, rating in sortedItems] + return score + + def _getProcessingModeRating(self, modelMode: ProcessingModeEnum, requestedMode: ProcessingModeEnum) -> float: + """Get processing mode rating based on compatibility.""" + if modelMode == requestedMode: + return 1.0 + + # Compatibility matrix + if requestedMode == ProcessingModeEnum.BASIC: + if modelMode == ProcessingModeEnum.ADVANCED: + return 0.5 + elif modelMode == ProcessingModeEnum.DETAILED: + return 0.2 + + elif requestedMode == ProcessingModeEnum.ADVANCED: + if modelMode == ProcessingModeEnum.BASIC: + return 0.2 + elif modelMode == ProcessingModeEnum.DETAILED: + return 0.5 + + elif requestedMode == ProcessingModeEnum.DETAILED: + if modelMode == ProcessingModeEnum.BASIC: + return 0.2 + elif modelMode == ProcessingModeEnum.ADVANCED: + return 0.5 + + return 0.0 # No compatibility + + def _getPriorityRating(self, model: AiModel, requestedPriority: PriorityEnum) -> float: + """Get priority rating based on model capabilities.""" + if requestedPriority == PriorityEnum.BALANCED: + return 1.0 + + elif requestedPriority == PriorityEnum.SPEED: + return model.speedRating / 10.0 + + elif requestedPriority == PriorityEnum.QUALITY: + return model.qualityRating / 10.0 + + elif requestedPriority == PriorityEnum.COST: + # Cost priority: cost gives 1, speed gives 0.5, quality gives 0.2 + # Lower cost is better, so we invert the cost rating + costRating = 1.0 - (model.costPer1kTokensInput / 0.1) # Normalize to 0-1 + costRating = max(0, costRating) # Ensure non-negative + + speedRating = model.speedRating / 10.0 * 0.5 + qualityRating = model.qualityRating / 10.0 * 0.2 + + return costRating + speedRating + qualityRating + + return 1.0 # Default + + def _getSizeRating(self, model: AiModel, totalSize: int) -> float: + """Get size rating for a model based on total input size.""" + if model.contextLength > 0: + modelMaxSize = model.contextLength * 0.8 # 80% of model context length + if totalSize <= modelMaxSize: + # Within limits: rating = (prompt+contextsize) / (80% modelsize) + return totalSize / modelMaxSize + else: + # Exceeds limits: rating = modelsize / (prompt+contextsize) (ensures minimum chunks) + return modelMaxSize / totalSize + else: + # No context length limit + return 1.0 + + + def _logModelDetails(self, model: AiModel): + """Log detailed information about a model.""" + logger.info(f"Model: {model.name}") + logger.info(f" Display Name: {model.displayName}") + logger.info(f" Connector: {model.connectorType}") + logger.info(f" Context Length: {model.contextLength}") + logger.info(f" Max Tokens: {model.maxTokens}") + logger.info(f" Quality Rating: {model.qualityRating}/10") + logger.info(f" Speed Rating: {model.speedRating}/10") + 
logger.info(f" Cost: ${model.costPer1kTokensInput:.4f}/1k tokens") + logger.info(f" Capabilities: {', '.join(model.capabilities)}") + logger.info(f" Priority: {model.priority}") + logger.info(f" Processing Mode: {model.processingMode}") + logger.info(f" Operation Types: {', '.join(model.operationTypes)}") -# Global selector instance -model_selector = ModelSelector() +# Global model selector instance +modelSelector = ModelSelector() \ No newline at end of file diff --git a/modules/aicore/aicorePluginAnthropic.py b/modules/aicore/aicorePluginAnthropic.py index 7e11801f..6091f872 100644 --- a/modules/aicore/aicorePluginAnthropic.py +++ b/modules/aicore/aicorePluginAnthropic.py @@ -63,7 +63,7 @@ class AiAnthropic(BaseConnectorAi): functionCall=self.callAiBasic, priority=PriorityEnum.QUALITY, processingMode=ProcessingModeEnum.DETAILED, - operationTypes=[OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE], + operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE, OperationTypeEnum.GENERATE], version="claude-3-5-sonnet-20241022", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.015 + (bytesReceived / 4 / 1000) * 0.075 ), diff --git a/modules/aicore/aicorePluginInternal.py b/modules/aicore/aicorePluginInternal.py index baa686c5..e0473678 100644 --- a/modules/aicore/aicorePluginInternal.py +++ b/modules/aicore/aicorePluginInternal.py @@ -34,7 +34,7 @@ class AiInternal(BaseConnectorAi): functionCall=self.extractDocument, priority=PriorityEnum.COST, processingMode=ProcessingModeEnum.BASIC, - operationTypes=[OperationTypeEnum.GENERAL], + operationTypes=[OperationTypeEnum.EXTRACT], version="internal-extractor-v1", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: 0.001 + (bytesSent + bytesReceived) / (1024 * 1024) * 0.01 ), diff --git a/modules/aicore/aicorePluginOpenai.py b/modules/aicore/aicorePluginOpenai.py index b7429f5b..1202d004 100644 --- a/modules/aicore/aicorePluginOpenai.py +++ b/modules/aicore/aicorePluginOpenai.py @@ -65,7 +65,7 @@ class AiOpenai(BaseConnectorAi): functionCall=self.callAiBasic, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.ADVANCED, - operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.ANALYSE], + operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE, OperationTypeEnum.GENERATE], version="gpt-4o", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.03 + (bytesReceived / 4 / 1000) * 0.06 ), @@ -83,7 +83,7 @@ class AiOpenai(BaseConnectorAi): functionCall=self.callAiBasic, priority=PriorityEnum.SPEED, processingMode=ProcessingModeEnum.BASIC, - operationTypes=[OperationTypeEnum.GENERAL], + operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, OperationTypeEnum.GENERATE], version="gpt-3.5-turbo", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0015 + (bytesReceived / 4 / 1000) * 0.002 ), diff --git a/modules/aicore/aicorePluginPerplexity.py b/modules/aicore/aicorePluginPerplexity.py index 9701039f..f2f80f3d 100644 --- a/modules/aicore/aicorePluginPerplexity.py +++ b/modules/aicore/aicorePluginPerplexity.py @@ -63,7 +63,7 @@ class AiPerplexity(BaseConnectorAi): functionCall=self.callAiBasic, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.ADVANCED, - operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.WEB_RESEARCH], + operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, 
OperationTypeEnum.ANALYSE, OperationTypeEnum.GENERATE, OperationTypeEnum.WEB_RESEARCH], version="llama-3.1-sonar-large-128k-online", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.005 + (bytesReceived / 4 / 1000) * 0.005 ), diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 8487744f..f154a79c 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -1,13 +1,17 @@ -from typing import Optional, List, Dict, Any, Literal, Callable +from typing import Optional, List, Dict, Any, Literal, Callable, TYPE_CHECKING from pydantic import BaseModel, Field from enum import Enum +if TYPE_CHECKING: + from modules.datamodels.datamodelExtraction import ContentPart + # Operation Types class OperationTypeEnum(str, Enum): GENERAL = "general" PLAN = "plan" ANALYSE = "analyse" GENERATE = "generate" + EXTRACT = "extract" WEB_RESEARCH = "webResearch" IMAGE_ANALYSE = "imageAnalyse" IMAGE_GENERATE = "imageGenerate" @@ -141,6 +145,7 @@ class AiCallRequest(BaseModel): prompt: str = Field(description="The user prompt") context: Optional[str] = Field(default=None, description="Optional external context (e.g., extracted docs)") options: AiCallOptions = Field(default_factory=AiCallOptions) + contentParts: Optional[List['ContentPart']] = None # NEW: Content parts for model-aware chunking class AiCallResponse(BaseModel): diff --git a/modules/datamodels/datamodelExtraction.py b/modules/datamodels/datamodelExtraction.py index cfce0275..b0ba0f9b 100644 --- a/modules/datamodels/datamodelExtraction.py +++ b/modules/datamodels/datamodelExtraction.py @@ -28,6 +28,16 @@ class ChunkResult(BaseModel): metadata: Dict[str, Any] = Field(default_factory=dict) +class PartResult(BaseModel): + """Preserves the relationship between a content part and its AI result.""" + originalPart: ContentPart + aiResult: str + partIndex: int + documentId: str + processingTime: float = 0.0 + metadata: Dict[str, Any] = Field(default_factory=dict) + + class MergeStrategy(BaseModel): """Strategy configuration for merging content parts and AI results.""" diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py index 5707002e..918944a7 100644 --- a/modules/interfaces/interfaceAiObjects.py +++ b/modules/interfaces/interfaceAiObjects.py @@ -7,7 +7,7 @@ import time logger = logging.getLogger(__name__) from modules.aicore.aicoreModelRegistry import modelRegistry -from modules.aicore.aicoreModelSelector import model_selector +from modules.aicore.aicoreModelSelector import modelSelector from modules.datamodels.datamodelAi import ( AiModel, AiCallOptions, @@ -70,7 +70,7 @@ class AiObjects: raise ValueError("No AI models available") # Use the dynamic model selector - selectedModel = model_selector.selectModel(prompt, context, options, availableModels) + selectedModel = modelSelector.selectModel(prompt, context, options, availableModels) if not selectedModel: logger.error("No suitable model found for the given criteria") @@ -81,8 +81,15 @@ class AiObjects: async def call(self, request: AiCallRequest) -> AiCallResponse: - """Call AI model for text generation with fallback mechanism.""" - + """Call AI model for text generation with model-aware chunking.""" + # Handle content parts (unified path) + if hasattr(request, 'contentParts') and request.contentParts: + return await self._callWithContentParts(request) + # Handle traditional text/context calls + return await self._callWithTextContext(request) + + async def 
_callWithTextContext(self, request: AiCallRequest) -> AiCallResponse: + """Call AI model for traditional text/context calls with fallback mechanism.""" prompt = request.prompt context = request.context or "" options = request.options @@ -108,11 +115,11 @@ class AiObjects: temperature = 0.2 maxTokens = getattr(options, "maxTokens", None) - # Get fallback models for this operation type + # Get failover models for this operation type availableModels = modelRegistry.getAvailableModels() - fallbackModels = model_selector.getFallbackModels(prompt, context, options, availableModels) + failoverModelList = modelSelector.getFailoverModelList(prompt, context, options, availableModels) - if not fallbackModels: + if not failoverModelList: errorMsg = f"No suitable models found for operation {options.operationType}" logger.error(errorMsg) return AiCallResponse( @@ -125,11 +132,11 @@ class AiObjects: errorCount=1 ) - # Try each model in fallback sequence + # Try each model in failover sequence lastError = None - for attempt, model in enumerate(fallbackModels): + for attempt, model in enumerate(failoverModelList): try: - logger.info(f"Attempting AI call with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})") + logger.info(f"Attempting AI call with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})") # Call the model response = await self._callWithModel(model, prompt, context, temperature, maxTokens, inputBytes) @@ -142,15 +149,15 @@ class AiObjects: logger.warning(f"❌ AI call failed with model {model.name}: {str(e)}") # If this is not the last model, try the next one - if attempt < len(fallbackModels) - 1: - logger.info(f"🔄 Trying next fallback model...") + if attempt < len(failoverModelList) - 1: + logger.info(f"🔄 Trying next failover model...") continue else: # All models failed - logger.error(f"💥 All {len(fallbackModels)} models failed for operation {options.operationType}") + logger.error(f"💥 All {len(failoverModelList)} models failed for operation {options.operationType}") break - # All fallback attempts failed - return error response + # All failover attempts failed - return error response errorMsg = f"All AI models failed for operation {options.operationType}. 
Last error: {str(lastError)}" logger.error(errorMsg) return AiCallResponse( @@ -163,6 +170,241 @@ class AiObjects: errorCount=1 ) + async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse: + """Process content parts with model-aware chunking (unified for single and multiple parts).""" + prompt = request.prompt + options = request.options + contentParts = request.contentParts + + # Get failover models + availableModels = modelRegistry.getAvailableModels() + failoverModelList = modelSelector.getFailoverModelList(prompt, "", options, availableModels) + + if not failoverModelList: + return self._createErrorResponse("No suitable models found", 0, 0) + + # Process each content part + allResults = [] + for contentPart in contentParts: + partResult = await self._processContentPartWithFallback(contentPart, prompt, options, failoverModelList) + allResults.append(partResult) + + # Merge all results + mergedContent = self._mergePartResults(allResults) + + return AiCallResponse( + content=mergedContent, + modelName="multiple", + priceUsd=sum(r.priceUsd for r in allResults), + processingTime=sum(r.processingTime for r in allResults), + bytesSent=sum(r.bytesSent for r in allResults), + bytesReceived=sum(r.bytesReceived for r in allResults), + errorCount=sum(r.errorCount for r in allResults) + ) + + async def _processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList) -> AiCallResponse: + """Process a single content part with model-aware chunking and fallback.""" + lastError = None + + for attempt, model in enumerate(failoverModelList): + try: + logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})") + + # Check if part fits in model context + partSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0 + modelContextBytes = model.contextLength * 4 # Convert tokens to bytes + + if partSize <= modelContextBytes: + # Part fits - call AI directly + response = await self._callWithModel(model, prompt, contentPart.data, 0.2, None, partSize) + logger.info(f"✅ Content part processed successfully with model: {model.name}") + return response + else: + # Part too large - chunk it + chunks = await self._chunkContentPart(contentPart, model, options) + if not chunks: + raise ValueError(f"Failed to chunk content part for model {model.name}") + + # Process each chunk + chunkResults = [] + for chunk in chunks: + chunkResponse = await self._callWithModel(model, prompt, chunk['data'], 0.2, None, chunk['size']) + chunkResults.append(chunkResponse) + + # Merge chunk results + mergedContent = self._mergeChunkResults(chunkResults) + totalPrice = sum(r.priceUsd for r in chunkResults) + totalTime = sum(r.processingTime for r in chunkResults) + totalBytesSent = sum(r.bytesSent for r in chunkResults) + totalBytesReceived = sum(r.bytesReceived for r in chunkResults) + totalErrors = sum(r.errorCount for r in chunkResults) + + logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)") + return AiCallResponse( + content=mergedContent, + modelName=model.name, + priceUsd=totalPrice, + processingTime=totalTime, + bytesSent=totalBytesSent, + bytesReceived=totalBytesReceived, + errorCount=totalErrors + ) + + except Exception as e: + lastError = e + logger.warning(f"❌ Model {model.name} failed for content part: {str(e)}") + + if attempt < len(failoverModelList) - 1: + logger.info(f"🔄 Trying next failover model...") + continue + else: + logger.error(f"💥 All 
{len(failoverModelList)} models failed for content part") + break + + # All models failed + return self._createErrorResponse(f"All models failed: {str(lastError)}", 0, 0) + + async def _chunkContentPart(self, contentPart, model, options) -> List[Dict[str, Any]]: + """Chunk a content part based on model capabilities.""" + # Calculate model-specific chunk sizes + modelContextBytes = model.contextLength * 4 # Convert tokens to bytes + maxContextBytes = int(modelContextBytes * 0.9) # 90% of context length + textChunkSize = int(maxContextBytes * 0.7) # 70% of max context for text chunks + imageChunkSize = int(maxContextBytes * 0.8) # 80% of max context for image chunks + + # Build chunking options + chunkingOptions = { + "textChunkSize": textChunkSize, + "imageChunkSize": imageChunkSize, + "maxSize": maxContextBytes, + "chunkAllowed": True + } + + # Get appropriate chunker + from modules.services.serviceExtraction.subRegistry import ChunkerRegistry + chunkerRegistry = ChunkerRegistry() + chunker = chunkerRegistry.resolve(contentPart.typeGroup) + + if not chunker: + logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}") + return [] + + # Chunk the content part + try: + chunks = chunker.chunk(contentPart, chunkingOptions) + logger.debug(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part") + return chunks + except Exception as e: + logger.error(f"Chunking failed for {contentPart.typeGroup}: {str(e)}") + return [] + + def _mergePartResults(self, partResults: List[AiCallResponse]) -> str: + """Merge part results using the existing sophisticated merging system.""" + if not partResults: + return "" + + # Convert AiCallResponse results to ContentParts for merging + from modules.datamodels.datamodelExtraction import ContentPart + from modules.services.serviceExtraction.subUtils import makeId + + content_parts = [] + for i, result in enumerate(partResults): + if result.content: + content_part = ContentPart( + id=makeId(), + parentId=None, + label=f"ai_result_{i}", + typeGroup="text", # Default to text for AI results + mimeType="text/plain", + data=result.content, + metadata={ + "aiResult": True, + "modelName": result.modelName, + "priceUsd": result.priceUsd, + "processingTime": result.processingTime, + "bytesSent": result.bytesSent, + "bytesReceived": result.bytesReceived + } + ) + content_parts.append(content_part) + + # Use existing merging system + merge_strategy = { + "useIntelligentMerging": True, + "groupBy": "typeGroup", + "orderBy": "id", + "mergeType": "concatenate" + } + + from modules.services.serviceExtraction.subPipeline import _applyMerging + merged_parts = _applyMerging(content_parts, merge_strategy) + + # Convert merged parts back to final string + final_content = "\n\n".join([part.data for part in merged_parts]) + + logger.info(f"Merged {len(partResults)} AI results using existing merging system") + return final_content.strip() + + def _mergeChunkResults(self, chunkResults: List[AiCallResponse]) -> str: + """Merge chunk results using the existing sophisticated merging system.""" + if not chunkResults: + return "" + + # Convert AiCallResponse results to ContentParts for merging + from modules.datamodels.datamodelExtraction import ContentPart + from modules.services.serviceExtraction.subUtils import makeId + + content_parts = [] + for i, result in enumerate(chunkResults): + if result.content: + content_part = ContentPart( + id=makeId(), + parentId=None, + label=f"chunk_result_{i}", + typeGroup="text", # Default to text for AI results + 
mimeType="text/plain", + data=result.content, + metadata={ + "aiResult": True, + "chunk": True, + "modelName": result.modelName, + "priceUsd": result.priceUsd, + "processingTime": result.processingTime, + "bytesSent": result.bytesSent, + "bytesReceived": result.bytesReceived + } + ) + content_parts.append(content_part) + + # Use existing merging system + merge_strategy = { + "useIntelligentMerging": True, + "groupBy": "typeGroup", + "orderBy": "id", + "mergeType": "concatenate" + } + + from modules.services.serviceExtraction.subPipeline import _applyMerging + merged_parts = _applyMerging(content_parts, merge_strategy) + + # Convert merged parts back to final string + final_content = "\n\n".join([part.data for part in merged_parts]) + + logger.info(f"Merged {len(chunkResults)} chunk results using existing merging system") + return final_content.strip() + + def _createErrorResponse(self, errorMsg: str, inputBytes: int, outputBytes: int) -> AiCallResponse: + """Create an error response.""" + return AiCallResponse( + content=errorMsg, + modelName="error", + priceUsd=0.0, + processingTime=0.0, + bytesSent=inputBytes, + bytesReceived=outputBytes, + errorCount=1 + ) + async def _callWithModel(self, model: AiModel, prompt: str, context: str, temperature: float, maxTokens: int, inputBytes: int) -> AiCallResponse: """Call a specific model and return the response.""" # Replace placeholder in prompt for this specific model @@ -245,9 +487,9 @@ class AiObjects: # Get fallback models for image analysis availableModels = modelRegistry.getAvailableModels() - fallbackModels = model_selector.getFallbackModels(prompt, "", options, availableModels) + failoverModelList = modelSelector.getFailoverModelList(prompt, "", options, availableModels) - if not fallbackModels: + if not failoverModelList: errorMsg = f"No suitable models found for image analysis" logger.error(errorMsg) return AiCallResponse( @@ -262,9 +504,9 @@ class AiObjects: # Try each model in fallback sequence lastError = None - for attempt, model in enumerate(fallbackModels): + for attempt, model in enumerate(failoverModelList): try: - logger.info(f"Attempting image analysis with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})") + logger.info(f"Attempting image analysis with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})") # Call the model response = await self._callImageWithModel(model, prompt, imageData, mimeType, inputBytes) @@ -277,12 +519,12 @@ class AiObjects: logger.warning(f"❌ Image analysis failed with model {model.name}: {str(e)}") # If this is not the last model, try the next one - if attempt < len(fallbackModels) - 1: + if attempt < len(failoverModelList) - 1: logger.info(f"🔄 Trying next fallback model for image analysis...") continue else: # All models failed - logger.error(f"💥 All {len(fallbackModels)} models failed for image analysis") + logger.error(f"💥 All {len(failoverModelList)} models failed for image analysis") break # All fallback attempts failed - return error response diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py index c81a8b0c..72ac2950 100644 --- a/modules/services/serviceAi/subDocumentProcessing.py +++ b/modules/services/serviceAi/subDocumentProcessing.py @@ -54,8 +54,8 @@ class SubDocumentProcessing: options: Optional[AiCallOptions] = None ) -> str: """ - Process documents with per-chunk AI calls and merge results. - FIXED: Now preserves chunk relationships and document structure. 
+ Process documents with model-aware chunking and merge results. + NEW: Uses model-aware chunking in AI call phase instead of extraction phase. Args: documents: List of ChatDocument objects to process @@ -68,23 +68,14 @@ class SubDocumentProcessing: if not documents: return "" - # Get model capabilities for size calculation - model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options) - - # Build extraction options for chunking with intelligent merging + # Build extraction options WITHOUT chunking parameters extractionOptions: Dict[str, Any] = { "prompt": prompt, "operationType": options.operationType if options else "general", - "processDocumentsIndividually": True, # Process each document separately - "maxSize": model_capabilities["maxContextBytes"], - "chunkAllowed": True, - "textChunkSize": model_capabilities["textChunkSize"], - "imageChunkSize": model_capabilities["imageChunkSize"], - "imageMaxPixels": 1024 * 1024, - "imageQuality": 85, + "processDocumentsIndividually": True, + # REMOVED: maxSize, textChunkSize, imageChunkSize "mergeStrategy": { - "useIntelligentMerging": True, # Enable intelligent token-aware merging - "capabilities": model_capabilities, + "useIntelligentMerging": True, "prompt": prompt, "groupBy": "typeGroup", "orderBy": "id", @@ -95,17 +86,17 @@ class SubDocumentProcessing: logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}") try: - # Extract content with chunking + # Extract content WITHOUT chunking extractionResult = self.extractionService.extractContent(documents, extractionOptions) if not isinstance(extractionResult, list): return "[Error: No extraction results]" - # FIXED: Process chunks with proper mapping - chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options) + # Process parts (not chunks) with model-aware AI calls + partResults = await self._processPartsWithMapping(extractionResult, prompt, options) - # FIXED: Merge with preserved chunk relationships - mergedContent = self._mergeChunkResults(chunkResults, options) + # Merge results using existing merging system + mergedContent = self._mergePartResults(partResults, options) # Save merged extraction content to debug self.services.utils.writeDebugFile(mergedContent or '', "extractionMergedText") @@ -123,29 +114,19 @@ class SubDocumentProcessing: options: Optional[AiCallOptions] = None ) -> Dict[str, Any]: """ - Process documents with per-chunk AI calls and merge results in JSON mode. + Process documents with model-aware chunking and merge results in JSON mode. Returns structured JSON document instead of text. 
""" if not documents: return {"metadata": {"title": "Empty Document"}, "sections": []} - # Get model capabilities for size calculation - model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options) - - # Build extraction options for chunking with intelligent merging + # Build extraction options WITHOUT chunking parameters extractionOptions: Dict[str, Any] = { "prompt": prompt, "operationType": options.operationType if options else "general", - "processDocumentsIndividually": True, # Process each document separately - "maxSize": model_capabilities["maxContextBytes"], - "chunkAllowed": True, - "textChunkSize": model_capabilities["textChunkSize"], - "imageChunkSize": model_capabilities["imageChunkSize"], - "imageMaxPixels": 1024 * 1024, - "imageQuality": 85, + "processDocumentsIndividually": True, "mergeStrategy": { - "useIntelligentMerging": True, # Enable intelligent token-aware merging - "capabilities": model_capabilities, + "useIntelligentMerging": True, "prompt": prompt, "groupBy": "typeGroup", "orderBy": "id", @@ -156,17 +137,17 @@ class SubDocumentProcessing: logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}") try: - # Extract content with chunking + # Extract content WITHOUT chunking extractionResult = self.extractionService.extractContent(documents, extractionOptions) if not isinstance(extractionResult, list): return {"metadata": {"title": "Error Document"}, "sections": []} - # Process chunks with proper mapping - chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options, generate_json=True) + # Process parts with model-aware chunking + partResults = await self._processPartsWithMapping(extractionResult, prompt, options) - # Merge with JSON mode - mergedJsonDocument = self._mergeChunkResultsJson(chunkResults, options) + # Convert to JSON format (simplified for now) + mergedJsonDocument = self._convertPartResultsToJson(partResults, options) # Normalize merged JSON into a single canonical table (only if table content exists) try: @@ -505,6 +486,127 @@ CONTINUATION INSTRUCTIONS: """ return await self.processDocumentsPerChunk(documents, prompt, options) + async def _processPartsWithMapping( + self, + extractionResult: List[ContentExtracted], + prompt: str, + options: Optional[AiCallOptions] = None + ) -> List['PartResult']: + """Process content parts with model-aware chunking and proper mapping.""" + from modules.datamodels.datamodelExtraction import PartResult + import asyncio + + # Collect all parts that need processing + parts_to_process = [] + part_index = 0 + + for ec in extractionResult: + for part in ec.parts: + if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"): + # Skip empty container parts + if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0): + logger.debug(f"Skipping empty container part: mimeType={part.mimeType}") + continue + + parts_to_process.append({ + 'part': part, + 'part_index': part_index, + 'document_id': ec.id + }) + part_index += 1 + + logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking") + + # Process parts in parallel + async def process_single_part(part_info: Dict) -> PartResult: + part = part_info['part'] + part_index = part_info['part_index'] + document_id = part_info['document_id'] + + start_time = time.time() + + try: + # Create AI call request with content part + from 
modules.datamodels.datamodelAi import AiCallRequest + request = AiCallRequest( + prompt=prompt, + context="", # Context is in the content part + options=options, + contentParts=[part] # Pass as list for unified processing + ) + + # Call AI with model-aware chunking + response = await self.aiObjects.call(request) + + processing_time = time.time() - start_time + + return PartResult( + originalPart=part, + aiResult=response.content, + partIndex=part_index, + documentId=document_id, + processingTime=processing_time, + metadata={ + "success": True, + "partSize": len(part.data) if part.data else 0, + "resultSize": len(response.content), + "typeGroup": part.typeGroup, + "modelName": response.modelName, + "priceUsd": response.priceUsd + } + ) + + except Exception as e: + processing_time = time.time() - start_time + logger.warning(f"Error processing part {part_index}: {str(e)}") + + return PartResult( + originalPart=part, + aiResult=f"[Error processing part: {str(e)}]", + partIndex=part_index, + documentId=document_id, + processingTime=processing_time, + metadata={ + "success": False, + "error": str(e), + "partSize": len(part.data) if part.data else 0, + "typeGroup": part.typeGroup + } + ) + + # Process parts with concurrency control + max_concurrent = 5 + if options and hasattr(options, 'maxConcurrentParts'): + max_concurrent = options.maxConcurrentParts + + semaphore = asyncio.Semaphore(max_concurrent) + + async def process_with_semaphore(part_info): + async with semaphore: + return await process_single_part(part_info) + + tasks = [process_with_semaphore(part_info) for part_info in parts_to_process] + part_results = await asyncio.gather(*tasks, return_exceptions=True) + + # Handle exceptions + processed_results = [] + for i, result in enumerate(part_results): + if isinstance(result, Exception): + part_info = parts_to_process[i] + processed_results.append(PartResult( + originalPart=part_info['part'], + aiResult=f"[Error in parallel processing: {str(result)}]", + partIndex=part_info['part_index'], + documentId=part_info['document_id'], + processingTime=0.0, + metadata={"success": False, "error": str(result)} + )) + elif result is not None: + processed_results.append(result) + + logger.info(f"Completed processing {len(processed_results)} parts") + return processed_results + async def _processChunksWithMapping( self, extractionResult: List[ContentExtracted], @@ -907,340 +1009,451 @@ CONTINUATION INSTRUCTIONS: logger.info(f"Completed processing {len(processed_results)} chunks") return processed_results + def _mergePartResults( + self, + partResults: List['PartResult'], + options: Optional[AiCallOptions] = None + ) -> str: + """Merge part results using existing sophisticated merging system.""" + if not partResults: + return "" + + # Convert PartResults back to ContentParts for existing merger system + from modules.datamodels.datamodelExtraction import ContentPart + content_parts = [] + for part_result in partResults: + # Create ContentPart from PartResult with proper typeGroup + content_part = ContentPart( + id=part_result.originalPart.id, + parentId=part_result.originalPart.parentId, + label=part_result.originalPart.label, + typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup + mimeType=part_result.originalPart.mimeType, + data=part_result.aiResult, # Use AI result as data + metadata={ + **part_result.originalPart.metadata, + "aiResult": True, + "partIndex": part_result.partIndex, + "documentId": part_result.documentId, + "processingTime": part_result.processingTime, + "success": 
part_result.metadata.get("success", False) + } + ) + content_parts.append(content_part) + + # Use existing merging strategy from options + merge_strategy = { + "useIntelligentMerging": True, + "groupBy": "documentId", # Group by document + "orderBy": "partIndex", # Order by part index + "mergeType": "concatenate" + } + + if options and hasattr(options, 'mergeStrategy'): + merge_strategy.update(options.mergeStrategy) + + # Apply existing merging logic using the sophisticated merging system + from modules.services.serviceExtraction.subPipeline import _applyMerging + merged_parts = _applyMerging(content_parts, merge_strategy) + + # Convert merged parts back to final string + final_content = "\n\n".join([part.data for part in merged_parts]) + + logger.info(f"Merged {len(partResults)} parts using existing sophisticated merging system") + return final_content.strip() + + def _convertPartResultsToJson( + self, + partResults: List['PartResult'], + options: Optional[AiCallOptions] = None + ) -> Dict[str, Any]: + """Convert part results to JSON format using existing sophisticated merging system.""" + if not partResults: + return {"metadata": {"title": "Empty Document"}, "sections": []} + + # Convert PartResults back to ContentParts for existing merger system + from modules.datamodels.datamodelExtraction import ContentPart + content_parts = [] + for part_result in partResults: + # Create ContentPart from PartResult with proper typeGroup + content_part = ContentPart( + id=part_result.originalPart.id, + parentId=part_result.originalPart.parentId, + label=part_result.originalPart.label, + typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup + mimeType=part_result.originalPart.mimeType, + data=part_result.aiResult, # Use AI result as data + metadata={ + **part_result.originalPart.metadata, + "aiResult": True, + "partIndex": part_result.partIndex, + "documentId": part_result.documentId, + "processingTime": part_result.processingTime, + "success": part_result.metadata.get("success", False) + } + ) + content_parts.append(content_part) + + # Use existing merging strategy for JSON mode + merge_strategy = { + "useIntelligentMerging": True, + "groupBy": "documentId", # Group by document + "orderBy": "partIndex", # Order by part index + "mergeType": "concatenate" + } + + if options and hasattr(options, 'mergeStrategy'): + merge_strategy.update(options.mergeStrategy) + + # Apply existing merging logic using the sophisticated merging system + from modules.services.serviceExtraction.subPipeline import _applyMerging + merged_parts = _applyMerging(content_parts, merge_strategy) + + # Convert merged parts to JSON format + all_sections = [] + document_titles = [] + + for part in merged_parts: + if part.metadata.get("success", False): + try: + # Parse JSON from AI result + part_json = json.loads(part.data) + + # Check if this is a multi-file response (has "documents" key) + if isinstance(part_json, dict) and "documents" in part_json: + # This is a multi-file response - merge all documents + logger.debug(f"Processing multi-file response from part {part.id} with {len(part_json['documents'])} documents") + + # Return multi-file response directly + return { + "metadata": part_json.get("metadata", {"title": "Merged Document"}), + "documents": part_json["documents"] + } + + # Extract sections from single-file response + elif isinstance(part_json, dict) and "sections" in part_json: + for section in part_json["sections"]: + # Add part context to section + section["metadata"] = section.get("metadata", {}) + 
section["metadata"]["source_part"] = part.id + section["metadata"]["source_document"] = part.metadata.get("documentId", "unknown") + section["metadata"]["part_index"] = part.metadata.get("partIndex", 0) + all_sections.append(section) + + # Extract document title + if isinstance(part_json, dict) and "metadata" in part_json: + title = part_json["metadata"].get("title", "") + if title and title not in document_titles: + document_titles.append(title) + + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse JSON from part {part.id}: {str(e)}") + # Create a fallback section for invalid JSON + fallback_section = { + "id": f"error_section_{part.id}", + "title": "Error Section", + "content_type": "paragraph", + "elements": [{ + "text": f"Error parsing part {part.id}: {str(e)}" + }], + "order": part.metadata.get("partIndex", 0), + "metadata": { + "source_document": part.metadata.get("documentId", "unknown"), + "part_id": part.id, + "error": str(e) + } + } + all_sections.append(fallback_section) + else: + # Handle error parts + error_section = { + "id": f"error_section_{part.id}", + "title": "Error Section", + "content_type": "paragraph", + "elements": [{ + "text": f"Error in part {part.id}: {part.metadata.get('error', 'Unknown error')}" + }], + "order": part.metadata.get("partIndex", 0), + "metadata": { + "source_document": part.metadata.get("documentId", "unknown"), + "part_id": part.id, + "error": part.metadata.get('error', 'Unknown error') + } + } + all_sections.append(error_section) + + # Sort sections by order + all_sections.sort(key=lambda x: x.get("order", 0)) + + # Create merged document with sections + merged_document = { + "metadata": { + "title": document_titles[0] if document_titles else "Merged Document", + "extraction_method": "model_aware_chunking_with_merging", + "version": "2.0" + }, + "sections": all_sections, + "summary": f"Merged document using sophisticated merging system", + "tags": ["merged", "ai_generated", "model_aware", "sophisticated_merging"] + } + + logger.info(f"Converted {len(partResults)} parts to JSON format using existing sophisticated merging system") + return merged_document + def _mergeChunkResults( self, chunkResults: List[ChunkResult], options: Optional[AiCallOptions] = None ) -> str: - """Merge chunk results while preserving document structure and chunk order.""" - + """Merge chunk results using existing sophisticated merging system.""" if not chunkResults: return "" - # Get merging configuration from options - chunk_separator = "\n\n---\n\n" - include_document_headers = True - include_chunk_metadata = False - - if options: - if hasattr(options, 'chunkSeparator'): - chunk_separator = options.chunkSeparator - elif hasattr(options, 'mergeStrategy') and options.mergeStrategy: - chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n---\n\n") - - # Check for enhanced options - if hasattr(options, 'preserveChunkMetadata'): - include_chunk_metadata = options.preserveChunkMetadata - - # Group chunk results by document - results_by_document = {} + # Convert ChunkResults back to ContentParts for existing merger system + from modules.datamodels.datamodelExtraction import ContentPart + content_parts = [] for chunk_result in chunkResults: - doc_id = chunk_result.documentId - if doc_id not in results_by_document: - results_by_document[doc_id] = [] - results_by_document[doc_id].append(chunk_result) + # Create ContentPart from ChunkResult with proper typeGroup + content_part = ContentPart( + id=chunk_result.originalChunk.id, + 
parentId=chunk_result.originalChunk.parentId, + label=chunk_result.originalChunk.label, + typeGroup=chunk_result.originalChunk.typeGroup, # Use original typeGroup + mimeType=chunk_result.originalChunk.mimeType, + data=chunk_result.aiResult, # Use AI result as data + metadata={ + **chunk_result.originalChunk.metadata, + "aiResult": True, + "chunk": True, + "chunkIndex": chunk_result.chunkIndex, + "documentId": chunk_result.documentId, + "processingTime": chunk_result.processingTime, + "success": chunk_result.metadata.get("success", False) + } + ) + content_parts.append(content_part) - # Sort chunks within each document by chunk index - for doc_id in results_by_document: - results_by_document[doc_id].sort(key=lambda x: x.chunkIndex) + # Use existing merging strategy from options + merge_strategy = { + "useIntelligentMerging": True, + "groupBy": "documentId", # Group by document + "orderBy": "chunkIndex", # Order by chunk index + "mergeType": "concatenate" + } - # Merge results for each document - merged_documents = [] + if options and hasattr(options, 'mergeStrategy'): + merge_strategy.update(options.mergeStrategy) - for doc_id, doc_chunks in results_by_document.items(): - # Build document header if enabled - doc_header = "" - if include_document_headers: - doc_header = f"\n\n=== DOCUMENT: {doc_id} ===\n\n" - - # Merge chunks for this document - doc_content = "" - for i, chunk_result in enumerate(doc_chunks): - # Add chunk separator (except for first chunk) - if i > 0: - doc_content += chunk_separator - - # Add chunk content with optional metadata - chunk_metadata = chunk_result.metadata - if chunk_metadata.get("success", False): - chunk_content = chunk_result.aiResult - - # Add chunk metadata if enabled - if include_chunk_metadata: - chunk_info = f"[Chunk {chunk_result.chunkIndex} - {chunk_metadata.get('typeGroup', 'unknown')} - {chunk_metadata.get('chunkSize', 0)} chars]" - chunk_content = f"{chunk_info}\n{chunk_content}" - - doc_content += chunk_content - else: - # Handle error chunks - error_msg = f"[ERROR in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}]" - doc_content += error_msg - - merged_documents.append(doc_header + doc_content) + # Apply existing merging logic using the sophisticated merging system + from modules.services.serviceExtraction.subPipeline import _applyMerging + merged_parts = _applyMerging(content_parts, merge_strategy) - # Join all documents - final_result = "\n\n".join(merged_documents) + # Convert merged parts back to final string + final_content = "\n\n".join([part.data for part in merged_parts]) - logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents") - return final_result.strip() + logger.info(f"Merged {len(chunkResults)} chunks using existing sophisticated merging system") + return final_content.strip() def _mergeChunkResultsClean( self, chunkResults: List[ChunkResult], options: Optional[AiCallOptions] = None ) -> str: - """Merge chunk results in CLEAN mode - no debug metadata or document headers.""" - + """Merge chunk results in CLEAN mode using existing sophisticated merging system.""" if not chunkResults: return "" - # Get merging configuration from options - chunk_separator = "\n\n" - include_document_headers = False # CLEAN MODE: No document headers - include_chunk_metadata = False # CLEAN MODE: No chunk metadata - - if options: - if hasattr(options, 'chunkSeparator'): - chunk_separator = options.chunkSeparator - elif hasattr(options, 'mergeStrategy') and options.mergeStrategy: - 
chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n") - - # Group chunk results by document - results_by_document = {} + # Convert ChunkResults back to ContentParts for existing merger system + from modules.datamodels.datamodelExtraction import ContentPart + content_parts = [] for chunk_result in chunkResults: - doc_id = chunk_result.documentId - if doc_id not in results_by_document: - results_by_document[doc_id] = [] - results_by_document[doc_id].append(chunk_result) - - # Sort chunks within each document by chunk index - for doc_id in results_by_document: - results_by_document[doc_id].sort(key=lambda x: x.chunkIndex) - - # Merge results for each document in CLEAN mode - merged_documents = [] - - for doc_id, doc_chunks in results_by_document.items(): - # CLEAN MODE: No document headers - doc_header = "" + # Skip empty or error chunks in clean mode + if not chunk_result.metadata.get("success", False): + continue + if not chunk_result.aiResult or not chunk_result.aiResult.strip(): + continue + # Skip container/binary chunks in clean mode + if chunk_result.aiResult.startswith("[Skipped ") and "content:" in chunk_result.aiResult: + continue - # Merge chunks for this document - doc_content = "" - for i, chunk_result in enumerate(doc_chunks): - # Add chunk separator (except for first chunk) - if i > 0: - doc_content += chunk_separator - - # Add chunk content without metadata - chunk_metadata = chunk_result.metadata - if chunk_metadata.get("success", False): - chunk_content = chunk_result.aiResult - - # CLEAN MODE: Skip container/binary chunks entirely - if chunk_content.startswith("[Skipped ") and "content:" in chunk_content: - continue # Skip container/binary chunks in clean mode - - # CLEAN MODE: Skip empty or whitespace-only chunks - if not chunk_content.strip(): - continue # Skip empty chunks in clean mode - - # CLEAN MODE: No chunk metadata - doc_content += chunk_content - else: - # Handle error chunks silently in clean mode - continue - - merged_documents.append(doc_header + doc_content) + # Create ContentPart from ChunkResult with proper typeGroup + content_part = ContentPart( + id=chunk_result.originalChunk.id, + parentId=chunk_result.originalChunk.parentId, + label=chunk_result.originalChunk.label, + typeGroup=chunk_result.originalChunk.typeGroup, # Use original typeGroup + mimeType=chunk_result.originalChunk.mimeType, + data=chunk_result.aiResult, # Use AI result as data + metadata={ + **chunk_result.originalChunk.metadata, + "aiResult": True, + "chunk": True, + "chunkIndex": chunk_result.chunkIndex, + "documentId": chunk_result.documentId, + "processingTime": chunk_result.processingTime, + "success": chunk_result.metadata.get("success", False) + } + ) + content_parts.append(content_part) - # Join all documents - final_result = "\n\n".join(merged_documents) + # Use existing merging strategy for clean mode + merge_strategy = { + "useIntelligentMerging": True, + "groupBy": "documentId", # Group by document + "orderBy": "chunkIndex", # Order by chunk index + "mergeType": "concatenate" + } - return final_result.strip() + if options and hasattr(options, 'mergeStrategy'): + merge_strategy.update(options.mergeStrategy) + + # Apply existing merging logic using the sophisticated merging system + from modules.services.serviceExtraction.subPipeline import _applyMerging + merged_parts = _applyMerging(content_parts, merge_strategy) + + # Convert merged parts back to final string + final_content = "\n\n".join([part.data for part in merged_parts]) + + logger.info(f"Merged 
{len(content_parts)} chunks in clean mode using existing sophisticated merging system") + return final_content.strip() def _mergeChunkResultsJson( self, chunkResults: List[ChunkResult], options: Optional[AiCallOptions] = None ) -> Dict[str, Any]: - """Merge chunk results in JSON mode - returns structured JSON document.""" - + """Merge chunk results in JSON mode using existing sophisticated merging system.""" if not chunkResults: return {"metadata": {"title": "Empty Document"}, "sections": []} - # Group chunk results by document - results_by_document = {} + # Convert ChunkResults back to ContentParts for existing merger system + from modules.datamodels.datamodelExtraction import ContentPart + content_parts = [] for chunk_result in chunkResults: - doc_id = chunk_result.documentId - if doc_id not in results_by_document: - results_by_document[doc_id] = [] - results_by_document[doc_id].append(chunk_result) + # Create ContentPart from ChunkResult with proper typeGroup + content_part = ContentPart( + id=chunk_result.originalChunk.id, + parentId=chunk_result.originalChunk.parentId, + label=chunk_result.originalChunk.label, + typeGroup=chunk_result.originalChunk.typeGroup, # Use original typeGroup + mimeType=chunk_result.originalChunk.mimeType, + data=chunk_result.aiResult, # Use AI result as data + metadata={ + **chunk_result.originalChunk.metadata, + "aiResult": True, + "chunk": True, + "chunkIndex": chunk_result.chunkIndex, + "documentId": chunk_result.documentId, + "processingTime": chunk_result.processingTime, + "success": chunk_result.metadata.get("success", False) + } + ) + content_parts.append(content_part) - # Sort chunks within each document by chunk index - for doc_id in results_by_document: - results_by_document[doc_id].sort(key=lambda x: x.chunkIndex) + # Use existing merging strategy for JSON mode + merge_strategy = { + "useIntelligentMerging": True, + "groupBy": "documentId", # Group by document + "orderBy": "chunkIndex", # Order by chunk index + "mergeType": "concatenate" + } - # Merge JSON results for each document - all_documents = [] + if options and hasattr(options, 'mergeStrategy'): + merge_strategy.update(options.mergeStrategy) + + # Apply existing merging logic using the sophisticated merging system + from modules.services.serviceExtraction.subPipeline import _applyMerging + merged_parts = _applyMerging(content_parts, merge_strategy) + + # Convert merged parts to JSON format all_sections = [] document_titles = [] - combined_metadata = {"title": "Merged Document", "splitStrategy": "by_section"} - for doc_id, doc_chunks in results_by_document.items(): - # Process each chunk's JSON result - for chunk_result in doc_chunks: - chunk_metadata = chunk_result.metadata - if chunk_metadata.get("success", False): - try: - # Parse JSON from AI result - chunk_json = json.loads(chunk_result.aiResult) + for part in merged_parts: + if part.metadata.get("success", False): + try: + # Parse JSON from AI result + chunk_json = json.loads(part.data) + + # Check if this is a multi-file response (has "documents" key) + if isinstance(chunk_json, dict) and "documents" in chunk_json: + # This is a multi-file response - merge all documents + logger.debug(f"Processing multi-file response from part {part.id} with {len(chunk_json['documents'])} documents") - # Check if this is a multi-file response (has "documents" key) - if isinstance(chunk_json, dict) and "documents" in chunk_json: - # This is a multi-file response - merge all documents - logger.debug(f"Processing multi-file response from chunk 
{chunk_result.chunkIndex} with {len(chunk_json['documents'])} documents") - - # Add all documents from this chunk - for doc in chunk_json["documents"]: - # Add chunk context to document - doc["metadata"] = doc.get("metadata", {}) - doc["metadata"]["source_chunk"] = chunk_result.chunkIndex - doc["metadata"]["source_document"] = doc_id - all_documents.append(doc) - - # Update combined metadata - if "metadata" in chunk_json: - combined_metadata.update(chunk_json["metadata"]) - - # Extract sections from single-file response (fallback) - elif isinstance(chunk_json, dict) and "sections" in chunk_json: - for section in chunk_json["sections"]: - # Add document context to section - section["metadata"] = section.get("metadata", {}) - section["metadata"]["source_document"] = doc_id - section["metadata"]["chunk_index"] = chunk_result.chunkIndex - all_sections.append(section) - - # Extract document title - if isinstance(chunk_json, dict) and "metadata" in chunk_json: - title = chunk_json["metadata"].get("title", "") - if title and title not in document_titles: - document_titles.append(title) - - except json.JSONDecodeError as e: - logger.warning(f"Failed to parse JSON from chunk {chunk_result.chunkIndex}: {str(e)}") - # Create a fallback section for invalid JSON - fallback_section = { - "id": f"error_section_{chunk_result.chunkIndex}", - "title": "Error Section", - "content_type": "paragraph", - "elements": [{ - "text": f"Error parsing chunk {chunk_result.chunkIndex}: {str(e)}" - }], - "order": chunk_result.chunkIndex, - "metadata": { - "source_document": doc_id, - "chunk_index": chunk_result.chunkIndex, - "error": str(e) - } + # Return multi-file response directly + return { + "metadata": chunk_json.get("metadata", {"title": "Merged Document"}), + "documents": chunk_json["documents"] } - all_sections.append(fallback_section) - else: - # Handle error chunks - error_section = { - "id": f"error_section_{chunk_result.chunkIndex}", + + # Extract sections from single-file response + elif isinstance(chunk_json, dict) and "sections" in chunk_json: + for section in chunk_json["sections"]: + # Add part context to section + section["metadata"] = section.get("metadata", {}) + section["metadata"]["source_part"] = part.id + section["metadata"]["source_document"] = part.metadata.get("documentId", "unknown") + section["metadata"]["chunk_index"] = part.metadata.get("chunkIndex", 0) + all_sections.append(section) + + # Extract document title + if isinstance(chunk_json, dict) and "metadata" in chunk_json: + title = chunk_json["metadata"].get("title", "") + if title and title not in document_titles: + document_titles.append(title) + + except json.JSONDecodeError as e: + logger.warning(f"Failed to parse JSON from part {part.id}: {str(e)}") + # Create a fallback section for invalid JSON + fallback_section = { + "id": f"error_section_{part.id}", "title": "Error Section", "content_type": "paragraph", "elements": [{ - "text": f"Error in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}" + "text": f"Error parsing part {part.id}: {str(e)}" }], - "order": chunk_result.chunkIndex, + "order": part.metadata.get("chunkIndex", 0), "metadata": { - "source_document": doc_id, - "chunk_index": chunk_result.chunkIndex, - "error": chunk_metadata.get('error', 'Unknown error') + "source_document": part.metadata.get("documentId", "unknown"), + "part_id": part.id, + "error": str(e) } } - all_sections.append(error_section) + all_sections.append(fallback_section) + else: + # Handle error parts + error_section = { + 
"id": f"error_section_{part.id}", + "title": "Error Section", + "content_type": "paragraph", + "elements": [{ + "text": f"Error in part {part.id}: {part.metadata.get('error', 'Unknown error')}" + }], + "order": part.metadata.get("chunkIndex", 0), + "metadata": { + "source_document": part.metadata.get("documentId", "unknown"), + "part_id": part.id, + "error": part.metadata.get('error', 'Unknown error') + } + } + all_sections.append(error_section) # Sort sections by order all_sections.sort(key=lambda x: x.get("order", 0)) - # If we have merged documents from multi-file responses, return them - if all_documents: - logger.info(f"Merged {len(all_documents)} documents from {len(chunkResults)} chunks") - return { - "metadata": combined_metadata, - "documents": all_documents - } - - # Otherwise, create merged document with sections (single-file fallback) + # Create merged document with sections merged_document = { "metadata": { "title": document_titles[0] if document_titles else "Merged Document", - "source_documents": list(results_by_document.keys()), - "extraction_method": "ai_json_extraction", - "version": "1.0" + "extraction_method": "ai_json_extraction_with_merging", + "version": "2.0" }, "sections": all_sections, - "summary": f"Merged document from {len(results_by_document)} source documents", - "tags": ["merged", "ai_generated"] + "summary": f"Merged document using sophisticated merging system", + "tags": ["merged", "ai_generated", "sophisticated_merging"] } - logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents (JSON mode)") + logger.info(f"Merged {len(chunkResults)} chunks using existing sophisticated merging system (JSON mode)") return merged_document - def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]: - """ - Get model capabilities for content processing, including appropriate size limits for chunking. - Uses centralized model selection to determine chunking parameters. 
- """ - # Estimate total content size - prompt_size = len(prompt.encode('utf-8')) - document_size = 0 - if documents: - # Rough estimate of document content size - for doc in documents: - document_size += doc.fileSize or 0 - - total_size = prompt_size + document_size - - # Use centralized model selection to get the best model for chunking parameters - try: - from modules.aicore.aicoreModelRegistry import modelRegistry - from modules.aicore.aicoreModelSelector import model_selector - - # Get available models and select the best one for this operation - availableModels = modelRegistry.getAvailableModels() - selectedModel = model_selector.selectModel(prompt, "", options, availableModels) - - if selectedModel: - context_length = selectedModel.contextLength - model_name = selectedModel.name - logger.debug(f"Selected model for chunking: {model_name} with context length: {context_length}") - else: - # Fallback to conservative default if no model selected - context_length = 128000 # GPT-4o default - model_name = "fallback" - logger.warning(f"No model selected for chunking, using fallback context length: {context_length}") - - except Exception as e: - # Fallback to conservative default if model selection fails - context_length = 128000 # GPT-4o default - model_name = "fallback" - logger.error(f"Model selection failed for chunking: {e}, using fallback context length: {context_length}") - - # Calculate appropriate sizes - # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters) - context_length_bytes = int(context_length * 4) - max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length - text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks - image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks - - logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes") - logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes") - - return { - "maxContextBytes": max_context_bytes, - "textChunkSize": text_chunk_size, - "imageChunkSize": image_chunk_size - } +# REMOVED: _getModelCapabilitiesForContent method - no longer needed with model-aware chunking diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py index d0b79eaf..9d8193da 100644 --- a/modules/services/serviceExtraction/subPipeline.py +++ b/modules/services/serviceExtraction/subPipeline.py @@ -73,139 +73,18 @@ def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: Chunker parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options}) - # Apply chunking and size limiting - parts = poolAndLimit(parts, chunkerRegistry, options) + # REMOVED: poolAndLimit(parts, chunkerRegistry, options) + # REMOVED: Chunking logic - now handled in AI call phase - # Optional merge step - but preserve chunks + # Apply merging strategy if provided (preserve existing logic) mergeStrategy = options.get("mergeStrategy", {}) if mergeStrategy: - - # Don't merge chunks - they should stay separate for processing - non_chunk_parts = [p for p in parts if not p.metadata.get("chunk", False)] - chunk_parts = [p for p in parts if p.metadata.get("chunk", False)] - - logger.debug(f"runExtraction: Preserving {len(chunk_parts)} chunks from merging") - logger.debug(f"runExtraction - non_chunk_parts: {len(non_chunk_parts)}, chunk_parts: {len(chunk_parts)}") - - # Apply intelligent merging for small text parts - if 
non_chunk_parts: - # Count text parts - text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"] - if len(text_parts) > 5: # If we have many small text parts, merge them - logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency") - non_chunk_parts = _mergeParts(non_chunk_parts, mergeStrategy) - - # Combine non-chunk parts with chunk parts (chunks stay separate) - parts = non_chunk_parts + chunk_parts - - logger.debug(f"runExtraction: Final parts after merging: {len(parts)} (chunks: {len(chunk_parts)})") - logger.debug(f"runExtraction - Final parts: {len(parts)} (chunks: {len(chunk_parts)})") - # Timestamp-only extraction debug dumps removed - + parts = _applyMerging(parts, mergeStrategy) + return ContentExtracted(id=makeId(), parts=parts) -def poolAndLimit(parts: List[ContentPart], chunkerRegistry: ChunkerRegistry, options: Dict[str, Any]) -> List[ContentPart]: - maxSize = int(options.get("maxSize", 0) or 0) - chunkAllowed = bool(options.get("chunkAllowed", False)) - mergeStrategy = options.get("mergeStrategy", {}) - - if maxSize <= 0: - # Still apply merging if strategy provided - if mergeStrategy: - return _applyMerging(parts, mergeStrategy) - return parts - - # First, try to fit within size limit - current = 0 - kept: List[ContentPart] = [] - remaining: List[ContentPart] = [] - - logger.debug(f"Starting poolAndLimit with {len(parts)} parts, maxSize={maxSize}") - - for i, p in enumerate(parts): - size = int(p.metadata.get("size", 0) or 0) - # Show first 50 characters of text content for debugging - content_preview = p.data[:50].replace('\n', '\\n') if p.data else "" - logger.debug(f"Part {i}: {p.typeGroup} - {size} bytes - '{content_preview}...' (current: {current})") - if current + size <= maxSize: - kept.append(p) - current += size - logger.debug(f"Part {i} kept (total: {current})") - else: - remaining.append(p) - logger.debug(f"Part {i} moved to remaining") - - logger.debug(f"Kept: {len(kept)}, Remaining: {len(remaining)}") - - # If we have remaining parts and chunking is allowed, try chunking - if remaining and chunkAllowed: - logger.debug(f"=== CHUNKING ACTIVATED ===") - logger.debug(f"Remaining parts to chunk: {len(remaining)}") - logger.debug(f"Max size limit: {maxSize} bytes") - logger.debug(f"Current size used: {current} bytes") - logger.debug(f"Chunking {len(remaining)} remaining parts") - - for p in remaining: - if p.typeGroup in ("text", "table", "structure", "image", "container", "binary"): - logger.debug(f"Chunking {p.typeGroup} part: {len(p.data)} chars") - logger.debug(f"Chunking {p.typeGroup} part with {len(p.data)} chars") - chunks = chunkerRegistry.resolve(p.typeGroup).chunk(p, options) - logger.debug(f"Created {len(chunks)} chunks") - logger.debug(f"Created {len(chunks)} chunks") - - chunks_added = 0 - for ch in chunks: - chSize = int(ch.get("size", 0) or 0) - # Add all chunks - don't limit by maxSize since they'll be processed separately - kept.append(ContentPart( - id=makeId(), - parentId=p.id, - label=f"chunk_{ch.get('order', 0)}", - typeGroup=p.typeGroup, - mimeType=p.mimeType, - data=ch.get("data", ""), - metadata={ - "size": chSize, - "chunk": True, - **ch.get("metadata", {}) - } - )) - chunks_added += 1 - logger.debug(f"Added chunk {ch.get('order', 0)}: {chSize} bytes") - - logger.debug(f"Added {chunks_added} chunks from {p.typeGroup} part") - - # Apply merging strategy if provided, but preserve chunks - if mergeStrategy: - # Don't merge chunks - they should stay separate for processing - non_chunk_parts = [p for p in kept if not 
p.metadata.get("chunk", False)] - chunk_parts = [p for p in kept if p.metadata.get("chunk", False)] - - logger.debug(f"Preserving {len(chunk_parts)} chunks from merging") - - # Apply intelligent merging for small text parts - if non_chunk_parts: - # Count text parts - text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"] - if len(text_parts) > 5: # If we have many small text parts, merge them - logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency") - non_chunk_parts = _applyMerging(non_chunk_parts, mergeStrategy) - - # Combine non-chunk parts with chunk parts (chunks stay separate) - kept = non_chunk_parts + chunk_parts - - logger.debug(f"Final parts after merging: {len(kept)} (chunks: {len(chunk_parts)})") - logger.debug(f"Final parts after merging: {len(kept)} (chunks: {len(chunk_parts)})") - - # Re-check size after merging - totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept) - if totalSize > maxSize and mergeStrategy.get("maxSize"): - # Apply size limit to merged parts - kept = _applySizeLimit(kept, maxSize) - - logger.debug(f"poolAndLimit returning {len(kept)} parts") - return kept +# REMOVED: poolAndLimit function - chunking now handled in AI call phase def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: @@ -264,37 +143,5 @@ def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[Co return merged -def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]: - """Apply size limit by prioritizing parts and truncating if necessary.""" - # Sort by priority: text first, then others - priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3, "binary": 4, "metadata": 5, "container": 6} - sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99)) - - kept: List[ContentPart] = [] - current_size = 0 - - for part in sorted_parts: - part_size = int(part.metadata.get("size", 0) or 0) - if current_size + part_size <= maxSize: - kept.append(part) - current_size += part_size - else: - # Try to truncate text parts - if part.typeGroup == "text" and part_size > 0: - remaining_size = maxSize - current_size - if remaining_size > 1000: # Only truncate if we have meaningful space - truncated_data = part.data[:remaining_size * 4] # Rough character estimate - truncated_part = ContentPart( - id=makeId(), - parentId=part.parentId, - label=f"{part.label}_truncated", - typeGroup=part.typeGroup, - mimeType=part.mimeType, - data=truncated_data, - metadata={**part.metadata, "size": len(truncated_data.encode('utf-8')), "truncated": True} - ) - kept.append(truncated_part) - break - - return kept +# REMOVED: _applySizeLimit function - no longer needed after removing poolAndLimit diff --git a/test_ai_behavior.py b/test_ai_behavior.py index b5951d06..34a2ad60 100644 --- a/test_ai_behavior.py +++ b/test_ai_behavior.py @@ -138,6 +138,7 @@ class AIBehaviorTester: self.testResults.append(result) return result + def _extractContinuationInstruction(self, response: str) -> str: """Extract continuation instruction from response.""" try: diff --git a/test_ai_model_selection.py b/test_ai_model_selection.py index c11742d1..2f7b016f 100644 --- a/test_ai_model_selection.py +++ b/test_ai_model_selection.py @@ -25,7 +25,7 @@ from modules.datamodels.datamodelAi import ( ) from modules.datamodels.datamodelUam import User from modules.aicore.aicoreModelRegistry import modelRegistry -from modules.aicore.aicoreModelSelector import model_selector +from 
modules.aicore.aicoreModelSelector import modelSelector class ModelSelectionTester: @@ -45,6 +45,51 @@ class ModelSelectionTester: self.services.ai = await AiService.create(self.services) + async def _printFallbackListWithContext(self, title: str, prompt: str, context: str, options: AiCallOptions) -> None: + print(f"\n{'='*80}") + print(f"{title}") + print(f"{'='*80}") + print( + f"Operation={options.operationType.name}, Priority={options.priority.name}, ProcessingMode={options.processingMode.name}" + ) + + # Show context and prompt sizes + promptSize = len(prompt.encode("utf-8")) + contextSize = len(context.encode("utf-8")) + totalSize = promptSize + contextSize + print(f"Prompt size: {promptSize} bytes, Context size: {contextSize} bytes, Total: {totalSize} bytes") + + availableModels = modelRegistry.getAvailableModels() + failoverModelList = modelSelector.getFailoverModelList( + prompt=prompt, + context=context, + options=options, + availableModels=availableModels, + ) + + if not failoverModelList: + print("No suitable models found (capability filter returned empty list).") + return + + print("Prioritized fallback model sequence (name | quality | speed | $/1k in | ctx | score):") + for idx, m in enumerate(failoverModelList, 1): + costIn = getattr(m, "costPer1kTokensInput", 0.0) + # Calculate detailed score breakdown + promptSize = len(prompt.encode("utf-8")) + contextSize = len(context.encode("utf-8")) + totalSize = promptSize + contextSize + + # Get detailed scoring + sizeRating = modelSelector._getSizeRating(m, totalSize) + processingModeRating = modelSelector._getProcessingModeRating(m.processingMode, options.processingMode) + priorityRating = modelSelector._getPriorityRating(m, options.priority) + totalScore = sizeRating + processingModeRating + priorityRating + + print( + f" {idx:>2}. 
{m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}" + ) + print(f" Size: {sizeRating:.3f}, ProcessingMode: {processingModeRating:.3f}, Priority: {priorityRating:.3f}") + async def _printFallbackList(self, title: str, prompt: str, options: AiCallOptions) -> None: print(f"\n{'='*80}") print(f"{title}") @@ -53,24 +98,43 @@ class ModelSelectionTester: f"Operation={options.operationType.name}, Priority={options.priority.name}, ProcessingMode={options.processingMode.name}" ) + # Show context and prompt sizes + context = "" # Currently using empty context + promptSize = len(prompt.encode("utf-8")) + contextSize = len(context.encode("utf-8")) + totalSize = promptSize + contextSize + print(f"Prompt size: {promptSize} bytes, Context size: {contextSize} bytes, Total: {totalSize} bytes") + availableModels = modelRegistry.getAvailableModels() - fallbackModels = model_selector.getFallbackModels( + failoverModelList = modelSelector.getFailoverModelList( prompt=prompt, - context="", + context=context, options=options, availableModels=availableModels, ) - if not fallbackModels: + if not failoverModelList: print("No suitable models found (capability filter returned empty list).") return - print("Prioritized fallback model sequence (name | quality | speed | $/1k in | ctx):") - for idx, m in enumerate(fallbackModels, 1): + print("Prioritized fallback model sequence (name | quality | speed | $/1k in | ctx | score):") + for idx, m in enumerate(failoverModelList, 1): costIn = getattr(m, "costPer1kTokensInput", 0.0) + # Calculate detailed score breakdown + promptSize = len(prompt.encode("utf-8")) + contextSize = len(context.encode("utf-8")) + totalSize = promptSize + contextSize + + # Get detailed scoring + sizeRating = modelSelector._getSizeRating(m, totalSize) + processingModeRating = modelSelector._getProcessingModeRating(m.processingMode, options.processingMode) + priorityRating = modelSelector._getPriorityRating(m, options.priority) + totalScore = sizeRating + processingModeRating + priorityRating + print( - f" {idx:>2}. {m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)}" + f" {idx:>2}. 
{m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}" ) + print(f" Size: {sizeRating:.3f}, ProcessingMode: {processingModeRating:.3f}, Priority: {priorityRating:.3f}") async def run(self) -> None: # Scenarios reflecting workflows/ @@ -146,10 +210,93 @@ class ModelSelectionTester: ) ) + # Intent analysis (user input understanding) + scenarios.append( + ( + "ANALYSE - Quality, Detailed (Intent Analysis)", + "Analyze user intent and extract key requirements from the following request: 'I need to create a comprehensive marketing strategy for our new product launch including budget allocation, timeline, and target audience analysis.'", + AiCallOptions( + operationType=OperationTypeEnum.ANALYSE, + priority=PriorityEnum.QUALITY, + compressPrompt=False, + compressContext=False, + processingMode=ProcessingModeEnum.DETAILED, + maxCost=0.08, + maxProcessingTime=45, + resultFormat="json", + temperature=0.2, + ), + ) + ) + + # Review/Validation (quality assurance) + scenarios.append( + ( + "ANALYSE - Quality, Detailed (Review/Validation)", + "Review and validate the following business proposal for completeness, accuracy, and compliance with industry standards. Identify any gaps or areas for improvement.", + AiCallOptions( + operationType=OperationTypeEnum.ANALYSE, + priority=PriorityEnum.QUALITY, + compressPrompt=False, + compressContext=False, + processingMode=ProcessingModeEnum.DETAILED, + maxCost=0.10, + maxProcessingTime=60, + resultFormat="json", + temperature=0.1, + ), + ) + ) + + # Large context scenario (to test size-based scoring) + scenarios.append( + ( + "GENERAL - Balanced, Advanced (Large Context Test)", + "Process this large document and provide a comprehensive summary.", + AiCallOptions( + operationType=OperationTypeEnum.GENERAL, + priority=PriorityEnum.BALANCED, + compressPrompt=False, + compressContext=False, + processingMode=ProcessingModeEnum.ADVANCED, + maxCost=0.15, + maxProcessingTime=120, + ), + ) + ) + # Iterate and print lists for title, prompt, options in scenarios: await self._printFallbackList(title, prompt, options) + # Test with actual context to see size-based scoring + largeContext = """ + This is a comprehensive business document containing detailed information about our company's strategic initiatives, + financial performance, market analysis, competitive landscape, operational metrics, customer feedback, + product development roadmap, technology stack, human resources, legal compliance, risk management, + sustainability efforts, and future growth plans. The document spans multiple sections including executive summary, + market research, financial statements, operational reports, customer insights, product specifications, + technology architecture, HR policies, legal frameworks, risk assessments, environmental impact studies, + and strategic recommendations. This extensive content is designed to test the model selection algorithm's + ability to handle large context sizes and make intelligent decisions about which models are best suited + for processing such substantial amounts of information while maintaining efficiency and cost-effectiveness. 
+ """ * 10 # Repeat to make it even larger + + await self._printFallbackListWithContext( + "GENERAL - Balanced, Advanced (Large Context Test)", + "Analyze this comprehensive business document and provide key insights.", + largeContext, + AiCallOptions( + operationType=OperationTypeEnum.GENERAL, + priority=PriorityEnum.BALANCED, + compressPrompt=False, + compressContext=False, + processingMode=ProcessingModeEnum.ADVANCED, + maxCost=0.15, + maxProcessingTime=120, + ), + ) + async def main() -> None: tester = ModelSelectionTester()