Implementation: Content Handling with Dynamic AI v2
Overview
This document outlines the implementation of model-aware content chunking within the AI call pipeline. The key innovation is moving chunking logic from the extraction phase into the AI call phase, so that content is chunked according to the capabilities of the model actually used for the call rather than a model selected in advance.
Problem Statement
Current Issue
- Chunking Phase: Uses Model A's context length to determine chunk sizes
- AI Call Phase: Uses Models B, C, D... in fallback sequence until one succeeds
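- Result: No guarantee that the model used for chunking is the model whose AI call succeeds
- Impact: Chunks can exceed the context of the model that actually serves the call (causing failures) or use only a fraction of a larger model's context (causing extra calls), so behavior depends on whichever fallback model happens to succeed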
Root Cause
Chunking happens before model selection, using a model that may not be the one actually used for AI calls.
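To make the mismatch concrete, the following sketch applies this plan's own sizing rules (roughly 4 bytes per token, a 90% usable budget, text chunks at 70% of that budget) to two models. The context lengths and the helper name budgetBytes are hypothetical, chosen only for illustration.

```python
BYTES_PER_TOKEN = 4  # rough tokens-to-bytes heuristic used throughout this plan


def budgetBytes(contextLength: int) -> int:
    """Usable bytes for a model: ~4 bytes per token, keeping 10% headroom."""
    return int(contextLength * BYTES_PER_TOKEN * 0.9)


# Hypothetical context lengths, chosen only to illustrate the mismatch.
modelAContext = 128_000  # model used to plan the chunks
modelBContext = 8_000    # fallback model that actually serves the call

textChunkForA = int(budgetBytes(modelAContext) * 0.7)     # 322560 bytes
fitsModelB = textChunkForA <= budgetBytes(modelBContext)  # budget is 28800 bytes

print(textChunkForA, budgetBytes(modelBContext), fitsModelB)
```

A chunk planned for model A is more than ten times larger than what model B can accept, so every call that falls back to B fails regardless of how well the extraction step worked.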
Solution Architecture
Core Concept
Move chunking logic from extraction phase into AI call phase, making it model-aware and dynamic.
Key Principles
- Extract First: Pure content extraction without chunking
- Chunk on Demand: Chunk content based on the model actually selected for the call
- Per-Model Chunking: Re-chunk for each model attempt if needed
- Fallback with Re-chunking: Each model gets fresh chunking based on its capabilities
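The principles above can be sketched as a small fallback loop in which chunking happens inside each model attempt rather than before it. This is a minimal illustration, not the gateway code: Model, chunkForModel, callWithFallback and fakeCall are hypothetical names, the budget reuses the plan's 4-bytes-per-token and 90% figures, and the splitter cuts by characters, which equals bytes only for ASCII text.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

BYTES_PER_TOKEN = 4  # rough heuristic used throughout this plan


@dataclass
class Model:
    """Hypothetical stand-in for AiModel; only contextLength matters here."""
    name: str
    contextLength: int  # context window in tokens


def chunkForModel(text: str, model: Model) -> List[str]:
    """Split text into pieces sized for this specific model (one char treated as one byte)."""
    budget = max(1, int(model.contextLength * BYTES_PER_TOKEN * 0.9))
    return [text[i:i + budget] for i in range(0, len(text), budget)] or [""]


def callWithFallback(text: str, models: List[Model],
                     callModel: Callable[[Model, str], str]) -> str:
    """Try models in order; every attempt re-chunks the content for that model."""
    lastError: Optional[Exception] = None
    for model in models:
        try:
            chunks = chunkForModel(text, model)  # chunk on demand, for this model only
            return "\n".join(callModel(model, chunk) for chunk in chunks)
        except Exception as exc:  # any failure moves on to the next model
            lastError = exc
    raise RuntimeError(f"All models failed: {lastError}")


def fakeCall(model: Model, chunk: str) -> str:
    """Hypothetical AI call: the small model is 'unavailable', the large one echoes chunk sizes."""
    if model.name == "small":
        raise TimeoutError("small model unavailable")
    return f"{model.name}:{len(chunk)}"


if __name__ == "__main__":
    models = [Model("small", 10), Model("large", 100)]
    print(callWithFallback("x" * 500, models, fakeCall))
```

Because the small model fails, the large model receives a fresh split of 360 + 140 characters instead of chunks planned for the small model's 36-character budget.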
Implementation Plan
Phase 1: Modify Extraction Pipeline
1.1 Remove Chunking from Extraction
File: gateway/modules/services/serviceExtraction/subPipeline.py
def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ContentExtracted:
    extractor = extractorRegistry.resolve(mimeType, fileName)
    if extractor is None:
        # fallback: single binary part
        part = ContentPart(
            id=makeId(),
            parentId=None,
            label="file",
            typeGroup="binary",
            mimeType=mimeType or "application/octet-stream",
            data="",
            metadata={"warning": "No extractor registered"}
        )
        return ContentExtracted(id=makeId(), parts=[part])

    parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})

    # REMOVED: poolAndLimit(parts, chunkerRegistry, options)
    # REMOVED: Chunking logic - now handled in AI call phase

    # Apply merging strategy if provided (preserve existing logic)
    mergeStrategy = options.get("mergeStrategy", {})
    if mergeStrategy:
        parts = _applyMerging(parts, mergeStrategy)

    return ContentExtracted(id=makeId(), parts=parts)
1.2 Update Document Processing
File: gateway/modules/services/serviceAi/subDocumentProcessing.py
async def processDocumentsPerChunk(self, documents: List[ChatDocument], prompt: str, options: Optional[AiCallOptions] = None) -> str:
    if not documents:
        return ""

    # REMOVED: _getModelCapabilitiesForContent() - no longer needed for chunking
    # REMOVED: model_capabilities calculation

    # Build extraction options WITHOUT chunking parameters
    extractionOptions: Dict[str, Any] = {
        "prompt": prompt,
        "operationType": options.operationType if options else "general",
        "processDocumentsIndividually": True,
        # REMOVED: maxSize, textChunkSize, imageChunkSize
        "mergeStrategy": {
            "useIntelligentMerging": True,
            "prompt": prompt,
            "groupBy": "typeGroup",
            "orderBy": "id",
            "mergeType": "concatenate"
        },
    }

    # Extract content WITHOUT chunking
    extractionResult = self.extractionService.extractContent(documents, extractionOptions)
    if not isinstance(extractionResult, list):
        return "[Error: No extraction results]"

    # Process parts (not chunks) with AI calls
    partResults = await self._processPartsWithMapping(extractionResult, prompt, options)

    # Merge results
    mergedContent = self._mergePartResults(partResults, options)
    return mergedContent
Phase 2: Implement Model-Aware AI Calls
2.1 Modify AI Call Interface
File: gateway/modules/interfaces/interfaceAiObjects.py
async def call(self, request: AiCallRequest) -> AiCallResponse:
    """Call AI model for text generation with model-aware chunking."""
    # Handle content parts (unified path for single and multiple parts)
    if request.contentParts:
        return await self._callWithContentParts(request)

    # Handle traditional text/context calls
    return await self._callWithTextContext(request)
async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse:
    """Process content parts with model-aware chunking (unified for single and multiple parts)."""
    prompt = request.prompt
    options = request.options
    contentParts = request.contentParts

    # Get fallback models
    availableModels = modelRegistry.getAvailableModels()
    fallbackModels = model_selector.getFallbackModels(prompt, "", options, availableModels)
    if not fallbackModels:
        return self._createErrorResponse("No suitable models found", 0, 0)

    # Process each content part; each part runs its own fallback sequence and chunking
    allResults = []
    for contentPart in contentParts:
        partResult = await self._processContentPartWithFallback(contentPart, prompt, options, fallbackModels)
        allResults.append(partResult)

    # allResults are AiCallResponse objects (not PartResult), so merge them like chunk results
    mergedContent = self._mergeChunkResults(allResults)
    return AiCallResponse(
        content=mergedContent,
        # A single part has one serving model; several parts may have used different ones
        modelName=allResults[0].modelName if len(allResults) == 1 else "multiple",
        priceUsd=sum(r.priceUsd for r in allResults),
        processingTime=sum(r.processingTime for r in allResults),
        bytesSent=sum(r.bytesSent for r in allResults),
        bytesReceived=sum(r.bytesReceived for r in allResults),
        errorCount=sum(r.errorCount for r in allResults)
    )
async def _processContentPartWithFallback(self, contentPart: ContentPart, prompt: str, options: AiCallOptions, fallbackModels: List[AiModel]) -> AiCallResponse:
    """Process a single content part with model-aware chunking and fallback."""
    lastError = None
    for attempt, model in enumerate(fallbackModels):
        try:
            logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})")

            # Check if part fits in model context (~4 bytes per token).
            # The prompt counts against the same context, and the 90% headroom matches
            # _chunkContentPart, so parts near the limit are chunked instead of failing.
            partSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0
            promptSize = len(prompt.encode('utf-8')) if prompt else 0
            availableBytes = int(model.contextLength * 4 * 0.9) - promptSize

            if partSize <= availableBytes:
                # Part fits - call AI directly
                response = await self._callWithModel(model, prompt, contentPart.data, 0.2, None, partSize)
                logger.info(f"✅ Content part processed successfully with model: {model.name}")
                return response

            # Part too large - chunk it for this model
            chunks = await self._chunkContentPart(contentPart, model, options)
            if not chunks:
                raise ValueError(f"Failed to chunk content part for model {model.name}")

            # Process each chunk with the same model
            chunkResults = []
            for chunk in chunks:
                chunkResponse = await self._callWithModel(model, prompt, chunk['data'], 0.2, None, chunk['size'])
                chunkResults.append(chunkResponse)

            # Merge chunk results and aggregate usage counters
            mergedContent = self._mergeChunkResults(chunkResults)
            totalPrice = sum(r.priceUsd for r in chunkResults)
            totalTime = sum(r.processingTime for r in chunkResults)
            totalBytesSent = sum(r.bytesSent for r in chunkResults)
            totalBytesReceived = sum(r.bytesReceived for r in chunkResults)
            totalErrors = sum(r.errorCount for r in chunkResults)
            logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)")
            return AiCallResponse(
                content=mergedContent,
                modelName=model.name,
                priceUsd=totalPrice,
                processingTime=totalTime,
                bytesSent=totalBytesSent,
                bytesReceived=totalBytesReceived,
                errorCount=totalErrors
            )
        except Exception as e:
            # Any failure (call or chunking) moves on to the next model, which re-chunks from scratch
            lastError = e
            logger.warning(f"❌ Model {model.name} failed for content part: {str(e)}")
            if attempt < len(fallbackModels) - 1:
                logger.info("🔄 Trying next fallback model...")
            else:
                logger.error(f"💥 All {len(fallbackModels)} models failed for content part")

    # All models failed
    return self._createErrorResponse(f"All models failed: {str(lastError)}", 0, 0)
async def _chunkContentPart(self, contentPart: ContentPart, model: AiModel, options: AiCallOptions) -> List[Dict[str, Any]]:
    """Chunk a content part based on model capabilities."""
    # Calculate model-specific chunk sizes
    modelContextBytes = model.contextLength * 4  # Approximate bytes (~4 bytes per token)
    maxContextBytes = int(modelContextBytes * 0.9)  # 90% of context length
    textChunkSize = int(maxContextBytes * 0.7)  # 70% of max context for text chunks
    imageChunkSize = int(maxContextBytes * 0.8)  # 80% of max context for image chunks

    # Build chunking options
    chunkingOptions = {
        "textChunkSize": textChunkSize,
        "imageChunkSize": imageChunkSize,
        "maxSize": maxContextBytes,
        "chunkAllowed": True
    }

    # Get appropriate chunker
    from modules.services.serviceExtraction.subRegistry import ChunkerRegistry
    chunkerRegistry = ChunkerRegistry()
    chunker = chunkerRegistry.resolve(contentPart.typeGroup)
    if not chunker:
        logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}")
        return []

    # Chunk the content part
    try:
        chunks = chunker.chunk(contentPart, chunkingOptions)
        logger.debug(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part")
        return chunks
    except Exception as e:
        logger.error(f"Chunking failed for {contentPart.typeGroup}: {str(e)}")
        return []
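The sizing arithmetic in _chunkContentPart, and the {'data', 'size'} chunk dictionaries that _processContentPartWithFallback reads, can be checked in isolation. A minimal sketch, assuming the registered text chunker treats textChunkSize as a byte limit: chunkSizesFor and chunkTextByBytes are hypothetical helpers, not ChunkerRegistry code, and the splitter only guarantees that no UTF-8 character is cut in half; it does not look for sentence or paragraph boundaries.

```python
from typing import Dict, List

BYTES_PER_TOKEN = 4  # rough heuristic used throughout this plan


def chunkSizesFor(contextLength: int) -> Dict[str, int]:
    """Reproduce the budget arithmetic from _chunkContentPart."""
    modelContextBytes = contextLength * BYTES_PER_TOKEN
    maxContextBytes = int(modelContextBytes * 0.9)  # 90% of context
    return {
        "maxSize": maxContextBytes,
        "textChunkSize": int(maxContextBytes * 0.7),   # 70% of budget for text
        "imageChunkSize": int(maxContextBytes * 0.8),  # 80% of budget for images
    }


def chunkTextByBytes(text: str, maxBytes: int) -> List[Dict[str, object]]:
    """Split text into chunks of at most maxBytes UTF-8 bytes, never splitting a character."""
    chunks: List[Dict[str, object]] = []
    current: List[str] = []
    currentSize = 0
    for ch in text:
        size = len(ch.encode("utf-8"))
        # Close the current chunk if this character would push it over the limit
        if current and currentSize + size > maxBytes:
            chunks.append({"data": "".join(current), "size": currentSize})
            current, currentSize = [], 0
        current.append(ch)
        currentSize += size
    if current:
        chunks.append({"data": "".join(current), "size": currentSize})
    return chunks


if __name__ == "__main__":
    print(chunkSizesFor(128_000))
    print(chunkTextByBytes("héllo wörld", 5))
```

For a hypothetical 128,000-token model this yields a 460,800-byte budget, 322,560-byte text chunks and 368,640-byte image chunks. Measuring in encoded bytes rather than characters matters for non-ASCII text, where "é" or "ö" each take two bytes.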
2.2 Update Document Processing Interface
File: gateway/modules/services/serviceAi/subDocumentProcessing.py
# Module-level imports: PartResult must be importable when the method is defined,
# because it appears in the return annotation.
import asyncio
import time

from modules.datamodels.datamodelAi import AiCallRequest
from modules.datamodels.datamodelExtraction import PartResult


async def _processPartsWithMapping(self, extractionResult: List[ContentExtracted], prompt: str, options: Optional[AiCallOptions] = None) -> List[PartResult]:
    """Process content parts with proper mapping to preserve relationships."""
    # Collect all parts that need processing
    parts_to_process = []
    part_index = 0
    for ec in extractionResult:
        for part in ec.parts:
            if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
                # Skip empty container parts
                if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
                    logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
                    continue
                parts_to_process.append({
                    'part': part,
                    'part_index': part_index,
                    'document_id': ec.id
                })
                part_index += 1

    logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking")

    # Process a single part through the model-aware AI call
    async def process_single_part(part_info: Dict) -> PartResult:
        part = part_info['part']
        part_index = part_info['part_index']
        document_id = part_info['document_id']
        start_time = time.time()
        try:
            # Create AI call request with content part
            request = AiCallRequest(
                prompt=prompt,
                context="",  # Context is in the content part
                options=options,
                contentParts=[part]  # Pass as list for unified processing
            )
            # Call AI with model-aware chunking
            response = await self.aiObjects.call(request)
            processing_time = time.time() - start_time
            return PartResult(
                originalPart=part,
                aiResult=response.content,
                partIndex=part_index,
                documentId=document_id,
                processingTime=processing_time,
                metadata={
                    # call() reports exhausted fallbacks as an error response, not an exception
                    "success": response.errorCount == 0,
                    "partSize": len(part.data) if part.data else 0,
                    "resultSize": len(response.content),
                    "typeGroup": part.typeGroup,
                    "modelName": response.modelName,
                    "priceUsd": response.priceUsd
                }
            )
        except Exception as e:
            processing_time = time.time() - start_time
            logger.warning(f"Error processing part {part_index}: {str(e)}")
            return PartResult(
                originalPart=part,
                aiResult=f"[Error processing part: {str(e)}]",
                partIndex=part_index,
                documentId=document_id,
                processingTime=processing_time,
                metadata={
                    "success": False,
                    "error": str(e),
                    "partSize": len(part.data) if part.data else 0,
                    "typeGroup": part.typeGroup
                }
            )

    # Process parts with concurrency control (default 5; a missing or None setting falls back to it)
    max_concurrent = getattr(options, 'maxConcurrentParts', None) or 5
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(part_info):
        async with semaphore:
            return await process_single_part(part_info)

    tasks = [process_with_semaphore(part_info) for part_info in parts_to_process]
    part_results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle exceptions that escaped process_single_part (gather preserves input order)
    processed_results = []
    for i, result in enumerate(part_results):
        if isinstance(result, Exception):
            part_info = parts_to_process[i]
            processed_results.append(PartResult(
                originalPart=part_info['part'],
                aiResult=f"[Error in parallel processing: {str(result)}]",
                partIndex=part_info['part_index'],
                documentId=part_info['document_id'],
                processingTime=0.0,
                metadata={"success": False, "error": str(result)}
            ))
        elif result is not None:
            processed_results.append(result)

    logger.info(f"Completed processing {len(processed_results)} parts")
    return processed_results
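The concurrency pattern in _processPartsWithMapping (a semaphore around each task plus asyncio.gather with return_exceptions=True) reduces to a generic helper. mapBounded and demo are illustrative names, not part of the gateway. The `or 5` fallback mirrors the default of five concurrent parts and also guards against a None or 0 setting: asyncio.Semaphore(None) raises a TypeError, and Semaphore(0) would block every task forever.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple


async def mapBounded(items: List[Any], worker: Callable[[Any], Awaitable[Any]],
                     maxConcurrent: Optional[int] = 5) -> List[Any]:
    """Run worker over items with at most maxConcurrent calls in flight.

    Results come back in input order; exceptions are returned in place of results.
    """
    semaphore = asyncio.Semaphore(maxConcurrent or 5)  # None or 0 falls back to 5

    async def guarded(item: Any) -> Any:
        async with semaphore:
            return await worker(item)

    # return_exceptions=True: one failing item does not make gather() raise
    return await asyncio.gather(*(guarded(item) for item in items), return_exceptions=True)


async def demo() -> Tuple[List[Any], int]:
    """Run six fake 'parts' with a limit of 2; part 3 fails."""
    inFlight = 0
    peak = 0

    async def worker(n: int) -> int:
        nonlocal inFlight, peak
        inFlight += 1
        peak = max(peak, inFlight)
        await asyncio.sleep(0.01)  # stands in for an AI call
        inFlight -= 1
        if n == 3:
            raise ValueError("bad part")
        return n * 2

    results = await mapBounded(list(range(6)), worker, maxConcurrent=2)
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because gather returns results in input order, the position of each exception identifies the failing item; that is what allows the original code to map an exception back to parts_to_process[i].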
Phase 3: Update Data Models
3.1 Extend AiCallRequest
File: gateway/modules/datamodels/datamodelAi.py
class AiCallRequest(BaseModel):
    prompt: str
    context: str = ""
    options: AiCallOptions
    contentParts: Optional[List[ContentPart]] = None  # NEW: Content parts (unified for single and multiple)
3.2 Add PartResult Model
File: gateway/modules/datamodels/datamodelExtraction.py
class PartResult(BaseModel):
    originalPart: ContentPart
    aiResult: str
    partIndex: int
    documentId: str
    processingTime: float
    metadata: Dict[str, Any]
Phase 4: Leverage Existing Merging System
4.1 Use Existing Format-Aligned Mergers
Key Insight: The codebase already has a sophisticated, format-aligned merging system that should be leveraged instead of creating new merging logic.
Existing Merging Infrastructure:
- TextMerger: Handles text content merging with proper grouping and ordering
- TableMerger: Handles table content merging with structure preservation
- IntelligentTokenAwareMerger: Advanced AI-aware merging that optimizes token usage
- DefaultMerger: Fallback for other content types
File: gateway/modules/services/serviceAi/subDocumentProcessing.py
def _mergePartResults(self, partResults: List[PartResult], options: Optional[AiCallOptions] = None) -> str:
    """Merge part results using existing format-aligned merging system."""
    if not partResults:
        return ""

    # Convert PartResults back to ContentParts for existing merger system
    content_parts = []
    for part_result in partResults:
        original = part_result.originalPart
        # Create ContentPart from PartResult
        content_part = ContentPart(
            id=original.id,
            parentId=original.parentId,
            label=original.label,
            typeGroup=original.typeGroup,
            mimeType=original.mimeType,
            data=part_result.aiResult,  # Use AI result as data
            metadata={
                **(original.metadata or {}),  # metadata may be None on some parts
                "aiResult": True,
                "partIndex": part_result.partIndex,
                "documentId": part_result.documentId,
                "processingTime": part_result.processingTime,
                "success": part_result.metadata.get("success", False)
            }
        )
        content_parts.append(content_part)

    # Default merging strategy: group by document, keep extraction order
    merge_strategy = {
        "useIntelligentMerging": True,
        "groupBy": "documentId",  # Group by document
        "orderBy": "partIndex",  # Order by part index
        "mergeType": "concatenate"
    }
    # Caller-supplied strategy overrides the defaults; a missing or None attribute is ignored
    custom_strategy = getattr(options, 'mergeStrategy', None)
    if custom_strategy:
        merge_strategy.update(custom_strategy)

    # Apply existing merging logic
    from modules.services.serviceExtraction.subPipeline import _applyMerging
    merged_parts = _applyMerging(content_parts, merge_strategy)

    # Convert merged parts back to final string, skipping parts without data
    final_content = "\n\n".join(part.data for part in merged_parts if part.data)
    logger.info(f"Merged {len(partResults)} parts using existing format-aligned merging system")
    return final_content.strip()
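As a reference point for what the default strategy (groupBy documentId, orderBy partIndex, mergeType concatenate) is meant to produce, here is a plain-Python sketch. It does not reproduce TextMerger or IntelligentTokenAwareMerger; PartResultLite is a hypothetical, trimmed-down PartResult, and the "\n\n" separator is borrowed from the final join in _mergePartResults. Sorting by partIndex alone is enough because _processPartsWithMapping assigns partIndex from a single counter across all documents, so each document's parts end up contiguous.

```python
from dataclasses import dataclass
from itertools import groupby
from typing import List


@dataclass
class PartResultLite:
    """Hypothetical, trimmed-down PartResult with only the fields used for merging."""
    documentId: str
    partIndex: int
    aiResult: str


def concatenateByDocument(results: List[PartResultLite], separator: str = "\n\n") -> str:
    """Order results by partIndex, group consecutive parts by documentId, and concatenate."""
    # partIndex is one counter across all documents, so sorting by it restores
    # extraction order and keeps each document's parts contiguous.
    ordered = sorted(results, key=lambda r: r.partIndex)
    sections = []
    for _, group in groupby(ordered, key=lambda r: r.documentId):
        body = separator.join(r.aiResult for r in group if r.aiResult)
        if body:  # drop documents whose parts produced no text
            sections.append(body)
    return separator.join(sections).strip()


if __name__ == "__main__":
    shuffled = [
        PartResultLite("docB", 1, "b1"),
        PartResultLite("docA", 2, "a2"),
        PartResultLite("docB", 0, "b0"),
    ]
    print(concatenateByDocument(shuffled))
```

Results that arrive out of order (for example, collected from parallel workers) are put back into extraction order before concatenation.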
4.2 Benefits of Using Existing Merging System
- Format Preservation: Maintains document structure and semantic boundaries
- Content Type Awareness: Handles text, table, and other content types appropriately
- Token Optimization: Uses intelligent merging to minimize AI calls
- Proven Reliability: Leverages battle-tested merging logic
- Consistency: Uses same merging approach across the entire system
Benefits
1. Model-Aware Chunking
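- Content is chunked based on the capabilities of the model actually selected for the call
- Chunk sizes are recomputed for each model attempt
- Chunks are neither larger than the serving model can accept nor needlessly small for a large-context model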
2. Improved Reliability
- Each model gets fresh chunking based on its capabilities
- Better fallback behavior with model-specific optimization
- Reduced failure rates due to chunk size mismatches
3. Performance Optimization
- Smaller chunks for models with limited context
- Larger chunks for models with extensive context
- Fewer API calls when a large-context model serves the request, because each chunk can be larger
4. Leverages Existing Infrastructure
- Reuses existing sophisticated merging system (TextMerger, TableMerger, IntelligentTokenAwareMerger)
- Maintains format alignment and document structure preservation
- Consistent with existing codebase patterns and proven reliability
5. Maintainability
- Reuses existing chunking and merging logic
- Clear separation of concerns
- Consistent error handling
Migration Strategy
Phase 1: Preparation
- Add new data models (PartResult, extended AiCallRequest)
- Add the _mergePartResults wrapper around the existing mergers
- Test with existing code (backward compatibility)
Phase 2: Implementation
- Modify extraction pipeline (remove chunking)
- Update AI call interface (add model-aware chunking)
- Update document processing (use new part-based approach)
Phase 3: Testing
- Unit tests for new chunking logic
- Integration tests for end-to-end flow
- Performance testing with various model combinations
Phase 4: Deployment
- Gradual rollout with feature flags
- Monitor performance and error rates
- Full migration once stable
Risk Mitigation
1. Backward Compatibility
- Keep existing interfaces during transition
- Feature flags for new vs old behavior
- Gradual migration path
2. Error Handling
- Comprehensive error handling for chunking failures
- Fallback to original content if chunking fails
- Detailed logging for debugging
3. Performance Monitoring
- Track chunking performance
- Monitor API call patterns
- Alert on unusual error rates
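The feature flags named under Backward Compatibility could gate whether chunk sizes are still passed to extraction. A minimal sketch, assuming an environment variable named MODEL_AWARE_CHUNKING (hypothetical; the plan does not say how flags are stored) and a legacy path that still honors maxSize, textChunkSize and imageChunkSize during extraction. The function names are illustrative only.

```python
import os
from typing import Any, Dict, Mapping, Optional

# Hypothetical flag name; the plan does not specify how flags are stored.
FLAG_NAME = "MODEL_AWARE_CHUNKING"
TRUTHY = {"1", "true", "yes", "on"}


def modelAwareChunkingEnabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Read the rollout flag from the environment (or an injected mapping, for tests)."""
    source = os.environ if env is None else env
    return source.get(FLAG_NAME, "").strip().lower() in TRUTHY


def buildExtractionOptions(prompt: str, legacyChunkSizes: Dict[str, int],
                           env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Old path chunks during extraction; new path leaves chunking to the AI call phase."""
    options: Dict[str, Any] = {"prompt": prompt, "processDocumentsIndividually": True}
    if not modelAwareChunkingEnabled(env):
        # Legacy behavior: pass maxSize / textChunkSize / imageChunkSize to extraction
        options.update(legacyChunkSizes)
    return options


if __name__ == "__main__":
    legacy = {"maxSize": 1000, "textChunkSize": 700, "imageChunkSize": 800}
    print(buildExtractionOptions("Summarize", legacy))
```

Defaulting to the old path when the flag is unset makes rolling back a configuration change rather than a redeploy.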
Reflection and Correction
Key Learning: Leverage Existing Merging Infrastructure
During implementation review, it was discovered that the codebase already contains a sophisticated, format-aligned merging system that should be leveraged rather than creating new merging logic. The existing system includes:
- TextMerger: Handles text content with proper grouping and ordering
- TableMerger: Preserves table structure during merging
- IntelligentTokenAwareMerger: Optimizes AI calls through token-aware merging
- DefaultMerger: Fallback for other content types
Updated Approach
The implementation has been updated to:
- Reuse existing merging logic instead of creating new _mergePartResults functions
- Convert PartResults back to ContentParts to work with the existing merger system
- Leverage proven merging strategies that maintain document structure and format alignment
- Maintain consistency with existing codebase patterns
This approach ensures better reliability, consistency, and maintainability while avoiding duplication of existing, well-tested functionality.
Conclusion
This implementation provides a model-aware content handling system that sizes chunks according to the capabilities of the model actually used. Feature flags keep the previous behavior available during the transition, and dynamic, per-model chunking is expected to improve both reliability and performance.
The key innovation is moving chunking from a pre-processing step to a model-aware, on-demand process that adapts to each model's specific capabilities, while leveraging the existing sophisticated merging infrastructure to ensure optimal performance and reliability across all AI model types.