From 36947b6d7ec9183e970489298fe15b438c4ca9f2 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 23 Oct 2025 00:35:44 +0200
Subject: [PATCH] Ready for test: revised dynamic AI-aware chunking system
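
Model selection is reduced to property filtering plus priority-based
scoring (aicoreModelSelector), and chunking moves from the extraction
phase into the AI call phase: AiCallRequest gains an optional
contentParts field, and AiObjects.call() chunks oversized parts against
the selected model's context window before merging the results.

A sketch of the new call path, mirroring the call site in
subDocumentProcessing (prompt, options and part are assumed to be in
scope):

    request = AiCallRequest(
        prompt=prompt,
        context="",           # document content travels in the content part
        options=options,
        contentParts=[part],  # chunked per model inside AiObjects.call()
    )
    response = await self.aiObjects.call(request)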
---
modules/aicore/aicoreModelRegistry.py | 2 +-
modules/aicore/aicoreModelSelectionConfig.py | 158 ----
modules/aicore/aicoreModelSelector.py | 539 ++++-------
modules/aicore/aicorePluginAnthropic.py | 2 +-
modules/aicore/aicorePluginInternal.py | 2 +-
modules/aicore/aicorePluginOpenai.py | 4 +-
modules/aicore/aicorePluginPerplexity.py | 2 +-
modules/datamodels/datamodelAi.py | 7 +-
modules/datamodels/datamodelExtraction.py | 10 +
modules/interfaces/interfaceAiObjects.py | 282 +++++-
.../serviceAi/subDocumentProcessing.py | 837 +++++++++++-------
.../services/serviceExtraction/subPipeline.py | 167 +---
test_ai_behavior.py | 1 +
test_ai_model_selection.py | 161 +++-
14 files changed, 1160 insertions(+), 1014 deletions(-)
delete mode 100644 modules/aicore/aicoreModelSelectionConfig.py
diff --git a/modules/aicore/aicoreModelRegistry.py b/modules/aicore/aicoreModelRegistry.py
index 8d6c5ac6..a3666114 100644
--- a/modules/aicore/aicoreModelRegistry.py
+++ b/modules/aicore/aicoreModelRegistry.py
@@ -48,7 +48,7 @@ class ModelRegistry:
try:
# Import the module
- module = importlib.import_module(f'modules.connectors.{moduleName}')
+ module = importlib.import_module(f'modules.aicore.{moduleName}')
# Find connector classes (classes that inherit from BaseConnectorAi)
for attrName in dir(module):
diff --git a/modules/aicore/aicoreModelSelectionConfig.py b/modules/aicore/aicoreModelSelectionConfig.py
deleted file mode 100644
index 476dc527..00000000
--- a/modules/aicore/aicoreModelSelectionConfig.py
+++ /dev/null
@@ -1,158 +0,0 @@
-"""
-Configuration for dynamic model selection rules.
-This makes model selection configurable rather than hardcoded.
-"""
-
-from typing import Dict, List, Any
-from modules.datamodels.datamodelAi import OperationTypeEnum, ModelCapabilitiesEnum, PriorityEnum, SelectionRule
-
-
-class ModelSelectionConfig:
- """Configuration for model selection rules."""
-
- def __init__(self):
- self.rules = self._loadDefaultRules()
- self.fallbackModels = self._loadFallbackModels()
-
- def _loadDefaultRules(self) -> List[SelectionRule]:
- """Load default selection rules."""
- return [
- # High quality for planning and analysis
- SelectionRule(
- name="highQualityAnalysis",
- condition="Planning or analysis operations requiring high quality",
- weight=10.0,
- operationTypes=[OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE],
- priority=PriorityEnum.QUALITY,
- capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.REASONING, ModelCapabilitiesEnum.ANALYSIS],
- minQualityRating=8
- ),
-
- # Fast processing for basic operations
- SelectionRule(
- name="fastBasicProcessing",
- condition="Basic operations requiring speed",
- weight=8.0,
- operationTypes=[OperationTypeEnum.GENERAL],
- priority=PriorityEnum.SPEED,
- capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.CHAT],
- minQualityRating=5
- ),
-
- # Cost-effective for high-volume operations
- SelectionRule(
- name="costEffectiveProcessing",
- condition="High-volume operations where cost matters",
- weight=7.0,
- operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.GENERATE],
- priority=PriorityEnum.COST,
- capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION],
- maxCost=0.01 # $0.01 per 1k tokens
- ),
-
- # Image analysis specific
- SelectionRule(
- name="imageAnalyse",
- condition="Image analysis operations",
- weight=10.0,
- operationTypes=[OperationTypeEnum.IMAGE_ANALYSE],
- priority=PriorityEnum.QUALITY,
- capabilities=[ModelCapabilitiesEnum.VISION, ModelCapabilitiesEnum.MULTIMODAL],
- minQualityRating=8
- ),
-
- # Web research specific
- SelectionRule(
- name="webResearch",
- condition="Web research operations",
- weight=9.0,
- operationTypes=[OperationTypeEnum.WEB_RESEARCH],
- priority=PriorityEnum.BALANCED,
- capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.ANALYSIS, ModelCapabilitiesEnum.WEB_SEARCH],
- minQualityRating=7
- ),
-
- # Large context requirements
- SelectionRule(
- name="largeContext",
- condition="Operations requiring large context",
- weight=8.0,
- operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.ANALYSE],
- priority=PriorityEnum.BALANCED,
- capabilities=[ModelCapabilitiesEnum.TEXT_GENERATION],
- minContextLength=100000 # 100k tokens
- )
- ]
-
- def _loadFallbackModels(self) -> Dict[str, Dict[str, Any]]:
- """Load fallback model selection criteria."""
- return {
- OperationTypeEnum.GENERAL: {
- "priorityOrder": ["speed", "quality", "cost"],
- "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.CHAT],
- "minQualityRating": 5,
- "maxCostPer1k": 0.01
- },
- OperationTypeEnum.IMAGE_ANALYSE: {
- "priorityOrder": ["quality", "speed"],
- "operationTypes": [ModelCapabilitiesEnum.VISION, ModelCapabilitiesEnum.MULTIMODAL],
- "minQualityRating": 8,
- "maxCostPer1k": 0.1
- },
- OperationTypeEnum.IMAGE_GENERATE: {
- "priorityOrder": ["quality", "speed"],
- "operationTypes": [ModelCapabilitiesEnum.IMAGE_GENERATE, ModelCapabilitiesEnum.ART, ModelCapabilitiesEnum.VISUAL_CREATION],
- "minQualityRating": 8,
- "maxCostPer1k": 0.1
- },
- OperationTypeEnum.WEB_RESEARCH: {
- "priorityOrder": ["quality", "speed", "cost"],
- "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.ANALYSIS],
- "preferredTags": [ModelCapabilitiesEnum.WEB_SEARCH],
- "minQualityRating": 7,
- "maxCostPer1k": 0.02
- },
- OperationTypeEnum.PLAN: {
- "priorityOrder": ["quality", "speed"],
- "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.REASONING, ModelCapabilitiesEnum.ANALYSIS],
- "preferredTags": [PriorityEnum.QUALITY],
- "minQualityRating": 8,
- "maxCostPer1k": 0.1
- },
- OperationTypeEnum.ANALYSE: {
- "priorityOrder": ["quality", "speed"],
- "operationTypes": [ModelCapabilitiesEnum.TEXT_GENERATION, ModelCapabilitiesEnum.ANALYSIS, ModelCapabilitiesEnum.REASONING],
- "preferredTags": [PriorityEnum.QUALITY],
- "minQualityRating": 8,
- "maxCostPer1k": 0.1
- }
- }
-
- def getRulesForOperation(self, operationType: str) -> List[SelectionRule]:
- """Get rules that apply to a specific operation type."""
- return [rule for rule in self.rules if operationType in rule.operationTypes]
-
- def getFallbackCriteria(self, operationType: str) -> Dict[str, Any]:
- """Get fallback selection criteria for a specific operation type."""
- return self.fallbackModels.get(operationType, self.fallbackModels[OperationTypeEnum.GENERAL])
-
- def addRule(self, rule: SelectionRule):
- """Add a new selection rule."""
- self.rules.append(rule)
-
- def removeRule(self, ruleName: str):
- """Remove a selection rule by name."""
- self.rules = [rule for rule in self.rules if rule.name != ruleName]
-
- def updateRule(self, ruleName: str, **kwargs):
- """Update an existing rule."""
- for rule in self.rules:
- if rule.name == ruleName:
- for key, value in kwargs.items():
- if hasattr(rule, key):
- setattr(rule, key, value)
- break
-
-
-# Global configuration instance
-model_selection_config = ModelSelectionConfig()
diff --git a/modules/aicore/aicoreModelSelector.py b/modules/aicore/aicoreModelSelector.py
index 4f3de674..76674ed8 100644
--- a/modules/aicore/aicoreModelSelector.py
+++ b/modules/aicore/aicoreModelSelector.py
@@ -1,20 +1,20 @@
"""
-Dynamic model selector using configurable rules and scoring.
+Simplified model selection based on model properties and priority-based sorting.
+No complex rules needed: filter models by properties and sort by priority.
"""
import logging
-from typing import List, Optional, Dict, Any, Tuple
-from modules.datamodels.datamodelAi import AiModel, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, ModelCapabilitiesEnum
-from modules.aicore.aicoreModelSelectionConfig import model_selection_config
+from typing import List, Dict, Any, Optional
+from modules.datamodels.datamodelAi import AiModel, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
+# Configure logger
logger = logging.getLogger(__name__)
-
class ModelSelector:
- """Dynamic model selector using configurable rules."""
+ """Simple model selector based on properties and priority-based sorting."""
def __init__(self):
- self.config = model_selection_config
+ logger.info("ModelSelector initialized with simplified approach")
def selectModel(self,
prompt: str,
@@ -22,270 +22,7 @@ class ModelSelector:
options: AiCallOptions,
availableModels: List[AiModel]) -> Optional[AiModel]:
"""
- Select the best model based on configurable rules and scoring.
-
- Args:
- prompt: User prompt
- context: Context data
- options: AI call options
- availableModels: List of available models to choose from
-
- Returns:
- Selected model or None if no suitable model found
- """
- if not availableModels:
- logger.warning("No models available for selection")
- return None
-
- logger.info(f"Selecting model for operation: {options.operationType}, priority: {options.priority}")
-
- # Calculate input size
- inputSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8"))
-
- # Get applicable rules
- rules = self.config.getRulesForOperation(options.operationType)
- logger.debug(f"Found {len(rules)} applicable rules for {options.operationType}")
-
- # Score each model
- scoredModels = []
- for model in availableModels:
- if not model.isAvailable:
- continue
-
- score = self._calculateModelScore(model, inputSize, options, rules)
- if score > 0: # Only consider models with positive scores
- scoredModels.append((model, score))
- logger.debug(f"Model {model.name}: score={score:.2f}")
-
- if not scoredModels:
- logger.warning("No models passed the selection criteria, trying fallback criteria")
- # Try fallback criteria
- fallbackCriteria = self.getFallbackCriteria(options.operationType)
- return self._selectWithFallbackCriteria(availableModels, fallbackCriteria, inputSize, options)
-
- # Sort by score (highest first)
- scoredModels.sort(key=lambda x: x[1], reverse=True)
-
- selectedModel = scoredModels[0][0]
- selectedScore = scoredModels[0][1]
-
- logger.info(f"Selected model: {selectedModel.name} (score: {selectedScore:.2f})")
-
- # Log selection details
- self._logSelectionDetails(selectedModel, inputSize, options)
-
- return selectedModel
-
- def _calculateModelScore(self,
- model: AiModel,
- inputSize: int,
- options: AiCallOptions,
- rules: List) -> float:
- """Calculate score for a model based on rules and criteria."""
- score = 0.0
-
- # Check basic requirements
- if not self._meetsBasicRequirements(model, inputSize, options):
- return 0.0
-
- # Apply rules
- for rule in rules:
- ruleScore = self._applyRule(model, inputSize, options, rule)
- score += ruleScore * rule.weight
-
- # Apply priority-based scoring
- priorityScore = self._applyPriorityScoring(model, options)
- score += priorityScore
-
- # Apply processing mode scoring
- modeScore = self._applyProcessingModeScoring(model, options)
- score += modeScore
-
- # Apply cost constraints
- if not self._meetsCostConstraints(model, inputSize, options):
- score *= 0.1 # Heavily penalize but don't eliminate
-
- return max(0.0, score)
-
- def _meetsBasicRequirements(self, model: AiModel, inputSize: int, options: AiCallOptions) -> bool:
- """Check if model meets basic requirements."""
- # Context length check
- if model.contextLength > 0 and inputSize > model.contextLength * 0.8:
- logger.debug(f"Model {model.name} rejected: input too large ({inputSize} > {model.contextLength * 0.8})")
- return False
-
- # Required operation types check
- if options.operationTypes:
- if not all(opType in model.operationTypes for opType in options.operationTypes):
- logger.debug(f"Model {model.name} rejected: missing required operation types")
- return False
-
- # Capabilities check
- if options.capabilities:
- if not all(cap in model.capabilities for cap in options.capabilities):
- logger.debug(f"Model {model.name} rejected: missing required capabilities")
- return False
-
- # Avoid operation types check
- for rule in self.config.getRulesForOperation(options.operationType):
- if any(opType in model.operationTypes for opType in rule.avoidOperationTypes):
- logger.debug(f"Model {model.name} rejected: has avoid operation types")
- return False
-
- return True
-
- def _applyRule(self, model: AiModel, inputSize: int, options: AiCallOptions, rule) -> float:
- """Apply a specific rule to calculate score contribution."""
- score = 0.0
-
- # Required operation types match
- if all(opType in model.operationTypes for opType in rule.operationTypes):
- score += 1.0
-
- # Preferred capabilities match
- preferredMatches = sum(1 for cap in rule.preferredCapabilities if cap in model.capabilities)
- if rule.preferredCapabilities:
- score += (preferredMatches / len(rule.preferredCapabilities)) * 0.5
-
- # Quality rating check
- if rule.minQualityRating and model.qualityRating >= rule.minQualityRating:
- score += 0.3
-
- # Context length check
- if rule.minContextLength and model.contextLength >= rule.minContextLength:
- score += 0.2
-
- return score
-
- def _applyPriorityScoring(self, model: AiModel, options: AiCallOptions) -> float:
- """Apply priority-based scoring."""
- if options.priority == PriorityEnum.SPEED:
- return model.speedRating * 0.1
- elif options.priority == PriorityEnum.QUALITY:
- return model.qualityRating * 0.1
- elif options.priority == PriorityEnum.COST:
- # Lower cost = higher score
- costScore = max(0, 1.0 - (model.costPer1kTokensInput * 1000))
- return costScore * 0.1
- else: # BALANCED
- return (model.qualityRating + model.speedRating) * 0.05
-
- def _applyProcessingModeScoring(self, model: AiModel, options: AiCallOptions) -> float:
- """Apply processing mode scoring."""
- if options.processingMode == ProcessingModeEnum.DETAILED:
- if model.priority == PriorityEnum.QUALITY:
- return 0.2
- elif options.processingMode == ProcessingModeEnum.BASIC:
- if model.priority == PriorityEnum.SPEED:
- return 0.2
-
- return 0.0
-
- def _meetsCostConstraints(self, model: AiModel, inputSize: int, options: AiCallOptions) -> bool:
- """Check if model meets cost constraints."""
- if options.maxCost is None:
- return True
-
- # Estimate cost
- estimatedTokens = inputSize / 4
- estimatedCost = (estimatedTokens / 1000) * model.costPer1kTokensInput
-
- return estimatedCost <= options.maxCost
-
- def _logSelectionDetails(self, model: AiModel, inputSize: int, options: AiCallOptions):
- """Log detailed selection information."""
- logger.info(f"Model Selection Details:")
- logger.info(f" Selected: {model.displayName} ({model.name})")
- logger.info(f" Connector: {model.connectorType}")
- logger.info(f" Operation: {options.operationType}")
- logger.info(f" Priority: {options.priority}")
- logger.info(f" Processing Mode: {options.processingMode}")
- logger.info(f" Input Size: {inputSize} bytes")
- logger.info(f" Context Length: {model.contextLength}")
- logger.info(f" Max Tokens: {model.maxTokens}")
- logger.info(f" Quality Rating: {model.qualityRating}/10")
- logger.info(f" Speed Rating: {model.speedRating}/10")
- logger.info(f" Cost: ${model.costPer1kTokensInput:.4f}/1k tokens")
- logger.info(f" Capabilities: {', '.join(model.capabilities)}")
- logger.info(f" Priority: {model.priority}")
-
- def getFallbackCriteria(self, operationType: str) -> Dict[str, Any]:
- """Get fallback selection criteria for an operation type."""
- return self.config.getFallbackCriteria(operationType)
-
- def _selectWithFallbackCriteria(self,
- availableModels: List[AiModel],
- fallbackCriteria: Dict[str, Any],
- inputSize: int,
- options: AiCallOptions) -> Optional[AiModel]:
- """Select model using fallback criteria when normal selection fails."""
- logger.info("Using fallback criteria for model selection")
-
- # Filter models by fallback criteria
- candidates = []
- for model in availableModels:
- if not model.isAvailable:
- continue
-
- # Check required operation types
- if fallbackCriteria.get("operationTypes"):
- if not all(opType in model.operationTypes for opType in fallbackCriteria["operationTypes"]):
- continue
-
- # Check quality rating
- if fallbackCriteria.get("minQualityRating"):
- if model.qualityRating < fallbackCriteria["minQualityRating"]:
- continue
-
- # Check cost
- if fallbackCriteria.get("maxCostPer1k"):
- if model.costPer1kTokensInput > fallbackCriteria["maxCostPer1k"]:
- continue
-
- # Check context length
- if model.contextLength > 0 and inputSize > model.contextLength * 0.8:
- continue
-
- candidates.append(model)
-
- if not candidates:
- logger.error("No models available even with fallback criteria")
- return None
-
- # Sort by priority order from fallback criteria
- priorityOrder = fallbackCriteria.get("priorityOrder", ["quality", "speed", "cost"])
-
- def _getPriorityScore(model: AiModel) -> float:
- score = 0.0
- for i, priority in enumerate(priorityOrder):
- weight = len(priorityOrder) - i # Higher weight for earlier priorities
- if priority == "quality":
- score += model.qualityRating * weight
- elif priority == "speed":
- score += model.speedRating * weight
- elif priority == "cost":
- # Lower cost = higher score
- score += (1.0 - model.costPer1kTokensInput * 1000) * weight
- return score
-
- candidates.sort(key=_getPriorityScore, reverse=True)
- selectedModel = candidates[0]
-
- logger.info(f"Fallback selection: {selectedModel.name} (score: {_getPriorityScore(selectedModel):.2f})")
- return selectedModel
-
- def getFallbackModels(self,
- prompt: str,
- context: str,
- options: AiCallOptions,
- availableModels: List[AiModel]) -> List[AiModel]:
- """
- Get prioritized list of models for fallback sequence.
-
- Steps:
- 1. Filter models by capability requirements
- 2. Rate models by business requirements (priority, processing mode)
- 3. Sort by rating (descending), then by cost (ascending)
+ Select the best model using simple filtering and priority-based sorting.
Args:
prompt: User prompt
@@ -294,93 +31,195 @@ class ModelSelector:
availableModels: List of available models
Returns:
- Prioritized list of models for fallback sequence
+ Best model for the request, or None if no suitable model found
"""
- if not availableModels:
- logger.warning("No models available for fallback selection")
+ try:
+ # Get failover models (which includes all filtering and sorting)
+ failoverModelList = self.getFailoverModelList(prompt, context, options, availableModels)
+
+ if not failoverModelList:
+ logger.warning("No suitable models found for the request")
+ return None
+
+ selectedModel = failoverModelList[0] # First model is the best one
+ logger.info(f"Selected model: {selectedModel.name} (quality: {selectedModel.qualityRating}, cost: ${selectedModel.costPer1kTokensInput:.4f})")
+ return selectedModel
+
+ except Exception as e:
+ logger.error(f"Error selecting model: {str(e)}")
+ return None
+
+ def getFailoverModelList(self,
+ prompt: str,
+ context: str,
+ options: AiCallOptions,
+ availableModels: List[AiModel]) -> List[AiModel]:
+ """
+ Get prioritized list of models using scoring-based ranking.
+
+ Args:
+ prompt: User prompt
+ context: Context data
+ options: AI call options
+ availableModels: List of available models
+
+ Returns:
+ List of models sorted by score (descending)
+ """
+ try:
+ promptSize = len(prompt.encode("utf-8"))
+ contextSize = len(context.encode("utf-8"))
+ totalSize = promptSize + contextSize
+
+ # Step 1: Filter by operation type (MUST match)
+ operationFiltered = [m for m in availableModels if options.operationType in m.operationTypes]
+ logger.debug(f"After operation type filtering: {len(operationFiltered)} models")
+
+ # Step 2: Filter by prompt size (MUST be <= 80% of the model's context length)
+ promptFiltered = [m for m in operationFiltered if m.contextLength == 0 or promptSize <= m.contextLength * 0.8]
+ logger.debug(f"After prompt size filtering: {len(promptFiltered)} models")
+
+ # Step 3: Calculate scores for each model
+ scoredModels = []
+ for model in promptFiltered:
+ score = self._calculateModelScore(model, promptSize, contextSize, totalSize, options)
+ scoredModels.append((model, score))
+ logger.debug(f"Model {model.name}: score={score:.3f}")
+
+ # Step 4: Sort by score (descending)
+ scoredModels.sort(key=lambda x: x[1], reverse=True)
+ sortedModels = [model for model, score in scoredModels]
+
+ logger.debug(f"Final sorted models: {len(sortedModels)} models")
+ return sortedModels
+
+ except Exception as e:
+ logger.error(f"Error getting failover models: {str(e)}")
return []
- logger.info(f"Building fallback sequence for operation: {options.operationType}, priority: {options.priority}")
+ def _calculateModelScore(self, model: AiModel, promptSize: int, contextSize: int, totalSize: int, options: AiCallOptions) -> float:
+ """
+ Calculate a score for a model based on how well it fulfills the criteria.
- # Step 1: Filter by capability requirements
- capableModels = self._filterByCapabilities(availableModels, options)
- logger.info(f"Step 1 - Capable models: {[m.name for m in capableModels]}")
-
- if not capableModels:
- logger.warning("No models meet capability requirements")
- return []
-
- # Step 2: Rate models by business requirements
- ratedModels = self._rateModelsByBusinessRequirements(capableModels, prompt, context, options)
- logger.info(f"Step 2 - Rated models: {[(m.name, rating) for m, rating in ratedModels]}")
-
- # Step 3: Sort by rating (descending), then by cost (ascending)
- sortedModels = self._sortModelsByRatingAndCost(ratedModels)
- logger.info(f"Step 3 - Sorted fallback sequence: {[m.name for m in sortedModels]}")
-
- return sortedModels
-
- def _filterByCapabilities(self, models: List[AiModel], options: AiCallOptions) -> List[AiModel]:
- """Filter models by required capabilities."""
- capableModels = []
-
- for model in models:
- if not model.isAvailable:
- continue
+ Args:
+ model: The model to score
+ promptSize: Size of the prompt in bytes
+ contextSize: Size of the context in bytes
+ totalSize: Total size (prompt + context) in bytes
+ options: AI call options
- # Check if model supports required capabilities
- if options.capabilities:
- if not all(cap in model.capabilities for cap in options.capabilities):
- logger.debug(f"Model {model.name} missing required capabilities: {options.capabilities}")
- continue
-
- # Check operation type compatibility
- if not self._meetsBasicRequirements(model, options):
- logger.debug(f"Model {model.name} doesn't meet basic requirements")
- continue
-
- capableModels.append(model)
+ Returns:
+ Score for the model (higher is better)
+ """
+ score = 0.0
- return capableModels
-
- def _rateModelsByBusinessRequirements(self,
- models: List[AiModel],
- prompt: str,
- context: str,
- options: AiCallOptions) -> List[Tuple[AiModel, float]]:
- """Rate models based on business requirements (priority, processing mode)."""
- ratedModels = []
- inputSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8"))
+ # 1. Prompt + Context size rating
+ if model.contextLength > 0:
+ modelMaxSize = model.contextLength * 0.8 # 80% of model context length
+ if totalSize <= modelMaxSize:
+ # Within limits: rating = (prompt+contextsize) / (80% modelsize)
+ score += totalSize / modelMaxSize
+ else:
+ # Exceeds limits: rating = modelsize / (prompt+contextsize) (ensures minimum chunks)
+ score += modelMaxSize / totalSize
+ else:
+ # No context length limit
+ score += 1.0
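+ # Illustrative: with contextLength 100000, a totalSize of 40000 contributes 40000 / 80000 = 0.5, while a totalSize of 200000 contributes 80000 / 200000 = 0.4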
- for model in models:
- # Base score from model selection logic
- baseScore = self._calculateModelScore(model, inputSize, options, [])
-
- # Apply priority-based scoring
- priorityScore = self._applyPriorityScoring(model, options)
-
- # Apply processing mode scoring
- processingScore = self._applyProcessingModeScoring(model, options)
-
- # Combine scores
- totalScore = baseScore + priorityScore + processingScore
-
- ratedModels.append((model, totalScore))
- logger.debug(f"Model {model.name}: base={baseScore:.2f}, priority={priorityScore:.2f}, processing={processingScore:.2f}, total={totalScore:.2f}")
+ # 2. Processing Mode rating
+ if hasattr(options, 'processingMode') and options.processingMode:
+ score += self._getProcessingModeRating(model.processingMode, options.processingMode)
+ else:
+ score += 1.0 # No preference
- return ratedModels
-
- def _sortModelsByRatingAndCost(self, ratedModels: List[Tuple[AiModel, float]]) -> List[AiModel]:
- """Sort models by rating (descending), then by cost (ascending)."""
- def sortKey(item):
- model, rating = item
- # Primary sort: rating (descending)
- # Secondary sort: cost (ascending)
- return (-rating, model.costPer1kTokensInput)
+ # 3. Priority rating
+ if hasattr(options, 'priority') and options.priority:
+ score += self._getPriorityRating(model, options.priority)
+ else:
+ score += 1.0 # No preference
- sortedItems = sorted(ratedModels, key=sortKey)
- return [model for model, rating in sortedItems]
+ return score
+
+ def _getProcessingModeRating(self, modelMode: ProcessingModeEnum, requestedMode: ProcessingModeEnum) -> float:
+ """Get processing mode rating based on compatibility."""
+ if modelMode == requestedMode:
+ return 1.0
+
+ # Compatibility matrix
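+ # e.g. a BASIC request served by an ADVANCED model rates 0.5, by a DETAILED model 0.2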
+ if requestedMode == ProcessingModeEnum.BASIC:
+ if modelMode == ProcessingModeEnum.ADVANCED:
+ return 0.5
+ elif modelMode == ProcessingModeEnum.DETAILED:
+ return 0.2
+
+ elif requestedMode == ProcessingModeEnum.ADVANCED:
+ if modelMode == ProcessingModeEnum.BASIC:
+ return 0.2
+ elif modelMode == ProcessingModeEnum.DETAILED:
+ return 0.5
+
+ elif requestedMode == ProcessingModeEnum.DETAILED:
+ if modelMode == ProcessingModeEnum.BASIC:
+ return 0.2
+ elif modelMode == ProcessingModeEnum.ADVANCED:
+ return 0.5
+
+ return 0.0 # No compatibility
+
+ def _getPriorityRating(self, model: AiModel, requestedPriority: PriorityEnum) -> float:
+ """Get priority rating based on model capabilities."""
+ if requestedPriority == PriorityEnum.BALANCED:
+ return 1.0
+
+ elif requestedPriority == PriorityEnum.SPEED:
+ return model.speedRating / 10.0
+
+ elif requestedPriority == PriorityEnum.QUALITY:
+ return model.qualityRating / 10.0
+
+ elif requestedPriority == PriorityEnum.COST:
+ # Cost priority: cost contributes up to 1.0, speed up to 0.5, quality up to 0.2
+ # Lower cost is better, so we invert the cost rating
+ costRating = 1.0 - (model.costPer1kTokensInput / 0.1) # Normalize to 0-1
+ costRating = max(0, costRating) # Ensure non-negative
+
+ speedRating = model.speedRating / 10.0 * 0.5
+ qualityRating = model.qualityRating / 10.0 * 0.2
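+ # Illustrative: cost $0.02/1k, speed 8, quality 9 -> 0.8 + 0.4 + 0.18 = 1.38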
+
+ return costRating + speedRating + qualityRating
+
+ return 1.0 # Default
+
+ def _getSizeRating(self, model: AiModel, totalSize: int) -> float:
+ """Get size rating for a model based on total input size."""
+ if model.contextLength > 0:
+ modelMaxSize = model.contextLength * 0.8 # 80% of model context length
+ if totalSize <= modelMaxSize:
+ # Within limits: rating = (prompt+contextsize) / (80% modelsize)
+ return totalSize / modelMaxSize
+ else:
+ # Exceeds limits: rating = modelsize / (prompt+contextsize) (ensures minimum chunks)
+ return modelMaxSize / totalSize
+ else:
+ # No context length limit
+ return 1.0
+
+
+ def _logModelDetails(self, model: AiModel):
+ """Log detailed information about a model."""
+ logger.info(f"Model: {model.name}")
+ logger.info(f" Display Name: {model.displayName}")
+ logger.info(f" Connector: {model.connectorType}")
+ logger.info(f" Context Length: {model.contextLength}")
+ logger.info(f" Max Tokens: {model.maxTokens}")
+ logger.info(f" Quality Rating: {model.qualityRating}/10")
+ logger.info(f" Speed Rating: {model.speedRating}/10")
+ logger.info(f" Cost: ${model.costPer1kTokensInput:.4f}/1k tokens")
+ logger.info(f" Capabilities: {', '.join(model.capabilities)}")
+ logger.info(f" Priority: {model.priority}")
+ logger.info(f" Processing Mode: {model.processingMode}")
+ logger.info(f" Operation Types: {', '.join(model.operationTypes)}")
-# Global selector instance
-model_selector = ModelSelector()
+# Global model selector instance
+modelSelector = ModelSelector()
\ No newline at end of file
diff --git a/modules/aicore/aicorePluginAnthropic.py b/modules/aicore/aicorePluginAnthropic.py
index 7e11801f..6091f872 100644
--- a/modules/aicore/aicorePluginAnthropic.py
+++ b/modules/aicore/aicorePluginAnthropic.py
@@ -63,7 +63,7 @@ class AiAnthropic(BaseConnectorAi):
functionCall=self.callAiBasic,
priority=PriorityEnum.QUALITY,
processingMode=ProcessingModeEnum.DETAILED,
- operationTypes=[OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE],
+ operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE, OperationTypeEnum.GENERATE],
version="claude-3-5-sonnet-20241022",
calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.015 + (bytesReceived / 4 / 1000) * 0.075
),
diff --git a/modules/aicore/aicorePluginInternal.py b/modules/aicore/aicorePluginInternal.py
index baa686c5..e0473678 100644
--- a/modules/aicore/aicorePluginInternal.py
+++ b/modules/aicore/aicorePluginInternal.py
@@ -34,7 +34,7 @@ class AiInternal(BaseConnectorAi):
functionCall=self.extractDocument,
priority=PriorityEnum.COST,
processingMode=ProcessingModeEnum.BASIC,
- operationTypes=[OperationTypeEnum.GENERAL],
+ operationTypes=[OperationTypeEnum.EXTRACT],
version="internal-extractor-v1",
calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: 0.001 + (bytesSent + bytesReceived) / (1024 * 1024) * 0.01
),
diff --git a/modules/aicore/aicorePluginOpenai.py b/modules/aicore/aicorePluginOpenai.py
index b7429f5b..1202d004 100644
--- a/modules/aicore/aicorePluginOpenai.py
+++ b/modules/aicore/aicorePluginOpenai.py
@@ -65,7 +65,7 @@ class AiOpenai(BaseConnectorAi):
functionCall=self.callAiBasic,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.ADVANCED,
- operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.ANALYSE],
+ operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE, OperationTypeEnum.GENERATE],
version="gpt-4o",
calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.03 + (bytesReceived / 4 / 1000) * 0.06
),
@@ -83,7 +83,7 @@ class AiOpenai(BaseConnectorAi):
functionCall=self.callAiBasic,
priority=PriorityEnum.SPEED,
processingMode=ProcessingModeEnum.BASIC,
- operationTypes=[OperationTypeEnum.GENERAL],
+ operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, OperationTypeEnum.GENERATE],
version="gpt-3.5-turbo",
calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0015 + (bytesReceived / 4 / 1000) * 0.002
),
diff --git a/modules/aicore/aicorePluginPerplexity.py b/modules/aicore/aicorePluginPerplexity.py
index 9701039f..f2f80f3d 100644
--- a/modules/aicore/aicorePluginPerplexity.py
+++ b/modules/aicore/aicorePluginPerplexity.py
@@ -63,7 +63,7 @@ class AiPerplexity(BaseConnectorAi):
functionCall=self.callAiBasic,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.ADVANCED,
- operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.WEB_RESEARCH],
+ operationTypes=[OperationTypeEnum.GENERAL, OperationTypeEnum.PLAN, OperationTypeEnum.ANALYSE, OperationTypeEnum.GENERATE, OperationTypeEnum.WEB_RESEARCH],
version="llama-3.1-sonar-large-128k-online",
calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.005 + (bytesReceived / 4 / 1000) * 0.005
),
diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py
index 8487744f..f154a79c 100644
--- a/modules/datamodels/datamodelAi.py
+++ b/modules/datamodels/datamodelAi.py
@@ -1,13 +1,17 @@
-from typing import Optional, List, Dict, Any, Literal, Callable
+from typing import Optional, List, Dict, Any, Literal, Callable, TYPE_CHECKING
from pydantic import BaseModel, Field
from enum import Enum
+if TYPE_CHECKING:
+ from modules.datamodels.datamodelExtraction import ContentPart
+
# Operation Types
class OperationTypeEnum(str, Enum):
GENERAL = "general"
PLAN = "plan"
ANALYSE = "analyse"
GENERATE = "generate"
+ EXTRACT = "extract"
WEB_RESEARCH = "webResearch"
IMAGE_ANALYSE = "imageAnalyse"
IMAGE_GENERATE = "imageGenerate"
@@ -141,6 +145,7 @@ class AiCallRequest(BaseModel):
prompt: str = Field(description="The user prompt")
context: Optional[str] = Field(default=None, description="Optional external context (e.g., extracted docs)")
options: AiCallOptions = Field(default_factory=AiCallOptions)
+ contentParts: Optional[List['ContentPart']] = None # NEW: Content parts for model-aware chunking
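+ # When set, AiObjects.call() routes the request through _callWithContentParts, which chunks oversized parts to fit the selected model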
class AiCallResponse(BaseModel):
diff --git a/modules/datamodels/datamodelExtraction.py b/modules/datamodels/datamodelExtraction.py
index cfce0275..b0ba0f9b 100644
--- a/modules/datamodels/datamodelExtraction.py
+++ b/modules/datamodels/datamodelExtraction.py
@@ -28,6 +28,16 @@ class ChunkResult(BaseModel):
metadata: Dict[str, Any] = Field(default_factory=dict)
+class PartResult(BaseModel):
+ """Preserves the relationship between a content part and its AI result."""
+ originalPart: ContentPart
+ aiResult: str
+ partIndex: int
+ documentId: str
+ processingTime: float = 0.0
+ metadata: Dict[str, Any] = Field(default_factory=dict)
+
+
class MergeStrategy(BaseModel):
"""Strategy configuration for merging content parts and AI results."""
diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py
index 5707002e..918944a7 100644
--- a/modules/interfaces/interfaceAiObjects.py
+++ b/modules/interfaces/interfaceAiObjects.py
@@ -7,7 +7,7 @@ import time
logger = logging.getLogger(__name__)
from modules.aicore.aicoreModelRegistry import modelRegistry
-from modules.aicore.aicoreModelSelector import model_selector
+from modules.aicore.aicoreModelSelector import modelSelector
from modules.datamodels.datamodelAi import (
AiModel,
AiCallOptions,
@@ -70,7 +70,7 @@ class AiObjects:
raise ValueError("No AI models available")
# Use the dynamic model selector
- selectedModel = model_selector.selectModel(prompt, context, options, availableModels)
+ selectedModel = modelSelector.selectModel(prompt, context, options, availableModels)
if not selectedModel:
logger.error("No suitable model found for the given criteria")
@@ -81,8 +81,15 @@ class AiObjects:
async def call(self, request: AiCallRequest) -> AiCallResponse:
- """Call AI model for text generation with fallback mechanism."""
-
+ """Call AI model for text generation with model-aware chunking."""
+ # Handle content parts (unified path)
+ if hasattr(request, 'contentParts') and request.contentParts:
+ return await self._callWithContentParts(request)
+ # Handle traditional text/context calls
+ return await self._callWithTextContext(request)
+
+ async def _callWithTextContext(self, request: AiCallRequest) -> AiCallResponse:
+ """Call AI model for traditional text/context calls with fallback mechanism."""
prompt = request.prompt
context = request.context or ""
options = request.options
@@ -108,11 +115,11 @@ class AiObjects:
temperature = 0.2
maxTokens = getattr(options, "maxTokens", None)
- # Get fallback models for this operation type
+ # Get failover models for this operation type
availableModels = modelRegistry.getAvailableModels()
- fallbackModels = model_selector.getFallbackModels(prompt, context, options, availableModels)
+ failoverModelList = modelSelector.getFailoverModelList(prompt, context, options, availableModels)
- if not fallbackModels:
+ if not failoverModelList:
errorMsg = f"No suitable models found for operation {options.operationType}"
logger.error(errorMsg)
return AiCallResponse(
@@ -125,11 +132,11 @@ class AiObjects:
errorCount=1
)
- # Try each model in fallback sequence
+ # Try each model in failover sequence
lastError = None
- for attempt, model in enumerate(fallbackModels):
+ for attempt, model in enumerate(failoverModelList):
try:
- logger.info(f"Attempting AI call with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})")
+ logger.info(f"Attempting AI call with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})")
# Call the model
response = await self._callWithModel(model, prompt, context, temperature, maxTokens, inputBytes)
@@ -142,15 +149,15 @@ class AiObjects:
logger.warning(f"❌ AI call failed with model {model.name}: {str(e)}")
# If this is not the last model, try the next one
- if attempt < len(fallbackModels) - 1:
- logger.info(f"🔄 Trying next fallback model...")
+ if attempt < len(failoverModelList) - 1:
+ logger.info(f"🔄 Trying next failover model...")
continue
else:
# All models failed
- logger.error(f"💥 All {len(fallbackModels)} models failed for operation {options.operationType}")
+ logger.error(f"💥 All {len(failoverModelList)} models failed for operation {options.operationType}")
break
- # All fallback attempts failed - return error response
+ # All failover attempts failed - return error response
errorMsg = f"All AI models failed for operation {options.operationType}. Last error: {str(lastError)}"
logger.error(errorMsg)
return AiCallResponse(
@@ -163,6 +170,241 @@ class AiObjects:
errorCount=1
)
+ async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse:
+ """Process content parts with model-aware chunking (unified for single and multiple parts)."""
+ prompt = request.prompt
+ options = request.options
+ contentParts = request.contentParts
+
+ # Get failover models
+ availableModels = modelRegistry.getAvailableModels()
+ failoverModelList = modelSelector.getFailoverModelList(prompt, "", options, availableModels)
+
+ if not failoverModelList:
+ return self._createErrorResponse("No suitable models found", 0, 0)
+
+ # Process each content part
+ allResults = []
+ for contentPart in contentParts:
+ partResult = await self._processContentPartWithFallback(contentPart, prompt, options, failoverModelList)
+ allResults.append(partResult)
+
+ # Merge all results
+ mergedContent = self._mergePartResults(allResults)
+
+ return AiCallResponse(
+ content=mergedContent,
+ modelName="multiple",
+ priceUsd=sum(r.priceUsd for r in allResults),
+ processingTime=sum(r.processingTime for r in allResults),
+ bytesSent=sum(r.bytesSent for r in allResults),
+ bytesReceived=sum(r.bytesReceived for r in allResults),
+ errorCount=sum(r.errorCount for r in allResults)
+ )
+
+ async def _processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList) -> AiCallResponse:
+ """Process a single content part with model-aware chunking and fallback."""
+ lastError = None
+
+ for attempt, model in enumerate(failoverModelList):
+ try:
+ logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})")
+
+ # Check if part fits in model context
+ partSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0
+ modelContextBytes = model.contextLength * 4 # Convert tokens to bytes
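+ # Heuristic: assumes roughly 4 bytes of text per token rather than an exact tokenizer count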
+
+ if partSize <= modelContextBytes:
+ # Part fits - call AI directly
+ response = await self._callWithModel(model, prompt, contentPart.data, 0.2, None, partSize)
+ logger.info(f"✅ Content part processed successfully with model: {model.name}")
+ return response
+ else:
+ # Part too large - chunk it
+ chunks = await self._chunkContentPart(contentPart, model, options)
+ if not chunks:
+ raise ValueError(f"Failed to chunk content part for model {model.name}")
+
+ # Process each chunk
+ chunkResults = []
+ for chunk in chunks:
+ chunkResponse = await self._callWithModel(model, prompt, chunk['data'], 0.2, None, chunk['size'])
+ chunkResults.append(chunkResponse)
+
+ # Merge chunk results
+ mergedContent = self._mergeChunkResults(chunkResults)
+ totalPrice = sum(r.priceUsd for r in chunkResults)
+ totalTime = sum(r.processingTime for r in chunkResults)
+ totalBytesSent = sum(r.bytesSent for r in chunkResults)
+ totalBytesReceived = sum(r.bytesReceived for r in chunkResults)
+ totalErrors = sum(r.errorCount for r in chunkResults)
+
+ logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)")
+ return AiCallResponse(
+ content=mergedContent,
+ modelName=model.name,
+ priceUsd=totalPrice,
+ processingTime=totalTime,
+ bytesSent=totalBytesSent,
+ bytesReceived=totalBytesReceived,
+ errorCount=totalErrors
+ )
+
+ except Exception as e:
+ lastError = e
+ logger.warning(f"❌ Model {model.name} failed for content part: {str(e)}")
+
+ if attempt < len(failoverModelList) - 1:
+ logger.info(f"🔄 Trying next failover model...")
+ continue
+ else:
+ logger.error(f"💥 All {len(failoverModelList)} models failed for content part")
+ break
+
+ # All models failed
+ return self._createErrorResponse(f"All models failed: {str(lastError)}", 0, 0)
+
+ async def _chunkContentPart(self, contentPart, model, options) -> List[Dict[str, Any]]:
+ """Chunk a content part based on model capabilities."""
+ # Calculate model-specific chunk sizes
+ modelContextBytes = model.contextLength * 4 # Convert tokens to bytes
+ maxContextBytes = int(modelContextBytes * 0.9) # 90% of context length
+ textChunkSize = int(maxContextBytes * 0.7) # 70% of max context for text chunks
+ imageChunkSize = int(maxContextBytes * 0.8) # 80% of max context for image chunks
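+ # e.g. an 8000-token model: 32000 bytes -> maxContextBytes 28800, textChunkSize 20160, imageChunkSize 23040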
+
+ # Build chunking options
+ chunkingOptions = {
+ "textChunkSize": textChunkSize,
+ "imageChunkSize": imageChunkSize,
+ "maxSize": maxContextBytes,
+ "chunkAllowed": True
+ }
+
+ # Get appropriate chunker
+ from modules.services.serviceExtraction.subRegistry import ChunkerRegistry
+ chunkerRegistry = ChunkerRegistry()
+ chunker = chunkerRegistry.resolve(contentPart.typeGroup)
+
+ if not chunker:
+ logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}")
+ return []
+
+ # Chunk the content part
+ try:
+ chunks = chunker.chunk(contentPart, chunkingOptions)
+ logger.debug(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part")
+ return chunks
+ except Exception as e:
+ logger.error(f"Chunking failed for {contentPart.typeGroup}: {str(e)}")
+ return []
+
+ def _mergePartResults(self, partResults: List[AiCallResponse]) -> str:
+ """Merge part results using the existing sophisticated merging system."""
+ if not partResults:
+ return ""
+
+ # Convert AiCallResponse results to ContentParts for merging
+ from modules.datamodels.datamodelExtraction import ContentPart
+ from modules.services.serviceExtraction.subUtils import makeId
+
+ content_parts = []
+ for i, result in enumerate(partResults):
+ if result.content:
+ content_part = ContentPart(
+ id=makeId(),
+ parentId=None,
+ label=f"ai_result_{i}",
+ typeGroup="text", # Default to text for AI results
+ mimeType="text/plain",
+ data=result.content,
+ metadata={
+ "aiResult": True,
+ "modelName": result.modelName,
+ "priceUsd": result.priceUsd,
+ "processingTime": result.processingTime,
+ "bytesSent": result.bytesSent,
+ "bytesReceived": result.bytesReceived
+ }
+ )
+ content_parts.append(content_part)
+
+ # Use existing merging system
+ merge_strategy = {
+ "useIntelligentMerging": True,
+ "groupBy": "typeGroup",
+ "orderBy": "id",
+ "mergeType": "concatenate"
+ }
+
+ from modules.services.serviceExtraction.subPipeline import _applyMerging
+ merged_parts = _applyMerging(content_parts, merge_strategy)
+
+ # Convert merged parts back to final string
+ final_content = "\n\n".join([part.data for part in merged_parts])
+
+ logger.info(f"Merged {len(partResults)} AI results using existing merging system")
+ return final_content.strip()
+
+ def _mergeChunkResults(self, chunkResults: List[AiCallResponse]) -> str:
+ """Merge chunk results using the existing sophisticated merging system."""
+ if not chunkResults:
+ return ""
+
+ # Convert AiCallResponse results to ContentParts for merging
+ from modules.datamodels.datamodelExtraction import ContentPart
+ from modules.services.serviceExtraction.subUtils import makeId
+
+ content_parts = []
+ for i, result in enumerate(chunkResults):
+ if result.content:
+ content_part = ContentPart(
+ id=makeId(),
+ parentId=None,
+ label=f"chunk_result_{i}",
+ typeGroup="text", # Default to text for AI results
+ mimeType="text/plain",
+ data=result.content,
+ metadata={
+ "aiResult": True,
+ "chunk": True,
+ "modelName": result.modelName,
+ "priceUsd": result.priceUsd,
+ "processingTime": result.processingTime,
+ "bytesSent": result.bytesSent,
+ "bytesReceived": result.bytesReceived
+ }
+ )
+ content_parts.append(content_part)
+
+ # Use existing merging system
+ merge_strategy = {
+ "useIntelligentMerging": True,
+ "groupBy": "typeGroup",
+ "orderBy": "id",
+ "mergeType": "concatenate"
+ }
+
+ from modules.services.serviceExtraction.subPipeline import _applyMerging
+ merged_parts = _applyMerging(content_parts, merge_strategy)
+
+ # Convert merged parts back to final string
+ final_content = "\n\n".join([part.data for part in merged_parts])
+
+ logger.info(f"Merged {len(chunkResults)} chunk results using existing merging system")
+ return final_content.strip()
+
+ def _createErrorResponse(self, errorMsg: str, inputBytes: int, outputBytes: int) -> AiCallResponse:
+ """Create an error response."""
+ return AiCallResponse(
+ content=errorMsg,
+ modelName="error",
+ priceUsd=0.0,
+ processingTime=0.0,
+ bytesSent=inputBytes,
+ bytesReceived=outputBytes,
+ errorCount=1
+ )
+
async def _callWithModel(self, model: AiModel, prompt: str, context: str, temperature: float, maxTokens: int, inputBytes: int) -> AiCallResponse:
"""Call a specific model and return the response."""
# Replace placeholder in prompt for this specific model
@@ -245,9 +487,9 @@ class AiObjects:
# Get fallback models for image analysis
availableModels = modelRegistry.getAvailableModels()
- fallbackModels = model_selector.getFallbackModels(prompt, "", options, availableModels)
+ failoverModelList = modelSelector.getFailoverModelList(prompt, "", options, availableModels)
- if not fallbackModels:
+ if not failoverModelList:
errorMsg = f"No suitable models found for image analysis"
logger.error(errorMsg)
return AiCallResponse(
@@ -262,9 +504,9 @@ class AiObjects:
# Try each model in fallback sequence
lastError = None
- for attempt, model in enumerate(fallbackModels):
+ for attempt, model in enumerate(failoverModelList):
try:
- logger.info(f"Attempting image analysis with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})")
+ logger.info(f"Attempting image analysis with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})")
# Call the model
response = await self._callImageWithModel(model, prompt, imageData, mimeType, inputBytes)
@@ -277,12 +519,12 @@ class AiObjects:
logger.warning(f"❌ Image analysis failed with model {model.name}: {str(e)}")
# If this is not the last model, try the next one
- if attempt < len(fallbackModels) - 1:
+ if attempt < len(failoverModelList) - 1:
logger.info(f"🔄 Trying next fallback model for image analysis...")
continue
else:
# All models failed
- logger.error(f"💥 All {len(fallbackModels)} models failed for image analysis")
+ logger.error(f"💥 All {len(failoverModelList)} models failed for image analysis")
break
# All fallback attempts failed - return error response
diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py
index c81a8b0c..72ac2950 100644
--- a/modules/services/serviceAi/subDocumentProcessing.py
+++ b/modules/services/serviceAi/subDocumentProcessing.py
@@ -54,8 +54,8 @@ class SubDocumentProcessing:
options: Optional[AiCallOptions] = None
) -> str:
"""
- Process documents with per-chunk AI calls and merge results.
- FIXED: Now preserves chunk relationships and document structure.
+ Process documents with model-aware chunking and merge results.
+ NEW: Uses model-aware chunking in AI call phase instead of extraction phase.
Args:
documents: List of ChatDocument objects to process
@@ -68,23 +68,14 @@ class SubDocumentProcessing:
if not documents:
return ""
- # Get model capabilities for size calculation
- model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
-
- # Build extraction options for chunking with intelligent merging
+ # Build extraction options WITHOUT chunking parameters
extractionOptions: Dict[str, Any] = {
"prompt": prompt,
"operationType": options.operationType if options else "general",
- "processDocumentsIndividually": True, # Process each document separately
- "maxSize": model_capabilities["maxContextBytes"],
- "chunkAllowed": True,
- "textChunkSize": model_capabilities["textChunkSize"],
- "imageChunkSize": model_capabilities["imageChunkSize"],
- "imageMaxPixels": 1024 * 1024,
- "imageQuality": 85,
+ "processDocumentsIndividually": True,
+ # REMOVED: maxSize, textChunkSize, imageChunkSize
"mergeStrategy": {
- "useIntelligentMerging": True, # Enable intelligent token-aware merging
- "capabilities": model_capabilities,
+ "useIntelligentMerging": True,
"prompt": prompt,
"groupBy": "typeGroup",
"orderBy": "id",
@@ -95,17 +86,17 @@ class SubDocumentProcessing:
logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}")
try:
- # Extract content with chunking
+ # Extract content WITHOUT chunking
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
return "[Error: No extraction results]"
- # FIXED: Process chunks with proper mapping
- chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options)
+ # Process parts (not chunks) with model-aware AI calls
+ partResults = await self._processPartsWithMapping(extractionResult, prompt, options)
- # FIXED: Merge with preserved chunk relationships
- mergedContent = self._mergeChunkResults(chunkResults, options)
+ # Merge results using existing merging system
+ mergedContent = self._mergePartResults(partResults, options)
# Save merged extraction content to debug
self.services.utils.writeDebugFile(mergedContent or '', "extractionMergedText")
@@ -123,29 +114,19 @@ class SubDocumentProcessing:
options: Optional[AiCallOptions] = None
) -> Dict[str, Any]:
"""
- Process documents with per-chunk AI calls and merge results in JSON mode.
+ Process documents with model-aware chunking and merge results in JSON mode.
Returns structured JSON document instead of text.
"""
if not documents:
return {"metadata": {"title": "Empty Document"}, "sections": []}
- # Get model capabilities for size calculation
- model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
-
- # Build extraction options for chunking with intelligent merging
+ # Build extraction options WITHOUT chunking parameters
extractionOptions: Dict[str, Any] = {
"prompt": prompt,
"operationType": options.operationType if options else "general",
- "processDocumentsIndividually": True, # Process each document separately
- "maxSize": model_capabilities["maxContextBytes"],
- "chunkAllowed": True,
- "textChunkSize": model_capabilities["textChunkSize"],
- "imageChunkSize": model_capabilities["imageChunkSize"],
- "imageMaxPixels": 1024 * 1024,
- "imageQuality": 85,
+ "processDocumentsIndividually": True,
"mergeStrategy": {
- "useIntelligentMerging": True, # Enable intelligent token-aware merging
- "capabilities": model_capabilities,
+ "useIntelligentMerging": True,
"prompt": prompt,
"groupBy": "typeGroup",
"orderBy": "id",
@@ -156,17 +137,17 @@ class SubDocumentProcessing:
logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}")
try:
- # Extract content with chunking
+ # Extract content WITHOUT chunking
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
return {"metadata": {"title": "Error Document"}, "sections": []}
- # Process chunks with proper mapping
- chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options, generate_json=True)
+ # Process parts with model-aware chunking
+ partResults = await self._processPartsWithMapping(extractionResult, prompt, options)
- # Merge with JSON mode
- mergedJsonDocument = self._mergeChunkResultsJson(chunkResults, options)
+ # Convert to JSON format (simplified for now)
+ mergedJsonDocument = self._convertPartResultsToJson(partResults, options)
# Normalize merged JSON into a single canonical table (only if table content exists)
try:
@@ -505,6 +486,127 @@ CONTINUATION INSTRUCTIONS:
"""
return await self.processDocumentsPerChunk(documents, prompt, options)
+ async def _processPartsWithMapping(
+ self,
+ extractionResult: List[ContentExtracted],
+ prompt: str,
+ options: Optional[AiCallOptions] = None
+ ) -> List['PartResult']:
+ """Process content parts with model-aware chunking and proper mapping."""
+ from modules.datamodels.datamodelExtraction import PartResult
+ import asyncio
+
+ # Collect all parts that need processing
+ parts_to_process = []
+ part_index = 0
+
+ for ec in extractionResult:
+ for part in ec.parts:
+ if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
+ # Skip empty container parts
+ if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
+ logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
+ continue
+
+ parts_to_process.append({
+ 'part': part,
+ 'part_index': part_index,
+ 'document_id': ec.id
+ })
+ part_index += 1
+
+ logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking")
+
+ # Process parts in parallel
+ async def process_single_part(part_info: Dict) -> PartResult:
+ part = part_info['part']
+ part_index = part_info['part_index']
+ document_id = part_info['document_id']
+
+ start_time = time.time()
+
+ try:
+ # Create AI call request with content part
+ from modules.datamodels.datamodelAi import AiCallRequest
+ request = AiCallRequest(
+ prompt=prompt,
+ context="", # Context is in the content part
+ options=options,
+ contentParts=[part] # Pass as list for unified processing
+ )
+
+ # Call AI with model-aware chunking
+ response = await self.aiObjects.call(request)
+
+ processing_time = time.time() - start_time
+
+ return PartResult(
+ originalPart=part,
+ aiResult=response.content,
+ partIndex=part_index,
+ documentId=document_id,
+ processingTime=processing_time,
+ metadata={
+ "success": True,
+ "partSize": len(part.data) if part.data else 0,
+ "resultSize": len(response.content),
+ "typeGroup": part.typeGroup,
+ "modelName": response.modelName,
+ "priceUsd": response.priceUsd
+ }
+ )
+
+ except Exception as e:
+ processing_time = time.time() - start_time
+ logger.warning(f"Error processing part {part_index}: {str(e)}")
+
+ return PartResult(
+ originalPart=part,
+ aiResult=f"[Error processing part: {str(e)}]",
+ partIndex=part_index,
+ documentId=document_id,
+ processingTime=processing_time,
+ metadata={
+ "success": False,
+ "error": str(e),
+ "partSize": len(part.data) if part.data else 0,
+ "typeGroup": part.typeGroup
+ }
+ )
+
+ # Process parts with concurrency control
+ max_concurrent = 5
+ if options and hasattr(options, 'maxConcurrentParts'):
+ max_concurrent = options.maxConcurrentParts
+
+ semaphore = asyncio.Semaphore(max_concurrent)
+
+ async def process_with_semaphore(part_info):
+ async with semaphore:
+ return await process_single_part(part_info)
+
+ tasks = [process_with_semaphore(part_info) for part_info in parts_to_process]
+ part_results = await asyncio.gather(*tasks, return_exceptions=True)
+
+ # Handle exceptions
+ processed_results = []
+ for i, result in enumerate(part_results):
+ if isinstance(result, Exception):
+ part_info = parts_to_process[i]
+ processed_results.append(PartResult(
+ originalPart=part_info['part'],
+ aiResult=f"[Error in parallel processing: {str(result)}]",
+ partIndex=part_info['part_index'],
+ documentId=part_info['document_id'],
+ processingTime=0.0,
+ metadata={"success": False, "error": str(result)}
+ ))
+ elif result is not None:
+ processed_results.append(result)
+
+ logger.info(f"Completed processing {len(processed_results)} parts")
+ return processed_results
+
async def _processChunksWithMapping(
self,
extractionResult: List[ContentExtracted],
@@ -907,340 +1009,451 @@ CONTINUATION INSTRUCTIONS:
logger.info(f"Completed processing {len(processed_results)} chunks")
return processed_results
+ def _mergePartResults(
+ self,
+ partResults: List['PartResult'],
+ options: Optional[AiCallOptions] = None
+ ) -> str:
+ """Merge part results using existing sophisticated merging system."""
+ if not partResults:
+ return ""
+
+ # Convert PartResults back to ContentParts for existing merger system
+ from modules.datamodels.datamodelExtraction import ContentPart
+ content_parts = []
+ for part_result in partResults:
+ # Create ContentPart from PartResult with proper typeGroup
+ content_part = ContentPart(
+ id=part_result.originalPart.id,
+ parentId=part_result.originalPart.parentId,
+ label=part_result.originalPart.label,
+ typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup
+ mimeType=part_result.originalPart.mimeType,
+ data=part_result.aiResult, # Use AI result as data
+ metadata={
+ **part_result.originalPart.metadata,
+ "aiResult": True,
+ "partIndex": part_result.partIndex,
+ "documentId": part_result.documentId,
+ "processingTime": part_result.processingTime,
+ "success": part_result.metadata.get("success", False)
+ }
+ )
+ content_parts.append(content_part)
+
+ # Use existing merging strategy from options
+ merge_strategy = {
+ "useIntelligentMerging": True,
+ "groupBy": "documentId", # Group by document
+ "orderBy": "partIndex", # Order by part index
+ "mergeType": "concatenate"
+ }
+
+        if options and getattr(options, 'mergeStrategy', None):
+            merge_strategy.update(options.mergeStrategy)
+
+ # Apply existing merging logic using the sophisticated merging system
+ from modules.services.serviceExtraction.subPipeline import _applyMerging
+ merged_parts = _applyMerging(content_parts, merge_strategy)
+
+ # Convert merged parts back to final string
+ final_content = "\n\n".join([part.data for part in merged_parts])
+
+ logger.info(f"Merged {len(partResults)} parts using existing sophisticated merging system")
+ return final_content.strip()
+
+ def _convertPartResultsToJson(
+ self,
+ partResults: List['PartResult'],
+ options: Optional[AiCallOptions] = None
+ ) -> Dict[str, Any]:
+ """Convert part results to JSON format using existing sophisticated merging system."""
+ if not partResults:
+ return {"metadata": {"title": "Empty Document"}, "sections": []}
+
+ # Convert PartResults back to ContentParts for existing merger system
+ from modules.datamodels.datamodelExtraction import ContentPart
+ content_parts = []
+ for part_result in partResults:
+ # Create ContentPart from PartResult with proper typeGroup
+ content_part = ContentPart(
+ id=part_result.originalPart.id,
+ parentId=part_result.originalPart.parentId,
+ label=part_result.originalPart.label,
+ typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup
+ mimeType=part_result.originalPart.mimeType,
+ data=part_result.aiResult, # Use AI result as data
+ metadata={
+ **part_result.originalPart.metadata,
+ "aiResult": True,
+ "partIndex": part_result.partIndex,
+ "documentId": part_result.documentId,
+ "processingTime": part_result.processingTime,
+ "success": part_result.metadata.get("success", False)
+ }
+ )
+ content_parts.append(content_part)
+
+ # Use existing merging strategy for JSON mode
+ merge_strategy = {
+ "useIntelligentMerging": True,
+ "groupBy": "documentId", # Group by document
+ "orderBy": "partIndex", # Order by part index
+ "mergeType": "concatenate"
+ }
+
+        if options and getattr(options, 'mergeStrategy', None):
+            merge_strategy.update(options.mergeStrategy)
+
+ # Apply existing merging logic using the sophisticated merging system
+ from modules.services.serviceExtraction.subPipeline import _applyMerging
+ merged_parts = _applyMerging(content_parts, merge_strategy)
+
+ # Convert merged parts to JSON format
+ all_sections = []
+ document_titles = []
+
+ for part in merged_parts:
+ if part.metadata.get("success", False):
+ try:
+ # Parse JSON from AI result
+ part_json = json.loads(part.data)
+
+ # Check if this is a multi-file response (has "documents" key)
+ if isinstance(part_json, dict) and "documents" in part_json:
+ # This is a multi-file response - merge all documents
+ logger.debug(f"Processing multi-file response from part {part.id} with {len(part_json['documents'])} documents")
+
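+                    # NOTE: the first multi-file response wins here; any parts after it are not merged into the result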
+ # Return multi-file response directly
+ return {
+ "metadata": part_json.get("metadata", {"title": "Merged Document"}),
+ "documents": part_json["documents"]
+ }
+
+ # Extract sections from single-file response
+ elif isinstance(part_json, dict) and "sections" in part_json:
+ for section in part_json["sections"]:
+ # Add part context to section
+ section["metadata"] = section.get("metadata", {})
+ section["metadata"]["source_part"] = part.id
+ section["metadata"]["source_document"] = part.metadata.get("documentId", "unknown")
+ section["metadata"]["part_index"] = part.metadata.get("partIndex", 0)
+ all_sections.append(section)
+
+ # Extract document title
+ if isinstance(part_json, dict) and "metadata" in part_json:
+ title = part_json["metadata"].get("title", "")
+ if title and title not in document_titles:
+ document_titles.append(title)
+
+ except json.JSONDecodeError as e:
+ logger.warning(f"Failed to parse JSON from part {part.id}: {str(e)}")
+ # Create a fallback section for invalid JSON
+ fallback_section = {
+ "id": f"error_section_{part.id}",
+ "title": "Error Section",
+ "content_type": "paragraph",
+ "elements": [{
+ "text": f"Error parsing part {part.id}: {str(e)}"
+ }],
+ "order": part.metadata.get("partIndex", 0),
+ "metadata": {
+ "source_document": part.metadata.get("documentId", "unknown"),
+ "part_id": part.id,
+ "error": str(e)
+ }
+ }
+ all_sections.append(fallback_section)
+ else:
+ # Handle error parts
+ error_section = {
+ "id": f"error_section_{part.id}",
+ "title": "Error Section",
+ "content_type": "paragraph",
+ "elements": [{
+ "text": f"Error in part {part.id}: {part.metadata.get('error', 'Unknown error')}"
+ }],
+ "order": part.metadata.get("partIndex", 0),
+ "metadata": {
+ "source_document": part.metadata.get("documentId", "unknown"),
+ "part_id": part.id,
+ "error": part.metadata.get('error', 'Unknown error')
+ }
+ }
+ all_sections.append(error_section)
+
+ # Sort sections by order
+ all_sections.sort(key=lambda x: x.get("order", 0))
+
+ # Create merged document with sections
+ merged_document = {
+ "metadata": {
+ "title": document_titles[0] if document_titles else "Merged Document",
+ "extraction_method": "model_aware_chunking_with_merging",
+ "version": "2.0"
+ },
+ "sections": all_sections,
+ "summary": f"Merged document using sophisticated merging system",
+ "tags": ["merged", "ai_generated", "model_aware", "sophisticated_merging"]
+ }
+
+ logger.info(f"Converted {len(partResults)} parts to JSON format using existing sophisticated merging system")
+ return merged_document
+
def _mergeChunkResults(
self,
chunkResults: List[ChunkResult],
options: Optional[AiCallOptions] = None
) -> str:
- """Merge chunk results while preserving document structure and chunk order."""
-
+ """Merge chunk results using existing sophisticated merging system."""
if not chunkResults:
return ""
- # Get merging configuration from options
- chunk_separator = "\n\n---\n\n"
- include_document_headers = True
- include_chunk_metadata = False
-
- if options:
- if hasattr(options, 'chunkSeparator'):
- chunk_separator = options.chunkSeparator
- elif hasattr(options, 'mergeStrategy') and options.mergeStrategy:
- chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n---\n\n")
-
- # Check for enhanced options
- if hasattr(options, 'preserveChunkMetadata'):
- include_chunk_metadata = options.preserveChunkMetadata
-
- # Group chunk results by document
- results_by_document = {}
+ # Convert ChunkResults back to ContentParts for existing merger system
+ from modules.datamodels.datamodelExtraction import ContentPart
+ content_parts = []
for chunk_result in chunkResults:
- doc_id = chunk_result.documentId
- if doc_id not in results_by_document:
- results_by_document[doc_id] = []
- results_by_document[doc_id].append(chunk_result)
+ # Create ContentPart from ChunkResult with proper typeGroup
+ content_part = ContentPart(
+ id=chunk_result.originalChunk.id,
+ parentId=chunk_result.originalChunk.parentId,
+ label=chunk_result.originalChunk.label,
+ typeGroup=chunk_result.originalChunk.typeGroup, # Use original typeGroup
+ mimeType=chunk_result.originalChunk.mimeType,
+ data=chunk_result.aiResult, # Use AI result as data
+ metadata={
+ **chunk_result.originalChunk.metadata,
+ "aiResult": True,
+ "chunk": True,
+ "chunkIndex": chunk_result.chunkIndex,
+ "documentId": chunk_result.documentId,
+ "processingTime": chunk_result.processingTime,
+ "success": chunk_result.metadata.get("success", False)
+ }
+ )
+ content_parts.append(content_part)
- # Sort chunks within each document by chunk index
- for doc_id in results_by_document:
- results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
+ # Use existing merging strategy from options
+ merge_strategy = {
+ "useIntelligentMerging": True,
+ "groupBy": "documentId", # Group by document
+ "orderBy": "chunkIndex", # Order by chunk index
+ "mergeType": "concatenate"
+ }
- # Merge results for each document
- merged_documents = []
+        if options and getattr(options, 'mergeStrategy', None):
+            merge_strategy.update(options.mergeStrategy)
- for doc_id, doc_chunks in results_by_document.items():
- # Build document header if enabled
- doc_header = ""
- if include_document_headers:
- doc_header = f"\n\n=== DOCUMENT: {doc_id} ===\n\n"
-
- # Merge chunks for this document
- doc_content = ""
- for i, chunk_result in enumerate(doc_chunks):
- # Add chunk separator (except for first chunk)
- if i > 0:
- doc_content += chunk_separator
-
- # Add chunk content with optional metadata
- chunk_metadata = chunk_result.metadata
- if chunk_metadata.get("success", False):
- chunk_content = chunk_result.aiResult
-
- # Add chunk metadata if enabled
- if include_chunk_metadata:
- chunk_info = f"[Chunk {chunk_result.chunkIndex} - {chunk_metadata.get('typeGroup', 'unknown')} - {chunk_metadata.get('chunkSize', 0)} chars]"
- chunk_content = f"{chunk_info}\n{chunk_content}"
-
- doc_content += chunk_content
- else:
- # Handle error chunks
- error_msg = f"[ERROR in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}]"
- doc_content += error_msg
-
- merged_documents.append(doc_header + doc_content)
+ # Apply existing merging logic using the sophisticated merging system
+ from modules.services.serviceExtraction.subPipeline import _applyMerging
+ merged_parts = _applyMerging(content_parts, merge_strategy)
- # Join all documents
- final_result = "\n\n".join(merged_documents)
+ # Convert merged parts back to final string
+ final_content = "\n\n".join([part.data for part in merged_parts])
- logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents")
- return final_result.strip()
+ logger.info(f"Merged {len(chunkResults)} chunks using existing sophisticated merging system")
+ return final_content.strip()
def _mergeChunkResultsClean(
self,
chunkResults: List[ChunkResult],
options: Optional[AiCallOptions] = None
) -> str:
- """Merge chunk results in CLEAN mode - no debug metadata or document headers."""
-
+ """Merge chunk results in CLEAN mode using existing sophisticated merging system."""
if not chunkResults:
return ""
- # Get merging configuration from options
- chunk_separator = "\n\n"
- include_document_headers = False # CLEAN MODE: No document headers
- include_chunk_metadata = False # CLEAN MODE: No chunk metadata
-
- if options:
- if hasattr(options, 'chunkSeparator'):
- chunk_separator = options.chunkSeparator
- elif hasattr(options, 'mergeStrategy') and options.mergeStrategy:
- chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n")
-
- # Group chunk results by document
- results_by_document = {}
+ # Convert ChunkResults back to ContentParts for existing merger system
+ from modules.datamodels.datamodelExtraction import ContentPart
+ content_parts = []
for chunk_result in chunkResults:
- doc_id = chunk_result.documentId
- if doc_id not in results_by_document:
- results_by_document[doc_id] = []
- results_by_document[doc_id].append(chunk_result)
-
- # Sort chunks within each document by chunk index
- for doc_id in results_by_document:
- results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
-
- # Merge results for each document in CLEAN mode
- merged_documents = []
-
- for doc_id, doc_chunks in results_by_document.items():
- # CLEAN MODE: No document headers
- doc_header = ""
+ # Skip empty or error chunks in clean mode
+ if not chunk_result.metadata.get("success", False):
+ continue
+ if not chunk_result.aiResult or not chunk_result.aiResult.strip():
+ continue
+ # Skip container/binary chunks in clean mode
+ if chunk_result.aiResult.startswith("[Skipped ") and "content:" in chunk_result.aiResult:
+ continue
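+            # (assumes skipped container/binary chunks carry a "[Skipped ... content: ...]" placeholder string)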
- # Merge chunks for this document
- doc_content = ""
- for i, chunk_result in enumerate(doc_chunks):
- # Add chunk separator (except for first chunk)
- if i > 0:
- doc_content += chunk_separator
-
- # Add chunk content without metadata
- chunk_metadata = chunk_result.metadata
- if chunk_metadata.get("success", False):
- chunk_content = chunk_result.aiResult
-
- # CLEAN MODE: Skip container/binary chunks entirely
- if chunk_content.startswith("[Skipped ") and "content:" in chunk_content:
- continue # Skip container/binary chunks in clean mode
-
- # CLEAN MODE: Skip empty or whitespace-only chunks
- if not chunk_content.strip():
- continue # Skip empty chunks in clean mode
-
- # CLEAN MODE: No chunk metadata
- doc_content += chunk_content
- else:
- # Handle error chunks silently in clean mode
- continue
-
- merged_documents.append(doc_header + doc_content)
+ # Create ContentPart from ChunkResult with proper typeGroup
+ content_part = ContentPart(
+ id=chunk_result.originalChunk.id,
+ parentId=chunk_result.originalChunk.parentId,
+ label=chunk_result.originalChunk.label,
+ typeGroup=chunk_result.originalChunk.typeGroup, # Use original typeGroup
+ mimeType=chunk_result.originalChunk.mimeType,
+ data=chunk_result.aiResult, # Use AI result as data
+ metadata={
+ **chunk_result.originalChunk.metadata,
+ "aiResult": True,
+ "chunk": True,
+ "chunkIndex": chunk_result.chunkIndex,
+ "documentId": chunk_result.documentId,
+ "processingTime": chunk_result.processingTime,
+ "success": chunk_result.metadata.get("success", False)
+ }
+ )
+ content_parts.append(content_part)
- # Join all documents
- final_result = "\n\n".join(merged_documents)
+ # Use existing merging strategy for clean mode
+ merge_strategy = {
+ "useIntelligentMerging": True,
+ "groupBy": "documentId", # Group by document
+ "orderBy": "chunkIndex", # Order by chunk index
+ "mergeType": "concatenate"
+ }
- return final_result.strip()
+        if options and getattr(options, 'mergeStrategy', None):
+            merge_strategy.update(options.mergeStrategy)
+
+ # Apply existing merging logic using the sophisticated merging system
+ from modules.services.serviceExtraction.subPipeline import _applyMerging
+ merged_parts = _applyMerging(content_parts, merge_strategy)
+
+ # Convert merged parts back to final string
+ final_content = "\n\n".join([part.data for part in merged_parts])
+
+ logger.info(f"Merged {len(content_parts)} chunks in clean mode using existing sophisticated merging system")
+ return final_content.strip()
def _mergeChunkResultsJson(
self,
chunkResults: List[ChunkResult],
options: Optional[AiCallOptions] = None
) -> Dict[str, Any]:
- """Merge chunk results in JSON mode - returns structured JSON document."""
-
+ """Merge chunk results in JSON mode using existing sophisticated merging system."""
if not chunkResults:
return {"metadata": {"title": "Empty Document"}, "sections": []}
- # Group chunk results by document
- results_by_document = {}
+ # Convert ChunkResults back to ContentParts for existing merger system
+ from modules.datamodels.datamodelExtraction import ContentPart
+ content_parts = []
for chunk_result in chunkResults:
- doc_id = chunk_result.documentId
- if doc_id not in results_by_document:
- results_by_document[doc_id] = []
- results_by_document[doc_id].append(chunk_result)
+ # Create ContentPart from ChunkResult with proper typeGroup
+ content_part = ContentPart(
+ id=chunk_result.originalChunk.id,
+ parentId=chunk_result.originalChunk.parentId,
+ label=chunk_result.originalChunk.label,
+ typeGroup=chunk_result.originalChunk.typeGroup, # Use original typeGroup
+ mimeType=chunk_result.originalChunk.mimeType,
+ data=chunk_result.aiResult, # Use AI result as data
+ metadata={
+ **chunk_result.originalChunk.metadata,
+ "aiResult": True,
+ "chunk": True,
+ "chunkIndex": chunk_result.chunkIndex,
+ "documentId": chunk_result.documentId,
+ "processingTime": chunk_result.processingTime,
+ "success": chunk_result.metadata.get("success", False)
+ }
+ )
+ content_parts.append(content_part)
- # Sort chunks within each document by chunk index
- for doc_id in results_by_document:
- results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
+ # Use existing merging strategy for JSON mode
+ merge_strategy = {
+ "useIntelligentMerging": True,
+ "groupBy": "documentId", # Group by document
+ "orderBy": "chunkIndex", # Order by chunk index
+ "mergeType": "concatenate"
+ }
- # Merge JSON results for each document
- all_documents = []
+        if options and getattr(options, 'mergeStrategy', None):
+            merge_strategy.update(options.mergeStrategy)
+
+ # Apply existing merging logic using the sophisticated merging system
+ from modules.services.serviceExtraction.subPipeline import _applyMerging
+ merged_parts = _applyMerging(content_parts, merge_strategy)
+
+ # Convert merged parts to JSON format
all_sections = []
document_titles = []
- combined_metadata = {"title": "Merged Document", "splitStrategy": "by_section"}
- for doc_id, doc_chunks in results_by_document.items():
- # Process each chunk's JSON result
- for chunk_result in doc_chunks:
- chunk_metadata = chunk_result.metadata
- if chunk_metadata.get("success", False):
- try:
- # Parse JSON from AI result
- chunk_json = json.loads(chunk_result.aiResult)
+ for part in merged_parts:
+ if part.metadata.get("success", False):
+ try:
+ # Parse JSON from AI result
+ chunk_json = json.loads(part.data)
+
+ # Check if this is a multi-file response (has "documents" key)
+ if isinstance(chunk_json, dict) and "documents" in chunk_json:
+ # This is a multi-file response - merge all documents
+ logger.debug(f"Processing multi-file response from part {part.id} with {len(chunk_json['documents'])} documents")
- # Check if this is a multi-file response (has "documents" key)
- if isinstance(chunk_json, dict) and "documents" in chunk_json:
- # This is a multi-file response - merge all documents
- logger.debug(f"Processing multi-file response from chunk {chunk_result.chunkIndex} with {len(chunk_json['documents'])} documents")
-
- # Add all documents from this chunk
- for doc in chunk_json["documents"]:
- # Add chunk context to document
- doc["metadata"] = doc.get("metadata", {})
- doc["metadata"]["source_chunk"] = chunk_result.chunkIndex
- doc["metadata"]["source_document"] = doc_id
- all_documents.append(doc)
-
- # Update combined metadata
- if "metadata" in chunk_json:
- combined_metadata.update(chunk_json["metadata"])
-
- # Extract sections from single-file response (fallback)
- elif isinstance(chunk_json, dict) and "sections" in chunk_json:
- for section in chunk_json["sections"]:
- # Add document context to section
- section["metadata"] = section.get("metadata", {})
- section["metadata"]["source_document"] = doc_id
- section["metadata"]["chunk_index"] = chunk_result.chunkIndex
- all_sections.append(section)
-
- # Extract document title
- if isinstance(chunk_json, dict) and "metadata" in chunk_json:
- title = chunk_json["metadata"].get("title", "")
- if title and title not in document_titles:
- document_titles.append(title)
-
- except json.JSONDecodeError as e:
- logger.warning(f"Failed to parse JSON from chunk {chunk_result.chunkIndex}: {str(e)}")
- # Create a fallback section for invalid JSON
- fallback_section = {
- "id": f"error_section_{chunk_result.chunkIndex}",
- "title": "Error Section",
- "content_type": "paragraph",
- "elements": [{
- "text": f"Error parsing chunk {chunk_result.chunkIndex}: {str(e)}"
- }],
- "order": chunk_result.chunkIndex,
- "metadata": {
- "source_document": doc_id,
- "chunk_index": chunk_result.chunkIndex,
- "error": str(e)
- }
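+                    # NOTE: the first multi-file response wins here; any chunks after it are not merged into the result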
+ # Return multi-file response directly
+ return {
+ "metadata": chunk_json.get("metadata", {"title": "Merged Document"}),
+ "documents": chunk_json["documents"]
}
- all_sections.append(fallback_section)
- else:
- # Handle error chunks
- error_section = {
- "id": f"error_section_{chunk_result.chunkIndex}",
+
+ # Extract sections from single-file response
+ elif isinstance(chunk_json, dict) and "sections" in chunk_json:
+ for section in chunk_json["sections"]:
+ # Add part context to section
+ section["metadata"] = section.get("metadata", {})
+ section["metadata"]["source_part"] = part.id
+ section["metadata"]["source_document"] = part.metadata.get("documentId", "unknown")
+ section["metadata"]["chunk_index"] = part.metadata.get("chunkIndex", 0)
+ all_sections.append(section)
+
+ # Extract document title
+ if isinstance(chunk_json, dict) and "metadata" in chunk_json:
+ title = chunk_json["metadata"].get("title", "")
+ if title and title not in document_titles:
+ document_titles.append(title)
+
+ except json.JSONDecodeError as e:
+ logger.warning(f"Failed to parse JSON from part {part.id}: {str(e)}")
+ # Create a fallback section for invalid JSON
+ fallback_section = {
+ "id": f"error_section_{part.id}",
"title": "Error Section",
"content_type": "paragraph",
"elements": [{
- "text": f"Error in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}"
+ "text": f"Error parsing part {part.id}: {str(e)}"
}],
- "order": chunk_result.chunkIndex,
+ "order": part.metadata.get("chunkIndex", 0),
"metadata": {
- "source_document": doc_id,
- "chunk_index": chunk_result.chunkIndex,
- "error": chunk_metadata.get('error', 'Unknown error')
+ "source_document": part.metadata.get("documentId", "unknown"),
+ "part_id": part.id,
+ "error": str(e)
}
}
- all_sections.append(error_section)
+ all_sections.append(fallback_section)
+ else:
+ # Handle error parts
+ error_section = {
+ "id": f"error_section_{part.id}",
+ "title": "Error Section",
+ "content_type": "paragraph",
+ "elements": [{
+ "text": f"Error in part {part.id}: {part.metadata.get('error', 'Unknown error')}"
+ }],
+ "order": part.metadata.get("chunkIndex", 0),
+ "metadata": {
+ "source_document": part.metadata.get("documentId", "unknown"),
+ "part_id": part.id,
+ "error": part.metadata.get('error', 'Unknown error')
+ }
+ }
+ all_sections.append(error_section)
# Sort sections by order
all_sections.sort(key=lambda x: x.get("order", 0))
- # If we have merged documents from multi-file responses, return them
- if all_documents:
- logger.info(f"Merged {len(all_documents)} documents from {len(chunkResults)} chunks")
- return {
- "metadata": combined_metadata,
- "documents": all_documents
- }
-
- # Otherwise, create merged document with sections (single-file fallback)
+ # Create merged document with sections
merged_document = {
"metadata": {
"title": document_titles[0] if document_titles else "Merged Document",
- "source_documents": list(results_by_document.keys()),
- "extraction_method": "ai_json_extraction",
- "version": "1.0"
+ "extraction_method": "ai_json_extraction_with_merging",
+ "version": "2.0"
},
"sections": all_sections,
- "summary": f"Merged document from {len(results_by_document)} source documents",
- "tags": ["merged", "ai_generated"]
+ "summary": f"Merged document using sophisticated merging system",
+ "tags": ["merged", "ai_generated", "sophisticated_merging"]
}
- logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents (JSON mode)")
+ logger.info(f"Merged {len(chunkResults)} chunks using existing sophisticated merging system (JSON mode)")
return merged_document
- def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]:
- """
- Get model capabilities for content processing, including appropriate size limits for chunking.
- Uses centralized model selection to determine chunking parameters.
- """
- # Estimate total content size
- prompt_size = len(prompt.encode('utf-8'))
- document_size = 0
- if documents:
- # Rough estimate of document content size
- for doc in documents:
- document_size += doc.fileSize or 0
-
- total_size = prompt_size + document_size
-
- # Use centralized model selection to get the best model for chunking parameters
- try:
- from modules.aicore.aicoreModelRegistry import modelRegistry
- from modules.aicore.aicoreModelSelector import model_selector
-
- # Get available models and select the best one for this operation
- availableModels = modelRegistry.getAvailableModels()
- selectedModel = model_selector.selectModel(prompt, "", options, availableModels)
-
- if selectedModel:
- context_length = selectedModel.contextLength
- model_name = selectedModel.name
- logger.debug(f"Selected model for chunking: {model_name} with context length: {context_length}")
- else:
- # Fallback to conservative default if no model selected
- context_length = 128000 # GPT-4o default
- model_name = "fallback"
- logger.warning(f"No model selected for chunking, using fallback context length: {context_length}")
-
- except Exception as e:
- # Fallback to conservative default if model selection fails
- context_length = 128000 # GPT-4o default
- model_name = "fallback"
- logger.error(f"Model selection failed for chunking: {e}, using fallback context length: {context_length}")
-
- # Calculate appropriate sizes
- # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters)
- context_length_bytes = int(context_length * 4)
- max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length
- text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks
- image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks
-
- logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
- logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")
-
- return {
- "maxContextBytes": max_context_bytes,
- "textChunkSize": text_chunk_size,
- "imageChunkSize": image_chunk_size
- }
+# REMOVED: _getModelCapabilitiesForContent method - no longer needed with model-aware chunking
diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py
index d0b79eaf..9d8193da 100644
--- a/modules/services/serviceExtraction/subPipeline.py
+++ b/modules/services/serviceExtraction/subPipeline.py
@@ -73,139 +73,18 @@ def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: Chunker
parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})
- # Apply chunking and size limiting
- parts = poolAndLimit(parts, chunkerRegistry, options)
+    # REMOVED: poolAndLimit(parts, chunkerRegistry, options) - chunking is now handled in the AI call phase
- # Optional merge step - but preserve chunks
+ # Apply merging strategy if provided (preserve existing logic)
mergeStrategy = options.get("mergeStrategy", {})
if mergeStrategy:
-
- # Don't merge chunks - they should stay separate for processing
- non_chunk_parts = [p for p in parts if not p.metadata.get("chunk", False)]
- chunk_parts = [p for p in parts if p.metadata.get("chunk", False)]
-
- logger.debug(f"runExtraction: Preserving {len(chunk_parts)} chunks from merging")
- logger.debug(f"runExtraction - non_chunk_parts: {len(non_chunk_parts)}, chunk_parts: {len(chunk_parts)}")
-
- # Apply intelligent merging for small text parts
- if non_chunk_parts:
- # Count text parts
- text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"]
- if len(text_parts) > 5: # If we have many small text parts, merge them
- logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency")
- non_chunk_parts = _mergeParts(non_chunk_parts, mergeStrategy)
-
- # Combine non-chunk parts with chunk parts (chunks stay separate)
- parts = non_chunk_parts + chunk_parts
-
- logger.debug(f"runExtraction: Final parts after merging: {len(parts)} (chunks: {len(chunk_parts)})")
- logger.debug(f"runExtraction - Final parts: {len(parts)} (chunks: {len(chunk_parts)})")
- # Timestamp-only extraction debug dumps removed
-
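+        # merging now runs on the un-chunked extracted parts; size-aware chunking happens later in the AI call phase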
+ parts = _applyMerging(parts, mergeStrategy)
+
return ContentExtracted(id=makeId(), parts=parts)
-def poolAndLimit(parts: List[ContentPart], chunkerRegistry: ChunkerRegistry, options: Dict[str, Any]) -> List[ContentPart]:
- maxSize = int(options.get("maxSize", 0) or 0)
- chunkAllowed = bool(options.get("chunkAllowed", False))
- mergeStrategy = options.get("mergeStrategy", {})
-
- if maxSize <= 0:
- # Still apply merging if strategy provided
- if mergeStrategy:
- return _applyMerging(parts, mergeStrategy)
- return parts
-
- # First, try to fit within size limit
- current = 0
- kept: List[ContentPart] = []
- remaining: List[ContentPart] = []
-
- logger.debug(f"Starting poolAndLimit with {len(parts)} parts, maxSize={maxSize}")
-
- for i, p in enumerate(parts):
- size = int(p.metadata.get("size", 0) or 0)
- # Show first 50 characters of text content for debugging
- content_preview = p.data[:50].replace('\n', '\\n') if p.data else ""
- logger.debug(f"Part {i}: {p.typeGroup} - {size} bytes - '{content_preview}...' (current: {current})")
- if current + size <= maxSize:
- kept.append(p)
- current += size
- logger.debug(f"Part {i} kept (total: {current})")
- else:
- remaining.append(p)
- logger.debug(f"Part {i} moved to remaining")
-
- logger.debug(f"Kept: {len(kept)}, Remaining: {len(remaining)}")
-
- # If we have remaining parts and chunking is allowed, try chunking
- if remaining and chunkAllowed:
- logger.debug(f"=== CHUNKING ACTIVATED ===")
- logger.debug(f"Remaining parts to chunk: {len(remaining)}")
- logger.debug(f"Max size limit: {maxSize} bytes")
- logger.debug(f"Current size used: {current} bytes")
- logger.debug(f"Chunking {len(remaining)} remaining parts")
-
- for p in remaining:
- if p.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
- logger.debug(f"Chunking {p.typeGroup} part: {len(p.data)} chars")
- logger.debug(f"Chunking {p.typeGroup} part with {len(p.data)} chars")
- chunks = chunkerRegistry.resolve(p.typeGroup).chunk(p, options)
- logger.debug(f"Created {len(chunks)} chunks")
- logger.debug(f"Created {len(chunks)} chunks")
-
- chunks_added = 0
- for ch in chunks:
- chSize = int(ch.get("size", 0) or 0)
- # Add all chunks - don't limit by maxSize since they'll be processed separately
- kept.append(ContentPart(
- id=makeId(),
- parentId=p.id,
- label=f"chunk_{ch.get('order', 0)}",
- typeGroup=p.typeGroup,
- mimeType=p.mimeType,
- data=ch.get("data", ""),
- metadata={
- "size": chSize,
- "chunk": True,
- **ch.get("metadata", {})
- }
- ))
- chunks_added += 1
- logger.debug(f"Added chunk {ch.get('order', 0)}: {chSize} bytes")
-
- logger.debug(f"Added {chunks_added} chunks from {p.typeGroup} part")
-
- # Apply merging strategy if provided, but preserve chunks
- if mergeStrategy:
- # Don't merge chunks - they should stay separate for processing
- non_chunk_parts = [p for p in kept if not p.metadata.get("chunk", False)]
- chunk_parts = [p for p in kept if p.metadata.get("chunk", False)]
-
- logger.debug(f"Preserving {len(chunk_parts)} chunks from merging")
-
- # Apply intelligent merging for small text parts
- if non_chunk_parts:
- # Count text parts
- text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"]
- if len(text_parts) > 5: # If we have many small text parts, merge them
- logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency")
- non_chunk_parts = _applyMerging(non_chunk_parts, mergeStrategy)
-
- # Combine non-chunk parts with chunk parts (chunks stay separate)
- kept = non_chunk_parts + chunk_parts
-
- logger.debug(f"Final parts after merging: {len(kept)} (chunks: {len(chunk_parts)})")
- logger.debug(f"Final parts after merging: {len(kept)} (chunks: {len(chunk_parts)})")
-
- # Re-check size after merging
- totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept)
- if totalSize > maxSize and mergeStrategy.get("maxSize"):
- # Apply size limit to merged parts
- kept = _applySizeLimit(kept, maxSize)
-
- logger.debug(f"poolAndLimit returning {len(kept)} parts")
- return kept
+# REMOVED: poolAndLimit function - chunking now handled in AI call phase
def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
@@ -264,37 +143,5 @@ def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[Co
return merged
-def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]:
- """Apply size limit by prioritizing parts and truncating if necessary."""
- # Sort by priority: text first, then others
- priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3, "binary": 4, "metadata": 5, "container": 6}
- sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99))
-
- kept: List[ContentPart] = []
- current_size = 0
-
- for part in sorted_parts:
- part_size = int(part.metadata.get("size", 0) or 0)
- if current_size + part_size <= maxSize:
- kept.append(part)
- current_size += part_size
- else:
- # Try to truncate text parts
- if part.typeGroup == "text" and part_size > 0:
- remaining_size = maxSize - current_size
- if remaining_size > 1000: # Only truncate if we have meaningful space
- truncated_data = part.data[:remaining_size * 4] # Rough character estimate
- truncated_part = ContentPart(
- id=makeId(),
- parentId=part.parentId,
- label=f"{part.label}_truncated",
- typeGroup=part.typeGroup,
- mimeType=part.mimeType,
- data=truncated_data,
- metadata={**part.metadata, "size": len(truncated_data.encode('utf-8')), "truncated": True}
- )
- kept.append(truncated_part)
- break
-
- return kept
+# REMOVED: _applySizeLimit function - no longer needed after removing poolAndLimit
diff --git a/test_ai_behavior.py b/test_ai_behavior.py
index b5951d06..34a2ad60 100644
--- a/test_ai_behavior.py
+++ b/test_ai_behavior.py
@@ -138,6 +138,7 @@ class AIBehaviorTester:
self.testResults.append(result)
return result
+
def _extractContinuationInstruction(self, response: str) -> str:
"""Extract continuation instruction from response."""
try:
diff --git a/test_ai_model_selection.py b/test_ai_model_selection.py
index c11742d1..2f7b016f 100644
--- a/test_ai_model_selection.py
+++ b/test_ai_model_selection.py
@@ -25,7 +25,7 @@ from modules.datamodels.datamodelAi import (
)
from modules.datamodels.datamodelUam import User
from modules.aicore.aicoreModelRegistry import modelRegistry
-from modules.aicore.aicoreModelSelector import model_selector
+from modules.aicore.aicoreModelSelector import modelSelector
class ModelSelectionTester:
@@ -45,6 +45,51 @@ class ModelSelectionTester:
self.services.ai = await AiService.create(self.services)
+ async def _printFallbackListWithContext(self, title: str, prompt: str, context: str, options: AiCallOptions) -> None:
+ print(f"\n{'='*80}")
+ print(f"{title}")
+ print(f"{'='*80}")
+ print(
+ f"Operation={options.operationType.name}, Priority={options.priority.name}, ProcessingMode={options.processingMode.name}"
+ )
+
+ # Show context and prompt sizes
+ promptSize = len(prompt.encode("utf-8"))
+ contextSize = len(context.encode("utf-8"))
+ totalSize = promptSize + contextSize
+ print(f"Prompt size: {promptSize} bytes, Context size: {contextSize} bytes, Total: {totalSize} bytes")
+
+ availableModels = modelRegistry.getAvailableModels()
+ failoverModelList = modelSelector.getFailoverModelList(
+ prompt=prompt,
+ context=context,
+ options=options,
+ availableModels=availableModels,
+ )
+
+ if not failoverModelList:
+ print("No suitable models found (capability filter returned empty list).")
+ return
+
+ print("Prioritized fallback model sequence (name | quality | speed | $/1k in | ctx | score):")
+ for idx, m in enumerate(failoverModelList, 1):
+ costIn = getattr(m, "costPer1kTokensInput", 0.0)
+
+ # Get detailed scoring
+ sizeRating = modelSelector._getSizeRating(m, totalSize)
+ processingModeRating = modelSelector._getProcessingModeRating(m.processingMode, options.processingMode)
+ priorityRating = modelSelector._getPriorityRating(m, options.priority)
+ totalScore = sizeRating + processingModeRating + priorityRating
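+            # assumption: the displayed score is the plain sum of the three ratings and may only
+            # approximate the weighting modelSelector applies internally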
+
+ print(
+ f" {idx:>2}. {m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}"
+ )
+ print(f" Size: {sizeRating:.3f}, ProcessingMode: {processingModeRating:.3f}, Priority: {priorityRating:.3f}")
+
async def _printFallbackList(self, title: str, prompt: str, options: AiCallOptions) -> None:
print(f"\n{'='*80}")
print(f"{title}")
@@ -53,24 +98,43 @@ class ModelSelectionTester:
f"Operation={options.operationType.name}, Priority={options.priority.name}, ProcessingMode={options.processingMode.name}"
)
+ # Show context and prompt sizes
+ context = "" # Currently using empty context
+ promptSize = len(prompt.encode("utf-8"))
+ contextSize = len(context.encode("utf-8"))
+ totalSize = promptSize + contextSize
+ print(f"Prompt size: {promptSize} bytes, Context size: {contextSize} bytes, Total: {totalSize} bytes")
+
availableModels = modelRegistry.getAvailableModels()
- fallbackModels = model_selector.getFallbackModels(
+ failoverModelList = modelSelector.getFailoverModelList(
prompt=prompt,
- context="",
+ context=context,
options=options,
availableModels=availableModels,
)
- if not fallbackModels:
+ if not failoverModelList:
print("No suitable models found (capability filter returned empty list).")
return
- print("Prioritized fallback model sequence (name | quality | speed | $/1k in | ctx):")
- for idx, m in enumerate(fallbackModels, 1):
+ print("Prioritized fallback model sequence (name | quality | speed | $/1k in | ctx | score):")
+ for idx, m in enumerate(failoverModelList, 1):
costIn = getattr(m, "costPer1kTokensInput", 0.0)
+
+ # Get detailed scoring
+ sizeRating = modelSelector._getSizeRating(m, totalSize)
+ processingModeRating = modelSelector._getProcessingModeRating(m.processingMode, options.processingMode)
+ priorityRating = modelSelector._getPriorityRating(m, options.priority)
+ totalScore = sizeRating + processingModeRating + priorityRating
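+            # same scoring assumption as in _printFallbackListWithContext: plain sum of the three ratings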
+
print(
- f" {idx:>2}. {m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)}"
+ f" {idx:>2}. {m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}"
)
+ print(f" Size: {sizeRating:.3f}, ProcessingMode: {processingModeRating:.3f}, Priority: {priorityRating:.3f}")
async def run(self) -> None:
# Scenarios reflecting workflows/
@@ -146,10 +210,93 @@ class ModelSelectionTester:
)
)
+ # Intent analysis (user input understanding)
+ scenarios.append(
+ (
+ "ANALYSE - Quality, Detailed (Intent Analysis)",
+ "Analyze user intent and extract key requirements from the following request: 'I need to create a comprehensive marketing strategy for our new product launch including budget allocation, timeline, and target audience analysis.'",
+ AiCallOptions(
+ operationType=OperationTypeEnum.ANALYSE,
+ priority=PriorityEnum.QUALITY,
+ compressPrompt=False,
+ compressContext=False,
+ processingMode=ProcessingModeEnum.DETAILED,
+ maxCost=0.08,
+ maxProcessingTime=45,
+ resultFormat="json",
+ temperature=0.2,
+ ),
+ )
+ )
+
+ # Review/Validation (quality assurance)
+ scenarios.append(
+ (
+ "ANALYSE - Quality, Detailed (Review/Validation)",
+ "Review and validate the following business proposal for completeness, accuracy, and compliance with industry standards. Identify any gaps or areas for improvement.",
+ AiCallOptions(
+ operationType=OperationTypeEnum.ANALYSE,
+ priority=PriorityEnum.QUALITY,
+ compressPrompt=False,
+ compressContext=False,
+ processingMode=ProcessingModeEnum.DETAILED,
+ maxCost=0.10,
+ maxProcessingTime=60,
+ resultFormat="json",
+ temperature=0.1,
+ ),
+ )
+ )
+
+ # Large context scenario (to test size-based scoring)
+ scenarios.append(
+ (
+ "GENERAL - Balanced, Advanced (Large Context Test)",
+ "Process this large document and provide a comprehensive summary.",
+ AiCallOptions(
+ operationType=OperationTypeEnum.GENERAL,
+ priority=PriorityEnum.BALANCED,
+ compressPrompt=False,
+ compressContext=False,
+ processingMode=ProcessingModeEnum.ADVANCED,
+ maxCost=0.15,
+ maxProcessingTime=120,
+ ),
+ )
+ )
+
# Iterate and print lists
for title, prompt, options in scenarios:
await self._printFallbackList(title, prompt, options)
+ # Test with actual context to see size-based scoring
+ largeContext = """
+ This is a comprehensive business document containing detailed information about our company's strategic initiatives,
+ financial performance, market analysis, competitive landscape, operational metrics, customer feedback,
+ product development roadmap, technology stack, human resources, legal compliance, risk management,
+ sustainability efforts, and future growth plans. The document spans multiple sections including executive summary,
+ market research, financial statements, operational reports, customer insights, product specifications,
+ technology architecture, HR policies, legal frameworks, risk assessments, environmental impact studies,
+ and strategic recommendations. This extensive content is designed to test the model selection algorithm's
+ ability to handle large context sizes and make intelligent decisions about which models are best suited
+ for processing such substantial amounts of information while maintaining efficiency and cost-effectiveness.
+ """ * 10 # Repeat to make it even larger
+
+ await self._printFallbackListWithContext(
+ "GENERAL - Balanced, Advanced (Large Context Test)",
+ "Analyze this comprehensive business document and provide key insights.",
+ largeContext,
+ AiCallOptions(
+ operationType=OperationTypeEnum.GENERAL,
+ priority=PriorityEnum.BALANCED,
+ compressPrompt=False,
+ compressContext=False,
+ processingMode=ProcessingModeEnum.ADVANCED,
+ maxCost=0.15,
+ maxProcessingTime=120,
+ ),
+ )
+
async def main() -> None:
tester = ModelSelectionTester()