Enhanced Core AI Engine Implementation - Critical Fix for Chunked Documents

Overview

This document describes critical fixes to the existing AI services so that they handle large inputs (300+ MB, 200+ documents) correctly. The current chunked-processing path discards the link between each chunk and its AI result, which destroys document structure and degrades merge quality. The fix restores that link and adds parallel chunk processing on top of it.

Critical Problem Analysis

🚨 The Core Problem: Lost Chunk-to-AI-Result Mapping

The current system has an architectural flaw that breaks chunked document processing:

  1. Chunks are processed sequentially, and each AI result is stored without a reference to the chunk that produced it
  2. Nothing maps a processed chunk to its AI result once the call returns
  3. Merging cannot maintain chunk order or context, so document structure is lost
  4. Results are concatenated with no awareness of document flow or chunk relationships

Current System Issues (Critical Flaws)

  • Lost Chunk Relationships: AI results are stored as simple strings without reference to original chunks
  • Poor Document Structure: Merged results lack coherence and document flow
  • Lost Metadata: Chunk metadata (page numbers, sections, etc.) is discarded
  • No Context Preservation: Each chunk is processed in complete isolation
  • Inconsistent Merging: Simple concatenation without understanding document structure
  • Sequential Processing: Performance bottleneck with large documents
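
To make the first issue concrete, the sketch below contrasts a flat list of result strings with index-tagged records. It is illustrative only: `aiResults` mirrors the plain list this document says the current system uses, while `TaggedResult` and the arrival order (as could occur once results are produced concurrently or regrouped) are assumptions, not code from the existing service.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class TaggedResult:
    """Hypothetical stand-in for a result that remembers where it came from."""
    documentId: str
    chunkIndex: int
    aiResult: str


# (documentId, chunkIndex, text) in the order results happen to arrive.
# The arrival order is invented for illustration.
arrivals: List[Tuple[str, int, str]] = [
    ("doc1", 1, "summary of pages 11-20"),
    ("doc2", 2, "summary of sheet A"),
    ("doc1", 0, "summary of pages 1-10"),
]

# Flat strings: once appended, nothing records document or position.
aiResults: List[str] = [text for _, _, text in arrivals]
flatMerge = "\n\n".join(aiResults)

# Tagged records: order and grouping can be restored after the fact.
tagged = [TaggedResult(d, i, t) for d, i, t in arrivals]
restored = sorted(tagged, key=lambda r: (r.documentId, r.chunkIndex))
taggedMerge = "\n\n".join(r.aiResult for r in restored)
```

With only `aiResults`, the merged text keeps whatever order the strings were appended in, and there is no way to tell which document a string belongs to. The tagged variant can always be sorted back into document order.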

What Works (Existing Strengths)

  • Modular Service Architecture: Clean separation with ExtractionService, AiService, GenerationService, and WorkflowService
  • Robust Chunking System: Intelligent chunking with ChunkerRegistry supporting text, table, and structure chunkers
  • Content Processing Pipeline: Sophisticated extraction → chunking → AI processing → generation flow
  • Format Support: Comprehensive support for multiple input/output formats (PDF, DOCX, XLSX, etc.)
  • Token Limit Elimination: Per-chunk processing keeps every request within the model's context limit, so document size is no longer bounded by LLM token constraints

Solution Architecture

Core Design Principles

  1. Fix the Core Problem First: Address the lost chunk-to-AI-result mapping before adding enhancements
  2. Preserve Document Structure: Maintain chunk relationships and document flow throughout processing
  3. Enhance Existing System: Build on proven infrastructure rather than replacing it
  4. Parallel Processing: Process multiple chunks simultaneously for better performance
  5. Context Preservation: Maintain context across chunks for better consistency
  6. Backward Compatibility: Maintain existing functionality while fixing critical issues

Fixed Processing Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                    Fixed AI Service (Core Problem Solved)              │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────────┐  │
│  │   Extraction    │  │   Chunk         │  │   Enhanced              │  │
│  │   Service       │  │   Mapping       │  │   Merging               │  │
│  │   (Existing)    │  │   System        │  │   System                │  │
│  │  - Robust       │  │  - ChunkResult  │  │  - Document Structure   │  │
│  │    Chunking     │  │  - AI Mapping   │  │  - Context Preservation │  │
│  │  - Format       │  │  - Metadata     │  │  - Quality Merging      │  │
│  │    Support      │  │  - Order        │  │  - Parallel Processing  │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘
                                │
        ┌───────────────────────┼───────────────────────┐
        │                       │                       │
┌───────▼────────┐    ┌────────▼────────┐    ┌────────▼────────┐
│  Existing      │    │   Chunk         │    │   Document      │
│  Chunkers      │    │   Processing    │    │   Structure     │
│  - Text        │    │  - Parallel     │    │  - Order        │
│  - Table       │    │  - Mapping      │    │  - Context      │
│  - Structure   │    │  - Context      │    │  - Merging      │
│  - Image       │    │  - Metadata     │    │  - Quality      │
└────────────────┘    └─────────────────┘    └─────────────────┘
        │                       │                       │
        └───────────────────────┼───────────────────────┘
                                │
                    ┌───────────▼───────────┐
                    │   Existing Extractors │
                    │   & Infrastructure    │
                    │   - PDF, DOCX, XLSX   │
                    │   - Text, Table, etc. │
                    │   - Chunking Registry │
                    └───────────────────────┘

Key Insight: Fix the Core Mapping Problem

Critical Reality: The existing system eliminates token limits but loses chunk relationships, causing:

  1. Lost Document Structure: Chunks lose their position and context in the document
  2. Poor Merging Quality: Simple concatenation without understanding chunk relationships
  3. Lost Metadata: Chunk metadata (page numbers, sections, etc.) is discarded
  4. No Context Preservation: Each chunk is processed in complete isolation
  5. Sequential Processing: Performance bottleneck with large documents

Solution: Fix the fundamental mapping problem with:

  • ChunkResult data model to preserve chunk-to-AI-result relationships
  • Enhanced merging that maintains document structure and chunk order
  • Parallel processing that preserves chunk relationships
  • Context preservation through proper chunk mapping
  • Quality merging that understands document flow
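
One possible way to use the chunk mapping for context preservation is to tell the model where each chunk sits in its document. The helper below is a minimal sketch under that assumption: `buildChunkPrompt` and the metadata keys `pageNumber` and `section` are hypothetical names chosen for illustration and are not part of the implementation described in this document.

```python
from typing import Any, Dict


def buildChunkPrompt(
    prompt: str,
    chunkIndex: int,
    totalChunks: int,
    documentId: str,
    metadata: Dict[str, Any],
) -> str:
    """Prefix the user prompt with the chunk's position in its document."""
    lines = [
        f"Document: {documentId}",
        f"Chunk {chunkIndex + 1} of {totalChunks}",
    ]
    # Only mention metadata that is actually present (keys are assumptions).
    for key in ("pageNumber", "section"):
        if key in metadata:
            lines.append(f"{key}: {metadata[key]}")
    return "\n".join(lines) + "\n\n" + prompt


text = buildChunkPrompt(
    "Summarize this part.", 2, 10, "doc1", {"section": "Methods"}
)
print(text)
```

Whether such a prefix improves consistency depends on the model and the prompt, so it would need to be evaluated before being adopted.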

Implementation Details

1. Fix Chunk-to-AI-Result Mapping (Critical Fix)

The core problem is that the current system loses the relationship between chunks and their AI results. We need to create a proper mapping system:

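# Imports needed by the snippets below. ContentPart, ContentExtracted,
# ChatDocument, AiCallOptions and AiCallRequest are existing project types
# and are assumed to be imported from their current modules.
import logging
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

# New data model to preserve chunk relationships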
class ChunkResult(BaseModel):
    """Preserves the relationship between a chunk and its AI result."""
    originalChunk: ContentPart
    aiResult: str
    chunkIndex: int
    documentId: str
    processingTime: float = 0.0
    metadata: Dict[str, Any] = Field(default_factory=dict)

# Fixed version of _processDocumentsPerChunk method
async def _processDocumentsPerChunk(
    self,
    documents: List[ChatDocument],
    prompt: str,
    options: Optional[AiCallOptions] = None
) -> str:
    """
    Fixed per-chunk processing that preserves chunk relationships.
    """
    if not documents:
        return ""
    
    # Get model capabilities for size calculation (existing logic)
    model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
    
    # Build extraction options for chunking (existing logic)
    extractionOptions: Dict[str, Any] = {
        "prompt": prompt,
        "operationType": options.operationType if options else "general",
        "processDocumentsIndividually": True,
        "maxSize": model_capabilities["maxContextBytes"],
        "chunkAllowed": True,
        "textChunkSize": model_capabilities["textChunkSize"],
        "imageChunkSize": model_capabilities["imageChunkSize"],
        "imageMaxPixels": 1024 * 1024,
        "imageQuality": 85,
        "mergeStrategy": {
            "groupBy": "typeGroup",
            "orderBy": "id",
            "mergeType": "concatenate"
        },
    }
    
    # Extract content with chunking (existing logic)
    extractionResult = self.extractionService.extractContent(documents, extractionOptions)
    
    # FIXED: Process chunks with proper mapping
    chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options)
    
    # FIXED: Merge with preserved chunk relationships
    mergedContent = self._mergeChunkResults(chunkResults, options)
    
    return mergedContent

async def _processChunksWithMapping(
    self,
    extractionResult: List[ContentExtracted],
    prompt: str,
    options: Optional[AiCallOptions] = None
) -> List[ChunkResult]:
    """Process chunks with proper mapping to preserve relationships."""
    import asyncio
    import time
    
    # Collect all chunks that need processing with proper indexing
    chunks_to_process = []
    chunk_index = 0
    
    for ec in extractionResult:
        for part in ec.parts:
            if part.typeGroup in ("text", "table", "structure", "image"):
                chunks_to_process.append({
                    'part': part,
                    'chunk_index': chunk_index,
                    'document_id': ec.id
                })
                chunk_index += 1
    
    # Process chunks in parallel with proper mapping
    async def process_single_chunk(chunk_info: Dict) -> ChunkResult:
        part = chunk_info['part']
        chunk_index = chunk_info['chunk_index']
        document_id = chunk_info['document_id']
        
        start_time = time.time()
        
        try:
            if part.typeGroup == "image":
                ai_result = await self.readImage(
                    prompt=prompt,
                    imageData=part.data,
                    mimeType=part.mimeType,
                    options=options
                )
            else:
                request = AiCallRequest(
                    prompt=prompt,
                    context=part.data,
                    options=options
                )
                response = await self.aiObjects.call(request)
                ai_result = response.content
            
            processing_time = time.time() - start_time
            
            return ChunkResult(
                originalChunk=part,
                aiResult=ai_result,
                chunkIndex=chunk_index,
                documentId=document_id,
                processingTime=processing_time,
                metadata={
                    "success": True,
                    "chunkSize": len(part.data) if part.data else 0,
                    "resultSize": len(ai_result)
                }
            )
            
        except Exception as e:
            processing_time = time.time() - start_time
            logger.warning(f"Error processing chunk {chunk_index}: {str(e)}")
            
            return ChunkResult(
                originalChunk=part,
                aiResult=f"[Error processing chunk: {str(e)}]",
                chunkIndex=chunk_index,
                documentId=document_id,
                processingTime=processing_time,
                metadata={
                    "success": False,
                    "error": str(e),
                    "chunkSize": len(part.data) if part.data else 0
                }
            )
    
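    # Process all chunks in parallel. gather() returns results in input order,
    # so chunk_results[i] always corresponds to chunks_to_process[i].
    tasks = [process_single_chunk(chunk_info) for chunk_info in chunks_to_process]
    chunk_results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # process_single_chunk already catches Exception, so anything returned here
    # as an exception is a BaseException, for example asyncio.CancelledError.
    processed_results = []
    for i, result in enumerate(chunk_results):
        if isinstance(result, BaseException):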
            # Create error ChunkResult
            chunk_info = chunks_to_process[i]
            processed_results.append(ChunkResult(
                originalChunk=chunk_info['part'],
                aiResult=f"[Error in parallel processing: {str(result)}]",
                chunkIndex=chunk_info['chunk_index'],
                documentId=chunk_info['document_id'],
                processingTime=0.0,
                metadata={"success": False, "error": str(result)}
            ))
        else:
            processed_results.append(result)
    
    return processed_results

def _mergeChunkResults(
    self,
    chunkResults: List[ChunkResult],
    options: Optional[AiCallOptions] = None
) -> str:
    """Merge chunk results while preserving document structure and chunk order."""
    
    if not chunkResults:
        return ""
    
    # Group chunk results by document
    results_by_document = {}
    for chunk_result in chunkResults:
        doc_id = chunk_result.documentId
        if doc_id not in results_by_document:
            results_by_document[doc_id] = []
        results_by_document[doc_id].append(chunk_result)
    
    # Sort chunks within each document by chunk index
    for doc_id in results_by_document:
        results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
    
    # Merge results for each document
    merged_documents = []
    
    for doc_id, doc_chunks in results_by_document.items():
        # Build document header
        doc_header = f"\n\n=== DOCUMENT: {doc_id} ===\n\n"
        
        # Merge chunks for this document
        doc_content = ""
        for i, chunk_result in enumerate(doc_chunks):
            # Add chunk separator (except for first chunk)
            if i > 0:
                doc_content += "\n\n---\n\n"
            
            # Add chunk content with metadata
            chunk_metadata = chunk_result.metadata
            if chunk_metadata.get("success", False):
                doc_content += chunk_result.aiResult
            else:
                # Handle error chunks
                doc_content += f"[ERROR in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}]"
        
        merged_documents.append(doc_header + doc_content)
    
    # Join all documents
    final_result = "\n\n".join(merged_documents)
    
    return final_result.strip()
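
The merging logic can be tried in isolation. The following sketch reimplements the same grouping, ordering, and separators against a simplified stand-in type; `ChunkResultStub` and the free function `mergeChunkResults` are assumptions made so the example runs without the project's `ContentPart` or pydantic models.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ChunkResultStub:
    """Simplified stand-in for ChunkResult (no originalChunk, no pydantic)."""
    aiResult: str
    chunkIndex: int
    documentId: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def mergeChunkResults(chunkResults: List[ChunkResultStub]) -> str:
    """Group by document, sort by chunkIndex, join with the same separators."""
    byDocument: Dict[str, List[ChunkResultStub]] = {}
    for result in chunkResults:
        byDocument.setdefault(result.documentId, []).append(result)

    mergedDocuments = []
    for docId, docChunks in byDocument.items():
        docChunks.sort(key=lambda r: r.chunkIndex)
        parts = []
        for r in docChunks:
            if r.metadata.get("success", False):
                parts.append(r.aiResult)
            else:
                error = r.metadata.get("error", "Unknown error")
                parts.append(f"[ERROR in chunk {r.chunkIndex}: {error}]")
        header = f"\n\n=== DOCUMENT: {docId} ===\n\n"
        mergedDocuments.append(header + "\n\n---\n\n".join(parts))
    return "\n\n".join(mergedDocuments).strip()


results = [
    ChunkResultStub("second part", 1, "doc1", {"success": True}),
    ChunkResultStub("first part", 0, "doc1", {"success": True}),
    ChunkResultStub("", 2, "doc2", {"success": False, "error": "timeout"}),
]
merged = mergeChunkResults(results)
print(merged)
```

Chunks that arrive out of order are placed back in sequence, and a failed chunk leaves a visible error marker at its position instead of silently disappearing from the output.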

2. Enhanced Data Models (Minimal Extensions)

Add the new `ChunkResult` model to preserve chunk relationships:

```python
# Add to datamodelExtraction.py
class ChunkResult(BaseModel):
    """Preserves the relationship between a chunk and its AI result."""
    originalChunk: ContentPart
    aiResult: str
    chunkIndex: int
    documentId: str
    processingTime: float = 0.0
    metadata: Dict[str, Any] = Field(default_factory=dict)

# Enhanced AiCallOptions with minimal additions
class EnhancedAiCallOptions(AiCallOptions):
    """Enhanced options for improved document processing."""
    
    # Parallel processing
    enableParallelProcessing: bool = Field(
        default=True, 
        description="Enable parallel processing of chunks"
    )
    maxConcurrentChunks: int = Field(
        default=5, 
        ge=1, 
        le=20, 
        description="Maximum number of chunks to process concurrently"
    )
    
    # Chunk mapping
    preserveChunkMetadata: bool = Field(
        default=True, 
        description="Preserve chunk metadata during processing"
    )
    chunkSeparator: str = Field(
        default="\n\n---\n\n", 
        description="Separator between chunks in merged output"
    )
```

3. Usage Examples

Basic Usage with Fixed Chunk Mapping

```python
# Use the fixed AI service with proper chunk mapping
aiService = AiService(services)

# Process large documents with proper chunk relationships
documents = [
    ChatDocument(fileId="doc1", filename="large_report.pdf", mimeType="application/pdf", fileSize=500000000),
    ChatDocument(fileId="doc2", filename="massive_data.xlsx", mimeType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", fileSize=300000000)
]

# Enhanced options for improved processing
options = EnhancedAiCallOptions(
    operationType="analyse_content",
    enableParallelProcessing=True,
    maxConcurrentChunks=10,
    preserveChunkMetadata=True,
    chunkSeparator="\n\n---\n\n"
)

result = await aiService.callAi(
    prompt="Create a comprehensive analysis report combining insights from both documents",
    documents=documents,
    options=options
)

# Result maintains document structure and chunk order
print(f"Generated content: {len(result)} characters")
```

Advanced Usage with Custom Options

# Advanced processing with custom options for very large document sets
options = EnhancedAiCallOptions(
    operationType="generate_content",
    enableParallelProcessing=True,
    maxConcurrentChunks=15,
    preserveChunkMetadata=True,
    chunkSeparator="\n\n=== CHUNK ===\n\n",
    processingMode="detailed"  # not declared on EnhancedAiCallOptions above; assumed to be an existing AiCallOptions field
)

result = await aiService.callAi(
    prompt="Generate detailed technical documentation with code examples and diagrams",
    documents=largeDocumentSet,  # 200+ documents, 300+ MB each
    options=options
)

Implementation Plan

Phase 1: Fix Chunk Mapping (Critical - 1 week)

Goal: Fix the fundamental chunk-to-AI-result mapping problem

Tasks:

  1. Create ChunkResult data model

    • Add to datamodelExtraction.py
    • Preserve chunk relationships and metadata
    • Include processing statistics
  2. Modify _processDocumentsPerChunk() method

    • Replace simple aiResults list with ChunkResult objects
    • Implement _processChunksWithMapping() method
    • Add proper chunk indexing and document mapping
  3. Implement _mergeChunkResults() method

    • Replace broken merging with proper chunk-aware merging
    • Preserve document structure and chunk order
    • Add proper separators and metadata
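
The processing statistics mentioned in task 1 could be aggregated once all chunks have returned. The sketch below assumes results are available as plain dictionaries with the same field names as `ChunkResult`; `summarizeChunkResults` is a hypothetical helper, not an existing function.

```python
from typing import Any, Dict, List


def summarizeChunkResults(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate per-chunk statistics into one summary for logging."""
    failed = [
        r["chunkIndex"]
        for r in results
        if not r["metadata"].get("success", False)
    ]
    return {
        "total": len(results),
        "failedChunks": failed,
        "totalProcessingTime": sum(r["processingTime"] for r in results),
    }


summary = summarizeChunkResults([
    {"chunkIndex": 0, "processingTime": 0.5, "metadata": {"success": True}},
    {"chunkIndex": 1, "processingTime": 0.25,
     "metadata": {"success": False, "error": "timeout"}},
])
print(summary)
```

Because chunks run concurrently after Phase 2, `totalProcessingTime` is the sum of per-chunk durations, not the wall-clock time of the whole request.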

Files to modify:

  • gateway/modules/datamodels/datamodelExtraction.py
  • gateway/modules/services/serviceAi/mainServiceAi.py

Phase 2: Parallel Processing (1 week)

Goal: Add parallel processing while preserving chunk relationships

Tasks:

  1. Implement parallel chunk processing

    • Use asyncio.gather() for concurrent processing
    • Maintain chunk mapping in parallel processing
    • Add configurable concurrency limits
  2. Add parallel processing options

    • enableParallelProcessing: bool = True
    • maxConcurrentChunks: int = 5
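
A configurable concurrency limit can be layered on `asyncio.gather()` with an `asyncio.Semaphore`. This is a minimal sketch, assuming the limit is taken from `maxConcurrentChunks`; the helper name `gatherBounded` and the fake worker are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gatherBounded(
    items: List[T],
    worker: Callable[[T], Awaitable[R]],
    maxConcurrent: int = 5,
) -> List[R]:
    """Run worker over items with at most maxConcurrent calls in flight.

    asyncio.gather returns results in input order, so result i belongs to
    item i regardless of which call finishes first.
    """
    semaphore = asyncio.Semaphore(maxConcurrent)

    async def guarded(item: T) -> R:
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))


async def demo() -> List[str]:
    async def fakeAiCall(chunkIndex: int) -> str:
        # Later chunks finish sooner, to show that order is still preserved.
        await asyncio.sleep(0.01 * (5 - chunkIndex))
        return f"result-{chunkIndex}"

    return await gatherBounded(list(range(5)), fakeAiCall, maxConcurrent=2)


ordered = asyncio.run(demo())
print(ordered)
```

The semaphore only limits how many calls run at once. The chunk-to-result mapping still comes from gather's ordering and from the `chunkIndex` stored on each result.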

Files to modify:

  • gateway/modules/services/serviceAi/mainServiceAi.py
  • gateway/modules/datamodels/datamodelAi.py

Phase 3: Enhanced Merging (1 week)

Goal: Improve merging quality and document structure preservation

Tasks:

  1. Enhance merging strategies

    • Add document-aware merging
    • Improve chunk separators and formatting
    • Preserve metadata and document flow
  2. Add merging options

    • chunkSeparator: str = "\n\n---\n\n"
    • preserveChunkMetadata: bool = True
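
The two merging options could feed into the per-document join roughly as follows. This is a sketch under stated assumptions: chunks are plain dictionaries, `joinChunks` is a hypothetical helper, and the `[page N]` annotation format and the `pageNumber` key are invented for illustration, since this document does not define how preserved metadata should appear in the output.

```python
from typing import Any, Dict, List, Optional

DEFAULT_SEPARATOR = "\n\n---\n\n"


def joinChunks(
    chunks: List[Dict[str, Any]],
    chunkSeparator: Optional[str] = None,
    preserveChunkMetadata: bool = True,
) -> str:
    """Join chunk results for a single document, in chunkIndex order."""
    separator = DEFAULT_SEPARATOR if chunkSeparator is None else chunkSeparator
    rendered = []
    for chunk in sorted(chunks, key=lambda c: c["chunkIndex"]):
        text = chunk["aiResult"]
        page = chunk.get("metadata", {}).get("pageNumber")
        if preserveChunkMetadata and page is not None:
            # Hypothetical annotation format; not defined by this document.
            text = f"[page {page}]\n{text}"
        rendered.append(text)
    return separator.join(rendered)


chunks = [
    {"chunkIndex": 1, "aiResult": "B", "metadata": {"pageNumber": 4}},
    {"chunkIndex": 0, "aiResult": "A", "metadata": {}},
]
custom = joinChunks(chunks, chunkSeparator="\n\n=== CHUNK ===\n\n")
plain = joinChunks(chunks, preserveChunkMetadata=False)
```

Falling back to the default separator when none is supplied keeps callers that pass a plain `AiCallOptions` working unchanged.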

Files to modify:

  • gateway/modules/services/serviceAi/mainServiceAi.py
  • gateway/modules/services/serviceExtraction/mainServiceExtraction.py

Benefits of the Fixed Approach

  1. Fixes Core Problem: Addresses the fundamental chunk mapping issue
  2. Preserves Document Structure: Maintains chunk order and document flow
  3. Improves Performance: Parallel chunk processing, with an expected 3-5x speedup over sequential processing
  4. Maintains Compatibility: Existing functionality remains unchanged
  5. Simple Implementation: Focused on critical fixes, not over-engineering
  6. Testable: Each phase can be tested independently
  7. Scalable: Can be extended further as needed

Expected Results

Before Fix (Current System)

  • Lost chunk relationships
  • Poor document structure in merged results
  • Lost metadata and context
  • Sequential processing (slow)
  • Simple concatenation merging

After Fix (Enhanced System)

  • Preserved chunk relationships
  • Maintained document structure and flow
  • Preserved metadata and context
  • Parallel processing (3-5x faster)
  • Quality merging with proper separators

Conclusion

The current system has a critical architectural flaw that breaks chunked document processing. This solution:

  1. Fixes the Core Problem - Proper chunk-to-AI-result mapping
  2. Preserves Document Structure - Maintains chunk order and relationships
  3. Adds Performance - Parallel processing while preserving relationships
  4. Maintains Compatibility - Existing functionality unchanged
  5. Simple and Focused - Addresses real problems without over-engineering

The approach stays deliberately narrow: restore the chunk-to-result mapping first, then add parallel processing and better merging on top of it, without replacing the existing services. The focus is on the real problem (lost chunk relationships), not on added complexity.