wiki/poweron/implementation/implementation_content_handling_with_dynamic_ai_v2.md
ValueOn AG 02fdbeb726 docs
2025-10-22 16:48:52 +02:00

22 KiB

Implementation: Content Handling with Dynamic AI v2

Overview

This document outlines the implementation of model-aware content chunking within the AI call pipeline. The key innovation is moving chunking logic from the extraction phase into the AI call phase, ensuring that content is chunked based on the actual selected model's capabilities rather than a pre-selected model.

Problem Statement

Current Issue

  • Chunking Phase: Uses Model A's context length to determine chunk sizes
  • AI Call Phase: Uses Models B, C, D... in fallback sequence until one succeeds
  • Result: No guarantee that chunking model matches successful AI call model
  • Impact: Suboptimal chunk sizes, potential failures, inconsistent performance

Root Cause

Chunking happens before model selection, using a model that may not be the one actually used for AI calls.

Solution Architecture

Core Concept

Move chunking logic from extraction phase into AI call phase, making it model-aware and dynamic.

Key Principles

  1. Extract First: Pure content extraction without chunking
  2. Chunk on Demand: Chunk content based on actual selected model
  3. Per-Model Chunking: Re-chunk for each model attempt if needed
  4. Fallback with Re-chunking: Each model gets fresh chunking based on its capabilities

Implementation Plan

Phase 1: Modify Extraction Pipeline

1.1 Remove Chunking from Extraction

File: gateway/modules/services/serviceExtraction/subPipeline.py

def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ContentExtracted:
    extractor = extractorRegistry.resolve(mimeType, fileName)
    if extractor is None:
        # fallback: single binary part
        part = ContentPart(
            id=makeId(),
            parentId=None,
            label="file",
            typeGroup="binary",
            mimeType=mimeType or "application/octet-stream",
            data="",
            metadata={"warning": "No extractor registered"}
        )
        return ContentExtracted(id=makeId(), parts=[part])

    parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})
    
    # REMOVED: poolAndLimit(parts, chunkerRegistry, options)
    # REMOVED: Chunking logic - now handled in AI call phase
    
    # Apply merging strategy if provided (preserve existing logic)
    mergeStrategy = options.get("mergeStrategy", {})
    if mergeStrategy:
        parts = _applyMerging(parts, mergeStrategy)
    
    return ContentExtracted(id=makeId(), parts=parts)

1.2 Update Document Processing

File: gateway/modules/services/serviceAi/subDocumentProcessing.py

async def processDocumentsPerChunk(self, documents: List[ChatDocument], prompt: str, options: Optional[AiCallOptions] = None) -> str:
    if not documents:
        return ""
    
    # REMOVED: _getModelCapabilitiesForContent() - no longer needed for chunking
    # REMOVED: model_capabilities calculation
    
    # Build extraction options WITHOUT chunking parameters
    extractionOptions: Dict[str, Any] = {
        "prompt": prompt,
        "operationType": options.operationType if options else "general",
        "processDocumentsIndividually": True,
        # REMOVED: maxSize, textChunkSize, imageChunkSize
        "mergeStrategy": {
            "useIntelligentMerging": True,
            "prompt": prompt,
            "groupBy": "typeGroup",
            "orderBy": "id",
            "mergeType": "concatenate"
        },
    }
    
    # Extract content WITHOUT chunking
    extractionResult = self.extractionService.extractContent(documents, extractionOptions)
    
    if not isinstance(extractionResult, list):
        return "[Error: No extraction results]"
    
    # Process parts (not chunks) with AI calls
    partResults = await self._processPartsWithMapping(extractionResult, prompt, options)
    
    # Merge results
    mergedContent = self._mergePartResults(partResults, options)
    
    return mergedContent

Phase 2: Implement Model-Aware AI Calls

2.1 Modify AI Call Interface

File: gateway/modules/interfaces/interfaceAiObjects.py

async def call(self, request: AiCallRequest) -> AiCallResponse:
    """Call AI model for text generation with model-aware chunking."""
    prompt = request.prompt
    context = request.context
    options = request.options
    
    # Handle content parts (unified path)
    if hasattr(request, 'contentParts') and request.contentParts:
        return await self._callWithContentParts(request)
    
    # Handle traditional text/context calls
    return await self._callWithTextContext(request)

async def _callWithContentParts(self, request: AiCallRequest) -> AiCallResponse:
    """Process content parts with model-aware chunking (unified for single and multiple parts)."""
    prompt = request.prompt
    options = request.options
    contentParts = request.contentParts
    
    # Get fallback models
    availableModels = modelRegistry.getAvailableModels()
    fallbackModels = model_selector.getFallbackModels(prompt, "", options, availableModels)
    
    if not fallbackModels:
        return self._createErrorResponse("No suitable models found", 0, 0)
    
    # Process each content part
    allResults = []
    for contentPart in contentParts:
        partResult = await self._processContentPartWithFallback(contentPart, prompt, options, fallbackModels)
        allResults.append(partResult)
    
    # Merge all results
    mergedContent = self._mergePartResults(allResults)
    
    return AiCallResponse(
        content=mergedContent,
        modelName="multiple",
        priceUsd=sum(r.priceUsd for r in allResults),
        processingTime=sum(r.processingTime for r in allResults),
        bytesSent=sum(r.bytesSent for r in allResults),
        bytesReceived=sum(r.bytesReceived for r in allResults),
        errorCount=sum(r.errorCount for r in allResults)
    )

async def _processContentPartWithFallback(self, contentPart: ContentPart, prompt: str, options: AiCallOptions, fallbackModels: List[AiModel]) -> AiCallResponse:
    """Process a single content part with model-aware chunking and fallback."""
    lastError = None
    
    for attempt, model in enumerate(fallbackModels):
        try:
            logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(fallbackModels)})")
            
            # Check if part fits in model context
            partSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0
            modelContextBytes = model.contextLength * 4  # Convert tokens to bytes
            
            if partSize <= modelContextBytes:
                # Part fits - call AI directly
                response = await self._callWithModel(model, prompt, contentPart.data, 0.2, None, partSize)
                logger.info(f"✅ Content part processed successfully with model: {model.name}")
                return response
            else:
                # Part too large - chunk it
                chunks = await self._chunkContentPart(contentPart, model, options)
                if not chunks:
                    raise ValueError(f"Failed to chunk content part for model {model.name}")
                
                # Process each chunk
                chunkResults = []
                for chunk in chunks:
                    chunkResponse = await self._callWithModel(model, prompt, chunk['data'], 0.2, None, chunk['size'])
                    chunkResults.append(chunkResponse)
                
                # Merge chunk results
                mergedContent = self._mergeChunkResults(chunkResults)
                totalPrice = sum(r.priceUsd for r in chunkResults)
                totalTime = sum(r.processingTime for r in chunkResults)
                totalBytesSent = sum(r.bytesSent for r in chunkResults)
                totalBytesReceived = sum(r.bytesReceived for r in chunkResults)
                totalErrors = sum(r.errorCount for r in chunkResults)
                
                logger.info(f"✅ Content part chunked and processed with model: {model.name} ({len(chunks)} chunks)")
                return AiCallResponse(
                    content=mergedContent,
                    modelName=model.name,
                    priceUsd=totalPrice,
                    processingTime=totalTime,
                    bytesSent=totalBytesSent,
                    bytesReceived=totalBytesReceived,
                    errorCount=totalErrors
                )
                
        except Exception as e:
            lastError = e
            logger.warning(f"❌ Model {model.name} failed for content part: {str(e)}")
            
            if attempt < len(fallbackModels) - 1:
                logger.info(f"🔄 Trying next fallback model...")
                continue
            else:
                logger.error(f"💥 All {len(fallbackModels)} models failed for content part")
                break
    
    # All models failed
    return self._createErrorResponse(f"All models failed: {str(lastError)}", 0, 0)

async def _chunkContentPart(self, contentPart: ContentPart, model: AiModel, options: AiCallOptions) -> List[Dict[str, Any]]:
    """Chunk a content part based on model capabilities."""
    # Calculate model-specific chunk sizes
    modelContextBytes = model.contextLength * 4  # Convert tokens to bytes
    maxContextBytes = int(modelContextBytes * 0.9)  # 90% of context length
    textChunkSize = int(maxContextBytes * 0.7)  # 70% of max context for text chunks
    imageChunkSize = int(maxContextBytes * 0.8)  # 80% of max context for image chunks
    
    # Build chunking options
    chunkingOptions = {
        "textChunkSize": textChunkSize,
        "imageChunkSize": imageChunkSize,
        "maxSize": maxContextBytes,
        "chunkAllowed": True
    }
    
    # Get appropriate chunker
    from modules.services.serviceExtraction.subRegistry import ChunkerRegistry
    chunkerRegistry = ChunkerRegistry()
    chunker = chunkerRegistry.resolve(contentPart.typeGroup)
    
    if not chunker:
        logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}")
        return []
    
    # Chunk the content part
    try:
        chunks = chunker.chunk(contentPart, chunkingOptions)
        logger.debug(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part")
        return chunks
    except Exception as e:
        logger.error(f"Chunking failed for {contentPart.typeGroup}: {str(e)}")
        return []

2.2 Update Document Processing Interface

File: gateway/modules/services/serviceAi/subDocumentProcessing.py

async def _processPartsWithMapping(self, extractionResult: List[ContentExtracted], prompt: str, options: Optional[AiCallOptions] = None) -> List[PartResult]:
    """Process content parts with proper mapping to preserve relationships."""
    from modules.datamodels.datamodelExtraction import PartResult
    import asyncio
    
    # Collect all parts that need processing
    parts_to_process = []
    part_index = 0
        
    for ec in extractionResult:
        for part in ec.parts:
            if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
                # Skip empty container parts
                if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
                    logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
                    continue
                
                parts_to_process.append({
                    'part': part,
                    'part_index': part_index,
                    'document_id': ec.id
                })
                part_index += 1
    
    logger.info(f"Processing {len(parts_to_process)} parts with model-aware chunking")
    
    # Process parts in parallel
    async def process_single_part(part_info: Dict) -> PartResult:
        part = part_info['part']
        part_index = part_info['part_index']
        document_id = part_info['document_id']
        
        start_time = time.time()
        
        try:
            # Create AI call request with content part
            from modules.datamodels.datamodelAi import AiCallRequest
            request = AiCallRequest(
                prompt=prompt,
                context="",  # Context is in the content part
                options=options,
                contentParts=[part]  # Pass as list for unified processing
            )
            
            # Call AI with model-aware chunking
            response = await self.aiObjects.call(request)
            
            processing_time = time.time() - start_time
            
            return PartResult(
                originalPart=part,
                aiResult=response.content,
                partIndex=part_index,
                documentId=document_id,
                processingTime=processing_time,
                metadata={
                    "success": True,
                    "partSize": len(part.data) if part.data else 0,
                    "resultSize": len(response.content),
                    "typeGroup": part.typeGroup,
                    "modelName": response.modelName,
                    "priceUsd": response.priceUsd
                }
            )
            
        except Exception as e:
            processing_time = time.time() - start_time
            logger.warning(f"Error processing part {part_index}: {str(e)}")
            
            return PartResult(
                originalPart=part,
                aiResult=f"[Error processing part: {str(e)}]",
                partIndex=part_index,
                documentId=document_id,
                processingTime=processing_time,
                metadata={
                    "success": False,
                    "error": str(e),
                    "partSize": len(part.data) if part.data else 0,
                    "typeGroup": part.typeGroup
                }
            )
    
    # Process parts with concurrency control
    max_concurrent = 5
    if options and hasattr(options, 'maxConcurrentParts'):
        max_concurrent = options.maxConcurrentParts
    
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def process_with_semaphore(part_info):
        async with semaphore:
            return await process_single_part(part_info)
    
    tasks = [process_with_semaphore(part_info) for part_info in parts_to_process]
    part_results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Handle exceptions
    processed_results = []
    for i, result in enumerate(part_results):
        if isinstance(result, Exception):
            part_info = parts_to_process[i]
            processed_results.append(PartResult(
                originalPart=part_info['part'],
                aiResult=f"[Error in parallel processing: {str(result)}]",
                partIndex=part_info['part_index'],
                documentId=part_info['document_id'],
                processingTime=0.0,
                metadata={"success": False, "error": str(result)}
            ))
        elif result is not None:
            processed_results.append(result)
    
    logger.info(f"Completed processing {len(processed_results)} parts")
    return processed_results

Phase 3: Update Data Models

3.1 Extend AiCallRequest

File: gateway/modules/datamodels/datamodelAi.py

class AiCallRequest(BaseModel):
    prompt: str
    context: str = ""
    options: AiCallOptions
    contentParts: Optional[List[ContentPart]] = None  # NEW: Content parts (unified for single and multiple)

3.2 Add PartResult Model

File: gateway/modules/datamodels/datamodelExtraction.py

class PartResult(BaseModel):
    originalPart: ContentPart
    aiResult: str
    partIndex: int
    documentId: str
    processingTime: float
    metadata: Dict[str, Any]

Phase 4: Leverage Existing Merging System

4.1 Use Existing Format-Aligned Mergers

Key Insight: The codebase already has a sophisticated, format-aligned merging system that should be leveraged instead of creating new merging logic.

Existing Merging Infrastructure:

  • TextMerger: Handles text content merging with proper grouping and ordering
  • TableMerger: Handles table content merging with structure preservation
  • IntelligentTokenAwareMerger: Advanced AI-aware merging that optimizes token usage
  • DefaultMerger: Fallback for other content types

File: gateway/modules/services/serviceAi/subDocumentProcessing.py

def _mergePartResults(self, partResults: List[PartResult], options: Optional[AiCallOptions] = None) -> str:
    """Merge part results using existing format-aligned merging system."""
    if not partResults:
        return ""
    
    # Convert PartResults back to ContentParts for existing merger system
    content_parts = []
    for part_result in partResults:
        # Create ContentPart from PartResult
        content_part = ContentPart(
            id=part_result.originalPart.id,
            parentId=part_result.originalPart.parentId,
            label=part_result.originalPart.label,
            typeGroup=part_result.originalPart.typeGroup,
            mimeType=part_result.originalPart.mimeType,
            data=part_result.aiResult,  # Use AI result as data
            metadata={
                **part_result.originalPart.metadata,
                "aiResult": True,
                "partIndex": part_result.partIndex,
                "documentId": part_result.documentId,
                "processingTime": part_result.processingTime,
                "success": part_result.metadata.get("success", False)
            }
        )
        content_parts.append(content_part)
    
    # Use existing merging strategy from options
    merge_strategy = {
        "useIntelligentMerging": True,
        "groupBy": "documentId",  # Group by document
        "orderBy": "partIndex",   # Order by part index
        "mergeType": "concatenate"
    }
    
    if options and hasattr(options, 'mergeStrategy'):
        merge_strategy.update(options.mergeStrategy)
    
    # Apply existing merging logic
    from modules.services.serviceExtraction.subPipeline import _applyMerging
    merged_parts = _applyMerging(content_parts, merge_strategy)
    
    # Convert merged parts back to final string
    final_content = "\n\n".join([part.data for part in merged_parts])
    
    logger.info(f"Merged {len(partResults)} parts using existing format-aligned merging system")
    return final_content.strip()

4.2 Benefits of Using Existing Merging System

  • Format Preservation: Maintains document structure and semantic boundaries
  • Content Type Awareness: Handles text, table, and other content types appropriately
  • Token Optimization: Uses intelligent merging to minimize AI calls
  • Proven Reliability: Leverages battle-tested merging logic
  • Consistency: Uses same merging approach across the entire system

Benefits

1. Model-Aware Chunking

  • Content is chunked based on actual selected model's capabilities
  • Optimal chunk sizes for each model attempt
  • No wasted context or oversized chunks

2. Improved Reliability

  • Each model gets fresh chunking based on its capabilities
  • Better fallback behavior with model-specific optimization
  • Reduced failure rates due to chunk size mismatches

3. Performance Optimization

  • Smaller chunks for models with limited context
  • Larger chunks for models with extensive context
  • Reduced API calls through optimal chunking

4. Leverages Existing Infrastructure

  • Reuses existing sophisticated merging system (TextMerger, TableMerger, IntelligentTokenAwareMerger)
  • Maintains format alignment and document structure preservation
  • Consistent with existing codebase patterns and proven reliability

5. Maintainability

  • Reuses existing chunking and merging logic
  • Clear separation of concerns
  • Consistent error handling

Migration Strategy

Phase 1: Preparation

  1. Add new data models (PartResult, extended AiCallRequest)
  2. Create new merging functions
  3. Test with existing code (backward compatibility)

Phase 2: Implementation

  1. Modify extraction pipeline (remove chunking)
  2. Update AI call interface (add model-aware chunking)
  3. Update document processing (use new part-based approach)

Phase 3: Testing

  1. Unit tests for new chunking logic
  2. Integration tests for end-to-end flow
  3. Performance testing with various model combinations

Phase 4: Deployment

  1. Gradual rollout with feature flags
  2. Monitor performance and error rates
  3. Full migration once stable

Risk Mitigation

1. Backward Compatibility

  • Keep existing interfaces during transition
  • Feature flags for new vs old behavior
  • Gradual migration path

2. Error Handling

  • Comprehensive error handling for chunking failures
  • Fallback to original content if chunking fails
  • Detailed logging for debugging

3. Performance Monitoring

  • Track chunking performance
  • Monitor API call patterns
  • Alert on unusual error rates

Reflection and Correction

Key Learning: Leverage Existing Merging Infrastructure

During implementation review, it was discovered that the codebase already contains a sophisticated, format-aligned merging system that should be leveraged rather than creating new merging logic. The existing system includes:

  • TextMerger: Handles text content with proper grouping and ordering
  • TableMerger: Preserves table structure during merging
  • IntelligentTokenAwareMerger: Optimizes AI calls through token-aware merging
  • DefaultMerger: Fallback for other content types

Updated Approach

The implementation has been updated to:

  1. Reuse existing merging logic instead of creating new _mergePartResults functions
  2. Convert PartResults back to ContentParts to work with existing merger system
  3. Leverage proven merging strategies that maintain document structure and format alignment
  4. Maintain consistency with existing codebase patterns

This approach ensures better reliability, consistency, and maintainability while avoiding duplication of existing, well-tested functionality.

Conclusion

This implementation provides a robust, model-aware content handling system that optimizes chunking based on actual model capabilities. The approach maintains backward compatibility while significantly improving reliability and performance through intelligent, dynamic chunking.

The key innovation is moving chunking from a pre-processing step to a model-aware, on-demand process that adapts to each model's specific capabilities, while leveraging the existing sophisticated merging infrastructure to ensure optimal performance and reliability across all AI model types.