""" Dynamic model selector using configurable rules and scoring. """ import logging from typing import List, Optional, Dict, Any, Tuple from modules.datamodels.datamodelAi import AiModel, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, ModelCapabilitiesEnum from modules.aicore.aicoreModelSelectionConfig import model_selection_config logger = logging.getLogger(__name__) class ModelSelector: """Dynamic model selector using configurable rules.""" def __init__(self): self.config = model_selection_config def selectModel(self, prompt: str, context: str, options: AiCallOptions, availableModels: List[AiModel]) -> Optional[AiModel]: """ Select the best model based on configurable rules and scoring. Args: prompt: User prompt context: Context data options: AI call options availableModels: List of available models to choose from Returns: Selected model or None if no suitable model found """ if not availableModels: logger.warning("No models available for selection") return None logger.info(f"Selecting model for operation: {options.operationType}, priority: {options.priority}") # Calculate input size inputSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8")) # Get applicable rules rules = self.config.getRulesForOperation(options.operationType) logger.debug(f"Found {len(rules)} applicable rules for {options.operationType}") # Score each model scoredModels = [] for model in availableModels: if not model.isAvailable: continue score = self._calculateModelScore(model, inputSize, options, rules) if score > 0: # Only consider models with positive scores scoredModels.append((model, score)) logger.debug(f"Model {model.name}: score={score:.2f}") if not scoredModels: logger.warning("No models passed the selection criteria, trying fallback criteria") # Try fallback criteria fallbackCriteria = self.getFallbackCriteria(options.operationType) return self._selectWithFallbackCriteria(availableModels, fallbackCriteria, inputSize, options) # Sort by score (highest first) scoredModels.sort(key=lambda x: x[1], reverse=True) selectedModel = scoredModels[0][0] selectedScore = scoredModels[0][1] logger.info(f"Selected model: {selectedModel.name} (score: {selectedScore:.2f})") # Log selection details self._logSelectionDetails(selectedModel, inputSize, options) return selectedModel def _calculateModelScore(self, model: AiModel, inputSize: int, options: AiCallOptions, rules: List) -> float: """Calculate score for a model based on rules and criteria.""" score = 0.0 # Check basic requirements if not self._meetsBasicRequirements(model, inputSize, options): return 0.0 # Apply rules for rule in rules: ruleScore = self._applyRule(model, inputSize, options, rule) score += ruleScore * rule.weight # Apply priority-based scoring priorityScore = self._applyPriorityScoring(model, options) score += priorityScore # Apply processing mode scoring modeScore = self._applyProcessingModeScoring(model, options) score += modeScore # Apply cost constraints if not self._meetsCostConstraints(model, inputSize, options): score *= 0.1 # Heavily penalize but don't eliminate return max(0.0, score) def _meetsBasicRequirements(self, model: AiModel, inputSize: int, options: AiCallOptions) -> bool: """Check if model meets basic requirements.""" # Context length check if model.contextLength > 0 and inputSize > model.contextLength * 0.8: logger.debug(f"Model {model.name} rejected: input too large ({inputSize} > {model.contextLength * 0.8})") return False # Required operation types check if options.operationTypes: if not all(opType in model.operationTypes for opType in options.operationTypes): logger.debug(f"Model {model.name} rejected: missing required operation types") return False # Capabilities check if options.capabilities: if not all(cap in model.capabilities for cap in options.capabilities): logger.debug(f"Model {model.name} rejected: missing required capabilities") return False # Avoid operation types check for rule in self.config.getRulesForOperation(options.operationType): if any(opType in model.operationTypes for opType in rule.avoidOperationTypes): logger.debug(f"Model {model.name} rejected: has avoid operation types") return False return True def _applyRule(self, model: AiModel, inputSize: int, options: AiCallOptions, rule) -> float: """Apply a specific rule to calculate score contribution.""" score = 0.0 # Required operation types match if all(opType in model.operationTypes for opType in rule.operationTypes): score += 1.0 # Preferred capabilities match preferredMatches = sum(1 for cap in rule.preferredCapabilities if cap in model.capabilities) if rule.preferredCapabilities: score += (preferredMatches / len(rule.preferredCapabilities)) * 0.5 # Quality rating check if rule.minQualityRating and model.qualityRating >= rule.minQualityRating: score += 0.3 # Context length check if rule.minContextLength and model.contextLength >= rule.minContextLength: score += 0.2 return score def _applyPriorityScoring(self, model: AiModel, options: AiCallOptions) -> float: """Apply priority-based scoring.""" if options.priority == PriorityEnum.SPEED: return model.speedRating * 0.1 elif options.priority == PriorityEnum.QUALITY: return model.qualityRating * 0.1 elif options.priority == PriorityEnum.COST: # Lower cost = higher score costScore = max(0, 1.0 - (model.costPer1kTokensInput * 1000)) return costScore * 0.1 else: # BALANCED return (model.qualityRating + model.speedRating) * 0.05 def _applyProcessingModeScoring(self, model: AiModel, options: AiCallOptions) -> float: """Apply processing mode scoring.""" if options.processingMode == ProcessingModeEnum.DETAILED: if model.priority == PriorityEnum.QUALITY: return 0.2 elif options.processingMode == ProcessingModeEnum.BASIC: if model.priority == PriorityEnum.SPEED: return 0.2 return 0.0 def _meetsCostConstraints(self, model: AiModel, inputSize: int, options: AiCallOptions) -> bool: """Check if model meets cost constraints.""" if options.maxCost is None: return True # Estimate cost estimatedTokens = inputSize / 4 estimatedCost = (estimatedTokens / 1000) * model.costPer1kTokensInput return estimatedCost <= options.maxCost def _logSelectionDetails(self, model: AiModel, inputSize: int, options: AiCallOptions): """Log detailed selection information.""" logger.info(f"Model Selection Details:") logger.info(f" Selected: {model.displayName} ({model.name})") logger.info(f" Connector: {model.connectorType}") logger.info(f" Operation: {options.operationType}") logger.info(f" Priority: {options.priority}") logger.info(f" Processing Mode: {options.processingMode}") logger.info(f" Input Size: {inputSize} bytes") logger.info(f" Context Length: {model.contextLength}") logger.info(f" Max Tokens: {model.maxTokens}") logger.info(f" Quality Rating: {model.qualityRating}/10") logger.info(f" Speed Rating: {model.speedRating}/10") logger.info(f" Cost: ${model.costPer1kTokensInput:.4f}/1k tokens") logger.info(f" Capabilities: {', '.join(model.capabilities)}") logger.info(f" Priority: {model.priority}") def getFallbackCriteria(self, operationType: str) -> Dict[str, Any]: """Get fallback selection criteria for an operation type.""" return self.config.getFallbackCriteria(operationType) def _selectWithFallbackCriteria(self, availableModels: List[AiModel], fallbackCriteria: Dict[str, Any], inputSize: int, options: AiCallOptions) -> Optional[AiModel]: """Select model using fallback criteria when normal selection fails.""" logger.info("Using fallback criteria for model selection") # Filter models by fallback criteria candidates = [] for model in availableModels: if not model.isAvailable: continue # Check required operation types if fallbackCriteria.get("operationTypes"): if not all(opType in model.operationTypes for opType in fallbackCriteria["operationTypes"]): continue # Check quality rating if fallbackCriteria.get("minQualityRating"): if model.qualityRating < fallbackCriteria["minQualityRating"]: continue # Check cost if fallbackCriteria.get("maxCostPer1k"): if model.costPer1kTokensInput > fallbackCriteria["maxCostPer1k"]: continue # Check context length if model.contextLength > 0 and inputSize > model.contextLength * 0.8: continue candidates.append(model) if not candidates: logger.error("No models available even with fallback criteria") return None # Sort by priority order from fallback criteria priorityOrder = fallbackCriteria.get("priorityOrder", ["quality", "speed", "cost"]) def _getPriorityScore(model: AiModel) -> float: score = 0.0 for i, priority in enumerate(priorityOrder): weight = len(priorityOrder) - i # Higher weight for earlier priorities if priority == "quality": score += model.qualityRating * weight elif priority == "speed": score += model.speedRating * weight elif priority == "cost": # Lower cost = higher score score += (1.0 - model.costPer1kTokensInput * 1000) * weight return score candidates.sort(key=_getPriorityScore, reverse=True) selectedModel = candidates[0] logger.info(f"Fallback selection: {selectedModel.name} (score: {_getPriorityScore(selectedModel):.2f})") return selectedModel def getFallbackModels(self, prompt: str, context: str, options: AiCallOptions, availableModels: List[AiModel]) -> List[AiModel]: """ Get prioritized list of models for fallback sequence. Steps: 1. Filter models by capability requirements 2. Rate models by business requirements (priority, processing mode) 3. Sort by rating (descending), then by cost (ascending) Args: prompt: User prompt context: Context data options: AI call options availableModels: List of available models Returns: Prioritized list of models for fallback sequence """ if not availableModels: logger.warning("No models available for fallback selection") return [] logger.info(f"Building fallback sequence for operation: {options.operationType}, priority: {options.priority}") # Step 1: Filter by capability requirements capableModels = self._filterByCapabilities(availableModels, options) logger.info(f"Step 1 - Capable models: {[m.name for m in capableModels]}") if not capableModels: logger.warning("No models meet capability requirements") return [] # Step 2: Rate models by business requirements ratedModels = self._rateModelsByBusinessRequirements(capableModels, prompt, context, options) logger.info(f"Step 2 - Rated models: {[(m.name, rating) for m, rating in ratedModels]}") # Step 3: Sort by rating (descending), then by cost (ascending) sortedModels = self._sortModelsByRatingAndCost(ratedModels) logger.info(f"Step 3 - Sorted fallback sequence: {[m.name for m in sortedModels]}") return sortedModels def _filterByCapabilities(self, models: List[AiModel], options: AiCallOptions) -> List[AiModel]: """Filter models by required capabilities.""" capableModels = [] for model in models: if not model.isAvailable: continue # Check if model supports required capabilities if options.capabilities: if not all(cap in model.capabilities for cap in options.capabilities): logger.debug(f"Model {model.name} missing required capabilities: {options.capabilities}") continue # Check operation type compatibility if not self._meetsBasicRequirements(model, options): logger.debug(f"Model {model.name} doesn't meet basic requirements") continue capableModels.append(model) return capableModels def _rateModelsByBusinessRequirements(self, models: List[AiModel], prompt: str, context: str, options: AiCallOptions) -> List[Tuple[AiModel, float]]: """Rate models based on business requirements (priority, processing mode).""" ratedModels = [] inputSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8")) for model in models: # Base score from model selection logic baseScore = self._calculateModelScore(model, inputSize, options, []) # Apply priority-based scoring priorityScore = self._applyPriorityScoring(model, options) # Apply processing mode scoring processingScore = self._applyProcessingModeScoring(model, options) # Combine scores totalScore = baseScore + priorityScore + processingScore ratedModels.append((model, totalScore)) logger.debug(f"Model {model.name}: base={baseScore:.2f}, priority={priorityScore:.2f}, processing={processingScore:.2f}, total={totalScore:.2f}") return ratedModels def _sortModelsByRatingAndCost(self, ratedModels: List[Tuple[AiModel, float]]) -> List[AiModel]: """Sort models by rating (descending), then by cost (ascending).""" def sortKey(item): model, rating = item # Primary sort: rating (descending) # Secondary sort: cost (ascending) return (-rating, model.costPer1kTokensInput) sortedItems = sorted(ratedModels, key=sortKey) return [model for model, rating in sortedItems] # Global selector instance model_selector = ModelSelector()