# Implementation: Content Handling with Dynamic AI v2

## Overview

This document outlines the implementation of model-aware content chunking within the AI call pipeline. The key change is moving chunking logic from the extraction phase into the AI call phase. Content is then chunked according to the capabilities of the model actually selected for the call, rather than a pre-selected model.

## Problem Statement

### Current Issue

- **Chunking Phase**: Uses Model A's context length to determine chunk sizes
- **AI Call Phase**: Uses Models B, C, D... in fallback sequence until one succeeds
- **Result**: No guarantee that the model used for chunking matches the model whose AI call succeeds
- **Impact**: Suboptimal chunk sizes, potential failures, inconsistent performance

### Root Cause

Chunking happens before model selection. It uses a model that may not be the one actually used for the AI calls.

## Solution Architecture

### Core Concept

Move chunking logic from the extraction phase into the AI call phase, making it model-aware and dynamic.

### Key Principles

1. **Extract First**: Pure content extraction without chunking
2. **Chunk on Demand**: Chunk content based on the model actually selected
3. **Per-Model Chunking**: Re-chunk for each model attempt if needed
4. **Fallback with Re-chunking**: Each model gets fresh chunking based on its capabilities

## Implementation Plan

### Phase 1: Modify Extraction Pipeline

#### 1.1 Remove Chunking from Extraction

**File**: `gateway/modules/services/serviceExtraction/subPipeline.py`

```python
from typing import Any, Dict


def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry,
                  documentBytes: bytes, fileName: str, mimeType: str,
                  options: Dict[str, Any]) -> ContentExtracted:
    # chunkerRegistry is no longer used here; it stays in the signature for backward compatibility
    extractor = extractorRegistry.resolve(mimeType, fileName)
    if extractor is None:
        # Fallback: a single binary part carrying a warning
        part = ContentPart(
            id=makeId(),
            parentId=None,
            label="file",
            typeGroup="binary",
            mimeType=mimeType or "application/octet-stream",
            data="",
            metadata={"warning": "No extractor registered"},
        )
        return ContentExtracted(id=makeId(), parts=[part])

    parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})

    # REMOVED: poolAndLimit(parts, chunkerRegistry, options)
    # REMOVED: chunking logic - now handled in the AI call phase

    # Apply merging strategy if provided (existing logic preserved)
    mergeStrategy = options.get("mergeStrategy", {})
    if mergeStrategy:
        parts = _applyMerging(parts, mergeStrategy)

    return ContentExtracted(id=makeId(), parts=parts)
```

#### 1.2 Update Document Processing

**File**: `gateway/modules/services/serviceAi/subDocumentProcessing.py`

```python
from typing import Any, Dict, List, Optional


async def processDocumentsPerChunk(self, documents: List[ChatDocument], prompt: str,
                                   options: Optional[AiCallOptions] = None) -> str:
    if not documents:
        return ""

    # REMOVED: _getModelCapabilitiesForContent() - no longer needed for chunking
    # REMOVED: model_capabilities calculation

    # Build extraction options WITHOUT chunking parameters
    extractionOptions: Dict[str, Any] = {
        "prompt": prompt,
        "operationType": options.operationType if options else "general",
        "processDocumentsIndividually": True,
        # REMOVED: maxSize, textChunkSize, imageChunkSize
        "mergeStrategy": {
            "useIntelligentMerging": True,
            "prompt": prompt,
            "groupBy": "typeGroup",
            "orderBy": "id",
            "mergeType": "concatenate",
        },
    }

    # Extract content WITHOUT chunking
    extractionResult = self.extractionService.extractContent(documents, extractionOptions)
    if not isinstance(extractionResult, list):
        return "[Error: No extraction results]"

    # Process parts (not chunks) with AI calls; chunking now happens per model inside the AI call
    partResults = await self._processPartsWithMapping(extractionResult, prompt, options)

    # Merge results
    return self._mergePartResults(partResults, options)
```

### Phase 2: Implement Model-Aware AI Calls

#### 2.1 Modify AI Call Interface

**File**: `gateway/modules/interfaces/interfaceAiObjects.py`

```python
from typing import Any, Dict, List


async def call(self, request: AiCallRequest) -> AiCallResponse:
    """Call an AI model for text generation with model-aware chunking."""
    # Content parts take the unified path (single and multiple parts)
    if getattr(request, "contentParts", None):
        return await self._callWithContentParts(request)

    # Traditional text/context calls
    return await self._callWithTextContext(request)


async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse:
    """Process content parts with model-aware chunking (unified for single and multiple parts)."""
    prompt = request.prompt
    options = request.options
    contentParts = request.contentParts

    # Get fallback models, in the order they should be tried
    availableModels = modelRegistry.getAvailableModels()
    fallbackModels = model_selector.getFallbackModels(prompt, "", options, availableModels)
    if not fallbackModels:
        return self._createErrorResponse("No suitable models found", 0, 0)

    # Process each content part; each part runs its own fallback sequence
    allResults = []
    for contentPart in contentParts:
        partResult = await self._processContentPartWithFallback(contentPart, prompt, options, fallbackModels)
        allResults.append(partResult)

    # allResults are AiCallResponse objects, so merge them like chunk responses
    mergedContent = self._mergeChunkResults(allResults)

    return AiCallResponse(
        content=mergedContent,
        modelName=allResults[0].modelName if len(allResults) == 1 else "multiple",
        priceCHF=sum(r.priceCHF for r in allResults),
        processingTime=sum(r.processingTime for r in allResults),
        bytesSent=sum(r.bytesSent for r in allResults),
        bytesReceived=sum(r.bytesReceived for r in allResults),
        errorCount=sum(r.errorCount for r in allResults),
    )


async def _processContentPartWithFallback(self, contentPart: ContentPart, prompt: str,
                                          options: AiCallOptions,
                                          fallbackModels: List[AiModel]) -> AiCallResponse:
    """Process a single content part with model-aware chunking and fallback."""
    lastError = None
    for attempt, model in enumerate(fallbackModels):
        try:
            logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})")

            # Check whether the part fits in this model's context (rough heuristic: ~4 bytes per token).
            # Use the same 90% budget as _chunkContentPart and leave room for the prompt.
            partSize = len(contentPart.data.encode("utf-8")) if contentPart.data else 0
            promptSize = len(prompt.encode("utf-8"))
            maxContextBytes = int(model.contextLength * 4 * 0.9)

            if partSize + promptSize <= maxContextBytes:
                # Part fits: call the AI directly
                response = await self._callWithModel(model, prompt, contentPart.data, 0.2, None, partSize)
                logger.info(f"✅ Content part processed successfully with model: {model.name}")
                return response

            # Part too large: chunk it for THIS model
            chunks = await self._chunkContentPart(contentPart, model, options)
            if not chunks:
                raise ValueError(f"Failed to chunk content part for model {model.name}")

            # Process each chunk with the same model
            chunkResults = []
            for chunk in chunks:
                chunkResponse = await self._callWithModel(model, prompt, chunk["data"], 0.2, None, chunk["size"])
                chunkResults.append(chunkResponse)

            # Merge chunk results and aggregate usage statistics
            mergedContent = self._mergeChunkResults(chunkResults)
            logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)")
            return AiCallResponse(
                content=mergedContent,
                modelName=model.name,
                priceCHF=sum(r.priceCHF for r in chunkResults),
                processingTime=sum(r.processingTime for r in chunkResults),
                bytesSent=sum(r.bytesSent for r in chunkResults),
                bytesReceived=sum(r.bytesReceived for r in chunkResults),
                errorCount=sum(r.errorCount for r in chunkResults),
            )

        except Exception as e:
            lastError = e
            logger.warning(f"❌ Model {model.name} failed for content part: {e}")
            if attempt < len(fallbackModels) - 1:
                logger.info("🔄 Trying next fallback model...")

    # All models failed
    logger.error(f"💥 All {len(fallbackModels)} models failed for content part")
    return self._createErrorResponse(f"All models failed: {lastError}", 0, 0)


async def _chunkContentPart(self, contentPart: ContentPart, model: AiModel,
                            options: AiCallOptions) -> List[Dict[str, Any]]:
    """Chunk a content part based on the given model's capabilities."""
    from modules.services.serviceExtraction.subRegistry import ChunkerRegistry

    # Model-specific chunk sizes (rough heuristic: ~4 bytes per token)
    modelContextBytes = model.contextLength * 4
    maxContextBytes = int(modelContextBytes * 0.9)  # 90% of the context length
    textChunkSize = int(maxContextBytes * 0.7)      # 70% of max context for text chunks
    imageChunkSize = int(maxContextBytes * 0.8)     # 80% of max context for image chunks

    chunkingOptions = {
        "textChunkSize": textChunkSize,
        "imageChunkSize": imageChunkSize,
        "maxSize": maxContextBytes,
        "chunkAllowed": True,
    }

    # Resolve the chunker for this part's type group
    chunkerRegistry = ChunkerRegistry()
    chunker = chunkerRegistry.resolve(contentPart.typeGroup)
    if not chunker:
        logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}")
        return []

    try:
        chunks = chunker.chunk(contentPart, chunkingOptions)
        logger.debug(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part")
        return chunks
    except Exception as e:
        logger.error(f"Chunking failed for {contentPart.typeGroup}: {e}")
        return []
```

#### 2.2 Update Document Processing Interface

**File**: `gateway/modules/services/serviceAi/subDocumentProcessing.py`

```python
import asyncio
import time
from typing import Any, Dict, List, Optional

# AiCallOptions is assumed to live alongside AiCallRequest
from modules.datamodels.datamodelAi import AiCallOptions, AiCallRequest
from modules.datamodels.datamodelExtraction import PartResult


async def _processPartsWithMapping(self, extractionResult: List[ContentExtracted], prompt: str,
                                   options: Optional[AiCallOptions] = None) -> List[PartResult]:
    """Process content parts with proper mapping to preserve relationships."""
    # Collect all parts that need processing, each tagged with a global index and its document
    parts_to_process = []
    part_index = 0
    for ec in extractionResult:
        for part in ec.parts:
            if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
                # Skip empty container parts
                if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
                    logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
                    continue
                parts_to_process.append({"part": part, "part_index": part_index, "document_id": ec.id})
                part_index += 1

    logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking")

    async def process_single_part(part_info: Dict[str, Any]) -> PartResult:
        part = part_info["part"]
        index = part_info["part_index"]
        document_id = part_info["document_id"]
        start_time = time.time()
        try:
            # One part per request; the AI layer chunks it per model if needed.
            # AiCallRequest.options is required, so fall back to default options
            # (assumes AiCallOptions() can be built from defaults).
            request = AiCallRequest(
                prompt=prompt,
                context="",  # Context is carried by the content part
                options=options or AiCallOptions(),
                contentParts=[part],
            )
            response = await self.aiObjects.call(request)
            return PartResult(
                originalPart=part,
                aiResult=response.content or "",
                partIndex=index,
                documentId=document_id,
                processingTime=time.time() - start_time,
                metadata={
                    # "All models failed" comes back as an error response, not an exception
                    # (assumes error responses report errorCount > 0)
                    "success": response.errorCount == 0,
                    "partSize": len(part.data) if part.data else 0,
                    "resultSize": len(response.content or ""),
                    "typeGroup": part.typeGroup,
                    "modelName": response.modelName,
                    "priceCHF": response.priceCHF,
                },
            )
        except Exception as e:
            logger.warning(f"Error processing part {index}: {e}")
            return PartResult(
                originalPart=part,
                aiResult=f"[Error processing part: {e}]",
                partIndex=index,
                documentId=document_id,
                processingTime=time.time() - start_time,
                metadata={
                    "success": False,
                    "error": str(e),
                    "partSize": len(part.data) if part.data else 0,
                    "typeGroup": part.typeGroup,
                },
            )

    # Process parts concurrently, bounded by a semaphore (default: 5)
    max_concurrent = getattr(options, "maxConcurrentParts", None) or 5
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_semaphore(part_info: Dict[str, Any]) -> PartResult:
        async with semaphore:
            return await process_single_part(part_info)

    tasks = [process_with_semaphore(part_info) for part_info in parts_to_process]
    part_results = await asyncio.gather(*tasks, return_exceptions=True)

    # gather preserves task order, so result i belongs to parts_to_process[i]
    processed_results = []
    for i, result in enumerate(part_results):
        if isinstance(result, Exception):
            part_info = parts_to_process[i]
            processed_results.append(PartResult(
                originalPart=part_info["part"],
                aiResult=f"[Error in parallel processing: {result}]",
                partIndex=part_info["part_index"],
                documentId=part_info["document_id"],
                processingTime=0.0,
                metadata={"success": False, "error": str(result)},
            ))
        elif result is not None:
            processed_results.append(result)

    logger.info(f"Completed processing {len(processed_results)} parts")
    return processed_results
```

### Phase 3: Update Data Models

#### 3.1 Extend AiCallRequest

**File**: `gateway/modules/datamodels/datamodelAi.py`

```python
from typing import List, Optional

from pydantic import BaseModel


class AiCallRequest(BaseModel):
    prompt: str
    context: str = ""
    options: AiCallOptions
    contentParts: Optional[List[ContentPart]] = None  # NEW: content parts (unified for single and multiple)
```

#### 3.2 Add PartResult Model

**File**: `gateway/modules/datamodels/datamodelExtraction.py`

```python
from typing import Any, Dict

from pydantic import BaseModel


class PartResult(BaseModel):
    originalPart: ContentPart
    aiResult: str
    partIndex: int          # global position of the part across all extracted documents
    documentId: str         # id of the ContentExtracted the part came from
    processingTime: float   # seconds
    metadata: Dict[str, Any]
```

### Phase 4: Leverage Existing Merging System

#### 4.1 Use Existing Format-Aligned Mergers

**Key Insight**: The codebase already has a sophisticated, format-aligned merging system
that should be leveraged instead of creating new merging logic.

**Existing Merging Infrastructure**:

- **TextMerger**: Handles text content merging with proper grouping and ordering
- **TableMerger**: Handles table content merging with structure preservation
- **IntelligentTokenAwareMerger**: Advanced AI-aware merging that optimizes token usage
- **DefaultMerger**: Fallback for other content types

**File**: `gateway/modules/services/serviceAi/subDocumentProcessing.py`

```python
from typing import List, Optional


def _mergePartResults(self, partResults: List[PartResult],
                      options: Optional[AiCallOptions] = None) -> str:
    """Merge part results using the existing format-aligned merging system."""
    from modules.services.serviceExtraction.subPipeline import _applyMerging

    if not partResults:
        return ""

    # Convert PartResults back to ContentParts so the existing mergers can consume them
    content_parts = []
    for part_result in partResults:
        original = part_result.originalPart
        content_parts.append(ContentPart(
            id=original.id,
            parentId=original.parentId,
            label=original.label,
            typeGroup=original.typeGroup,
            mimeType=original.mimeType,
            data=part_result.aiResult,  # The AI result replaces the original data
            metadata={
                **(original.metadata or {}),
                "aiResult": True,
                "partIndex": part_result.partIndex,
                "documentId": part_result.documentId,
                "processingTime": part_result.processingTime,
                "success": part_result.metadata.get("success", False),
            },
        ))

    # Default strategy: group by document, keep part order, concatenate.
    # Assumes _applyMerging can group/order on these metadata keys; verify against the merger implementation.
    merge_strategy = {
        "useIntelligentMerging": True,
        "groupBy": "documentId",
        "orderBy": "partIndex",
        "mergeType": "concatenate",
    }
    if options is not None and getattr(options, "mergeStrategy", None):
        merge_strategy.update(options.mergeStrategy)

    merged_parts = _applyMerging(content_parts, merge_strategy)

    # Convert merged parts back to the final string
    final_content = "\n\n".join(part.data or "" for part in merged_parts)
    logger.info(f"Merged {len(partResults)} parts using existing format-aligned merging system")
    return final_content.strip()
```

#### 4.2 Benefits of Using Existing Merging System

- **Format Preservation**: Maintains document structure and semantic boundaries
- **Content Type Awareness**: Handles text, table, and other content types appropriately
- **Token Optimization**: Uses intelligent merging to minimize AI calls
- **Proven Reliability**: Leverages battle-tested merging logic
- **Consistency**: Uses the same merging approach across the entire system

## Benefits

### 1. Model-Aware Chunking

- Content is chunked based on the capabilities of the model actually selected
- Chunk sizes are recomputed for each model attempt
- Chunks are no longer sized for a different model's context window (too large for small models, needlessly small for large ones)

### 2. Improved Reliability

- Each model gets fresh chunking based on its capabilities
- Fallback attempts are not constrained by chunk sizes computed for another model
- Fewer failures caused by chunk-size mismatches

### 3. Performance Optimization

- Smaller chunks for models with limited context
- Larger chunks for models with extensive context
- Fewer API calls when a large-context model can take a part in fewer chunks, or unchunked

### 4. Leverages Existing Infrastructure

- Reuses the existing merging system (TextMerger, TableMerger, IntelligentTokenAwareMerger)
- Preserves format alignment and document structure
- Consistent with existing codebase patterns

### 5. Maintainability

- Reuses existing chunking and merging logic
- Clear separation of concerns
- Consistent error handling

## Migration Strategy

### Phase 1: Preparation

1. Add new data models (`PartResult`, extended `AiCallRequest`)
2. Add the `_mergePartResults` adapter that delegates to the existing mergers
3. Test with existing code (backward compatibility)

### Phase 2: Implementation

1. Modify extraction pipeline (remove chunking)
2. Update AI call interface (add model-aware chunking)
3. Update document processing (use new part-based approach)

### Phase 3: Testing

1. Unit tests for new chunking logic
2. Integration tests for end-to-end flow
3. Performance testing with various model combinations

### Phase 4: Deployment

1. Gradual rollout with feature flags
2. Monitor performance and error rates
3. Full migration once stable

## Risk Mitigation

### 1. Backward Compatibility

- Keep existing interfaces during transition
- Feature flags for new vs. old behavior
- Gradual migration path

### 2. Error Handling

- Comprehensive error handling for chunking failures
- Fallback to original content if chunking fails
- Detailed logging for debugging

### 3. Performance Monitoring

- Track chunking performance
- Monitor API call patterns
- Alert on unusual error rates

## Reflection and Correction

### Key Learning: Leverage Existing Merging Infrastructure

During implementation review, it became clear that the codebase already contains a sophisticated, format-aligned merging system. That system should be leveraged rather than new merging logic being written. It includes:

- **TextMerger**: Handles text content with proper grouping and ordering
- **TableMerger**: Preserves table structure during merging
- **IntelligentTokenAwareMerger**: Optimizes AI calls through token-aware merging
- **DefaultMerger**: Fallback for other content types

### Updated Approach

The implementation has been updated to:

1. **Reuse existing merging logic**: `_mergePartResults` is a thin adapter that delegates to `_applyMerging` rather than implementing its own merge logic
2. **Convert PartResults back to ContentParts** so they work with the existing merger system
3. **Leverage proven merging strategies** that maintain document structure and format alignment
4. **Maintain consistency** with existing codebase patterns

This approach improves reliability, consistency, and maintainability while avoiding duplication of existing, well-tested functionality.

## Conclusion

This implementation provides a model-aware content handling system that sizes chunks according to the capabilities of the model actually used for each call.
The approach keeps existing interfaces available during the transition. It is intended to improve reliability and performance through dynamic, per-model chunking. The key change is moving chunking from a pre-processing step to an on-demand process that adapts to each model's capabilities. At the same time, it reuses the existing merging infrastructure so that results are merged consistently, regardless of which model produced them.
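
The sizing rules in `_chunkContentPart` can be checked in isolation. Those rules are roughly 4 bytes per token, a 90% context budget, 70% of that budget for text chunks and 80% for image chunks. The following is a minimal sketch assuming those same heuristics. `ChunkBudget` and `computeChunkBudget` are hypothetical names. Integer arithmetic is used in place of `int(x * 0.9)` to avoid float rounding surprises at the edges.

```python
from dataclasses import dataclass

# Assumption: ~4 bytes of UTF-8 text per token, the same rough heuristic
# as `model.contextLength * 4` in `_chunkContentPart`.
BYTES_PER_TOKEN = 4


@dataclass(frozen=True)
class ChunkBudget:
    """Hypothetical container for the per-model chunk sizes (all in bytes)."""
    maxContextBytes: int  # 90% of the model context
    textChunkSize: int    # 70% of maxContextBytes
    imageChunkSize: int   # 80% of maxContextBytes


def computeChunkBudget(contextLengthTokens: int) -> ChunkBudget:
    """Derive chunk sizes for one model from its context length in tokens."""
    modelContextBytes = contextLengthTokens * BYTES_PER_TOKEN
    maxContextBytes = modelContextBytes * 9 // 10
    return ChunkBudget(
        maxContextBytes=maxContextBytes,
        textChunkSize=maxContextBytes * 7 // 10,
        imageChunkSize=maxContextBytes * 8 // 10,
    )


small = computeChunkBudget(8192)
large = computeChunkBudget(128000)
print(small)
print(large)
```

Under these heuristics an 8,192-token model gets text chunks of 20,643 bytes, while a 128,000-token model gets 322,560 bytes. This per-model spread is what the design relies on when a fallback model has a different context length from the first choice.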
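
The core of `_processContentPartWithFallback` re-derives the chunk budget for every model attempt instead of reusing one computed up front. The following is a self-contained sketch of that loop, under clearly stated assumptions:

- `Model`, `splitUtf8`, `processWithFallback` and `fakeCallModel` are hypothetical stand-ins for `AiModel`, the registry chunkers and `_callWithModel`.
- `splitUtf8` cuts only on character boundaries.
- The budgets reuse the document's heuristics.
- Failures raise rather than return error responses.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional


@dataclass(frozen=True)
class Model:
    """Hypothetical stand-in for AiModel with only the fields this sketch needs."""
    name: str
    contextLength: int  # context window in tokens


def splitUtf8(text: str, maxBytes: int) -> List[str]:
    """Naive chunker: pieces of at most maxBytes UTF-8 bytes, cut on character boundaries."""
    chunks: List[str] = []
    current: List[str] = []
    currentSize = 0
    for ch in text:
        size = len(ch.encode("utf-8"))
        if current and currentSize + size > maxBytes:
            chunks.append("".join(current))
            current, currentSize = [], 0
        current.append(ch)
        currentSize += size
    if current:
        chunks.append("".join(current))
    return chunks


CallModel = Callable[[Model, str, str], Awaitable[str]]


async def processWithFallback(data: str, prompt: str, models: List[Model],
                              callModel: CallModel) -> str:
    """Try each model in order, recomputing the chunk budget for every attempt."""
    lastError: Optional[Exception] = None
    promptBytes = len(prompt.encode("utf-8"))
    for model in models:
        # Assumed heuristics: ~4 bytes/token, 90% budget, text chunks at 70% of it
        maxContextBytes = model.contextLength * 4 * 9 // 10
        try:
            if len(data.encode("utf-8")) + promptBytes <= maxContextBytes:
                return await callModel(model, prompt, data)
            chunks = splitUtf8(data, maxContextBytes * 7 // 10)
            results = [await callModel(model, prompt, chunk) for chunk in chunks]
            return "\n\n".join(results)
        except Exception as exc:  # this model failed: fall back to the next one
            lastError = exc
    raise RuntimeError(f"All models failed: {lastError}")


# Usage: the large model fails, so the small model re-chunks the input for itself.
calls = []


async def fakeCallModel(model: Model, prompt: str, data: str) -> str:
    calls.append((model.name, len(data)))
    if model.name == "large":
        raise RuntimeError("simulated provider error")
    return f"{model.name}:{len(data)}"


output = asyncio.run(processWithFallback(
    "x" * 1000, "Summarize", [Model("large", 100_000), Model("small", 100)], fakeCallModel))
print(output)
```

In this example the large model accepts the 1,000-byte input unchunked but fails. The small model's 360-byte budget then forces four chunks of 252, 252, 252 and 244 bytes. These sizes are computed for that model, not inherited from the first attempt.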
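
`_processPartsWithMapping` bounds concurrency with `asyncio.Semaphore` and calls `asyncio.gather(..., return_exceptions=True)`. It then maps any exception back to its part by position. A stripped-down sketch of that pattern is shown below. `PartOutcome`, `processAllParts` and `fakeWorker` are hypothetical simplifications of `PartResult` and `process_single_part`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List


@dataclass
class PartOutcome:
    """Hypothetical, simplified stand-in for PartResult."""
    partIndex: int
    aiResult: str
    metadata: Dict[str, Any] = field(default_factory=dict)


async def processAllParts(parts: List[str], worker: Callable[[str], Awaitable[str]],
                          maxConcurrent: int = 5) -> List[PartOutcome]:
    semaphore = asyncio.Semaphore(maxConcurrent)

    async def runOne(index: int, part: str) -> PartOutcome:
        async with semaphore:  # at most maxConcurrent workers run at once
            result = await worker(part)
        return PartOutcome(index, result, {"success": True})

    # return_exceptions=True returns exceptions as results instead of raising them;
    # results come back in task order, so position i still maps to parts[i].
    raw = await asyncio.gather(*(runOne(i, p) for i, p in enumerate(parts)),
                               return_exceptions=True)
    outcomes: List[PartOutcome] = []
    for i, item in enumerate(raw):
        if isinstance(item, BaseException):
            outcomes.append(PartOutcome(i, f"[Error: {item}]",
                                        {"success": False, "error": str(item)}))
        else:
            outcomes.append(item)
    return outcomes


# Usage: track how many workers are active at the same time.
stats = {"active": 0, "peak": 0}


async def fakeWorker(part: str) -> str:
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    try:
        await asyncio.sleep(0.01)  # simulate an AI call
        if part == "bad":
            raise ValueError("simulated model error")
        return part.upper()
    finally:
        stats["active"] -= 1


results = asyncio.run(processAllParts(["a", "bad", "c", "d"], fakeWorker, maxConcurrent=2))
print([r.aiResult for r in results])
```

The failing part becomes an error entry in its original position, and the other parts still complete. The semaphore keeps at most two simulated calls in flight.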
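
`_mergePartResults` hands `_applyMerging` a strategy with `groupBy: "documentId"`, `orderBy: "partIndex"` and `mergeType: "concatenate"`. The following is not `_applyMerging`, whose internals are not shown in this document. It is a hypothetical illustration of the result that strategy is meant to produce. `AiPartResult`, `mergeByDocument` and `finalContent` are illustrative names, and the `"\n\n"` separators are assumptions mirroring the final join in `_mergePartResults`.

```python
from dataclasses import dataclass
from itertools import groupby
from typing import List, Tuple


@dataclass(frozen=True)
class AiPartResult:
    """Hypothetical, minimal view of a PartResult."""
    documentId: str
    partIndex: int  # global index, assigned document by document
    aiResult: str


def mergeByDocument(results: List[AiPartResult],
                    partSeparator: str = "\n\n") -> List[Tuple[str, str]]:
    """Order by partIndex, then concatenate consecutive parts of the same document."""
    ordered = sorted(results, key=lambda r: r.partIndex)
    merged: List[Tuple[str, str]] = []
    # groupby only merges *consecutive* items; that is correct here because
    # partIndex is assigned per document, keeping each document's parts contiguous.
    for documentId, group in groupby(ordered, key=lambda r: r.documentId):
        merged.append((documentId, partSeparator.join(r.aiResult for r in group)))
    return merged


def finalContent(results: List[AiPartResult]) -> str:
    """Join the per-document sections into one string."""
    return "\n\n".join(text for _, text in mergeByDocument(results)).strip()


# Usage: results may arrive in any order; the output follows partIndex.
parts = [
    AiPartResult("docB", 2, "B first"),
    AiPartResult("docA", 1, "A second"),
    AiPartResult("docA", 0, "A first"),
    AiPartResult("docB", 3, "B second"),
]
print(mergeByDocument(parts))
```

If part indices were ever assigned across documents in an interleaved way, consecutive grouping would split a document into several sections. In that case a real merger would need to sort by document first.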