from typing import Any, Dict, List, Optional, Union
import uuid
import logging
import time

from .subRegistry import ExtractorRegistry, ChunkerRegistry
from .subPipeline import runExtraction, poolAndLimit, applyAiIfRequested
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallResponse
from modules.interfaces.interfaceAiObjects import aiModels

logger = logging.getLogger(__name__)


class ExtractionService:
    """Extract content from documents and merge AI results back into parts.

    The service owns one extractor registry and one chunker registry, which it
    hands to ``runExtraction`` for every document. ``services`` is an optional
    handle used only to persist per-document workflow statistics.
    """

    def __init__(self, services: Optional[Any] = None):
        self.services = services
        self._extractorRegistry = ExtractorRegistry()
        self._chunkerRegistry = ChunkerRegistry()

    def extractContent(self, documents: List[ChatDocument], options: Dict[str, Any]) -> List[ContentExtracted]:
        """
        Extract content from a list of ChatDocument objects.

        Args:
            documents: List of ChatDocument objects to extract content from
            options: Extraction options, passed through unchanged to
                runExtraction and applyAiIfRequested

        Returns:
            List of ContentExtracted objects, one per input document

        Raises:
            ValueError: If no file data is found for a document's fileId
        """
        results: List[ContentExtracted] = []

        # Lazy import to avoid circular deps and heavy init at module import
        from modules.interfaces.interfaceDbComponentObjects import getInterface
        dbInterface = getInterface()

        for i, doc in enumerate(documents):
            logger.info(f"=== DOCUMENT {i}: {doc.fileName} ===")
            logger.info(f"Initial MIME type: {doc.mimeType}")

            # Monotonic clock: elapsed time must not be affected by wall-clock jumps
            startTime = time.perf_counter()

            # Resolve raw bytes for this document using interface
            documentBytes = dbInterface.getFileData(doc.fileId)
            if not documentBytes:
                raise ValueError(f"No file data found for fileId={doc.fileId}")

            ec = runExtraction(
                extractorRegistry=self._extractorRegistry,
                chunkerRegistry=self._chunkerRegistry,
                documentBytes=documentBytes,
                fileName=doc.fileName,
                mimeType=doc.mimeType,
                options=options
            )

            self._logParts(ec.parts)
            self._tagPartsWithDocument(ec.parts, doc)
            self._logChunking(ec.parts)

            ec = applyAiIfRequested(ec, options)

            processingTime = time.perf_counter() - startTime
            self._storeExtractionStat(doc, ec, len(documentBytes), processingTime)

            results.append(ec)

        return results

    @staticmethod
    def _logParts(parts: List[ContentPart]) -> None:
        """Log type, MIME type, size and metadata of every extracted part."""
        logger.debug(f"Content parts: {len(parts)}")
        for j, part in enumerate(parts):
            logger.debug(f"  Part {j}: {part.typeGroup} ({part.mimeType}) - {len(part.data) if part.data else 0} chars")
            if part.metadata:
                logger.debug(f"    Metadata: {part.metadata}")

    @staticmethod
    def _tagPartsWithDocument(parts: List[ContentPart], doc: ChatDocument) -> None:
        """Attach the document id and MIME type to parts that lack them."""
        # Generate the fallback id once per document so that all parts of the
        # same document share one documentId when doc.id is missing.
        fallbackId: Optional[str] = None
        for p in parts:
            if "documentId" not in p.metadata:
                if fallbackId is None:
                    fallbackId = doc.id or str(uuid.uuid4())
                p.metadata["documentId"] = fallbackId
            if "documentMimeType" not in p.metadata:
                p.metadata["documentMimeType"] = doc.mimeType

    @staticmethod
    def _logChunking(parts: List[ContentPart]) -> None:
        """Log which parts are flagged as chunks via metadata["chunk"]."""
        chunkedParts = [p for p in parts if p.metadata.get("chunk", False)]
        if not chunkedParts:
            logger.debug(f"No chunking needed - {len(parts)} parts fit within size limits")
            return

        logger.debug("=== CHUNKING RESULTS ===")
        logger.debug(f"Total parts: {len(parts)}")
        logger.debug(f"Chunked parts: {len(chunkedParts)}")
        for chunk in chunkedParts:
            # Guard against empty data, consistent with the part logging
            chunkSize = len(chunk.data) if chunk.data else 0
            logger.debug(f"  Chunk: {chunk.label} - {chunkSize} chars (parent: {chunk.parentId})")

    def _storeExtractionStat(
        self,
        doc: ChatDocument,
        ec: ContentExtracted,
        bytesSent: int,
        processingTime: float
    ) -> None:
        """Price one extraction run and store it as a workflow stat."""
        # services is optional in __init__; without it there is nowhere to
        # store the stat, so skip instead of failing after extraction is done.
        if self.services is None:
            logger.debug("No services configured - skipping extraction stats")
            return

        bytesReceived = sum(len(part.data) if part.data else 0 for part in ec.parts)

        # Use internal extraction model for pricing
        modelName = "internal_extraction"
        priceUsd = aiModels[modelName]["calculatePriceUsd"](processingTime, bytesSent, bytesReceived)

        aiResponse = AiCallResponse(
            content="",  # No content for extraction stats needed
            modelName=modelName,
            priceUsd=priceUsd,
            processingTime=processingTime,
            bytesSent=bytesSent,
            bytesReceived=bytesReceived,
            errorCount=0
        )

        self.services.workflow.storeWorkflowStat(
            self.services.currentWorkflow,
            aiResponse,
            f"extraction.process.{doc.mimeType}"
        )

    def mergeAiResults(
        self,
        extractedContent: List[ContentExtracted],
        aiResults: List[str],
        strategy: MergeStrategy
    ) -> ContentExtracted:
        """
        Merge AI results from chunked content back into a single ContentExtracted.

        Args:
            extractedContent: List of ContentExtracted objects that were processed
            aiResults: List of AI response strings, one per chunk
            strategy: Merge strategy object; its mergeType, preserveChunks and
                chunkSeparator attributes are read (attribute access, so a
                plain dict is not accepted)

        Returns:
            Single ContentExtracted with merged AI results
        """
        logger.debug("=== MERGING AI RESULTS ===")
        logger.debug(f"Extracted content: {len(extractedContent)} documents")
        logger.debug(f"AI results: {len(aiResults)} responses")
        logger.debug(f"Merge strategy: {strategy.mergeType}")

        # Collect all parts from all extracted content
        allParts: List[ContentPart] = []
        for ec in extractedContent:
            allParts.extend(ec.parts)
        logger.debug(f"Total original parts: {len(allParts)}")

        # Wrap every AI response string in a text part. parentId stays None:
        # nothing in this class assigns it later, so the merge helpers group
        # all AI results under "root".
        aiResultParts: List[ContentPart] = [
            ContentPart(
                id=f"ai_result_{i}",
                parentId=None,
                label="ai_result",
                typeGroup="text",
                mimeType="text/plain",
                data=aiResult,
                metadata={
                    "aiResult": True,
                    "order": i,
                    "size": len(aiResult.encode('utf-8'))
                }
            )
            for i, aiResult in enumerate(aiResults)
        ]
        logger.debug(f"Created {len(aiResultParts)} AI result parts")

        # Apply merging strategy; unknown merge types fall back to concatenate
        if strategy.mergeType == "hierarchical":
            mergedParts = self._mergeHierarchical(allParts, aiResultParts, strategy)
        elif strategy.mergeType == "intelligent":
            mergedParts = self._mergeIntelligent(allParts, aiResultParts, strategy)
        else:
            mergedParts = self._mergeConcatenate(allParts, aiResultParts, strategy)

        # Create final ContentExtracted
        mergedContent = ContentExtracted(
            id=f"merged_{uuid.uuid4()}",
            parts=mergedParts
        )

        logger.debug("=== MERGE COMPLETED ===")
        logger.debug(f"Final merged parts: {len(mergedParts)}")
        logger.debug(f"Merged content ID: {mergedContent.id}")

        return mergedContent

    @staticmethod
    def _groupByParent(parts: List[ContentPart]) -> Dict[str, List[ContentPart]]:
        """Group parts by parentId; parts without one go under "root"."""
        grouped: Dict[str, List[ContentPart]] = {}
        for part in parts:
            grouped.setdefault(part.parentId or "root", []).append(part)
        return grouped

    @staticmethod
    def _collapseAiGroup(
        parentId: str,
        aiParts: List[ContentPart],
        strategy: MergeStrategy,
        idPrefix: str,
        label: str,
        flagKey: str
    ) -> ContentPart:
        """Return the single AI part, or one part joining several with chunkSeparator."""
        if len(aiParts) == 1:
            return aiParts[0]

        combinedData = strategy.chunkSeparator.join(p.data for p in aiParts)
        return ContentPart(
            id=f"{idPrefix}_{parentId}",
            parentId=parentId if parentId != "root" else None,
            label=label,
            typeGroup="text",
            mimeType="text/plain",
            data=combinedData,
            metadata={
                "aiResult": True,
                flagKey: True,
                "sourceCount": len(aiParts),
                "size": len(combinedData.encode('utf-8'))
            }
        )

    def _mergeConcatenate(
        self,
        originalParts: List[ContentPart],
        aiResultParts: List[ContentPart],
        strategy: MergeStrategy
    ) -> List[ContentPart]:
        """Merge parts by simple concatenation.

        Original parts come first; parts flagged metadata["chunk"] are kept
        only when strategy.preserveChunks is truthy. AI results follow, one
        part per parentId group (several results for the same parent are
        joined with strategy.chunkSeparator).
        """
        mergedParts = [
            part for part in originalParts
            if strategy.preserveChunks or not part.metadata.get("chunk", False)
        ]

        for parentId, aiParts in self._groupByParent(aiResultParts).items():
            mergedParts.append(
                self._collapseAiGroup(parentId, aiParts, strategy, "merged_ai", "merged_ai_result", "merged")
            )

        return mergedParts

    def _mergeHierarchical(
        self,
        originalParts: List[ContentPart],
        aiResultParts: List[ContentPart],
        strategy: MergeStrategy
    ) -> List[ContentPart]:
        """Merge parts hierarchically based on parentId relationships.

        For every parent group, the original parts are emitted first, followed
        by that group's AI result (several results are joined with
        strategy.chunkSeparator).
        """
        partsByParent = self._groupByParent(originalParts)
        aiResultsByParent = self._groupByParent(aiResultParts)

        # First-seen order instead of set(): set iteration over string keys
        # varies between interpreter runs, which made the output order unstable.
        parentIds = dict.fromkeys([*partsByParent, *aiResultsByParent])

        mergedParts: List[ContentPart] = []
        for parentId in parentIds:
            mergedParts.extend(partsByParent.get(parentId, []))

            aiGroup = aiResultsByParent.get(parentId, [])
            if aiGroup:
                mergedParts.append(
                    self._collapseAiGroup(
                        parentId, aiGroup, strategy,
                        "hierarchical_ai", "hierarchical_ai_result", "hierarchical"
                    )
                )

        return mergedParts

    def _mergeIntelligent(
        self,
        originalParts: List[ContentPart],
        aiResultParts: List[ContentPart],
        strategy: MergeStrategy
    ) -> List[ContentPart]:
        """Merge parts using intelligent strategies based on content type.

        Original parts are grouped by typeGroup. "text", "table" and
        "structure" groups go through their dedicated merge helpers; other
        groups are passed through unchanged. The AI results are merged exactly
        once: with the first handled group, or appended as-is at the end when
        no handled group exists.
        """
        mergedParts: List[ContentPart] = []

        # Group by typeGroup for intelligent merging
        partsByType: Dict[Any, List[ContentPart]] = {}
        for part in originalParts:
            partsByType.setdefault(part.typeGroup, []).append(part)

        aiConsumed = False
        for typeGroup, parts in partsByType.items():
            if typeGroup == "text":
                handler = self._mergeTextIntelligent
            elif typeGroup == "table":
                handler = self._mergeTableIntelligent
            elif typeGroup == "structure":
                handler = self._mergeStructureIntelligent
            else:
                # Default handling for other types
                mergedParts.extend(parts)
                continue

            # Hand the AI results to one handler only; passing them to every
            # group would emit them once per group.
            pendingAi = [] if aiConsumed else aiResultParts
            mergedParts.extend(handler(parts, pendingAi, strategy))
            aiConsumed = True

        # Only when no handler consumed the AI results: append those not
        # already present. (After a handler ran, several AI results may have
        # been combined under a new id, so an id check would re-add them.)
        if not aiConsumed:
            existingIds = {p.id for p in mergedParts}
            mergedParts.extend(a for a in aiResultParts if a.id not in existingIds)

        return mergedParts

    def _mergeTextIntelligent(
        self,
        textParts: List[ContentPart],
        aiResultParts: List[ContentPart],
        strategy: MergeStrategy
    ) -> List[ContentPart]:
        """Intelligent merging for text content."""
        # For now, use concatenate strategy
        # This could be enhanced with semantic analysis, summarization, etc.
        return self._mergeConcatenate(textParts, aiResultParts, strategy)

    def _mergeTableIntelligent(
        self,
        tableParts: List[ContentPart],
        aiResultParts: List[ContentPart],
        strategy: MergeStrategy
    ) -> List[ContentPart]:
        """Intelligent merging for table content."""
        # For now, use concatenate strategy
        # This could be enhanced with table merging logic
        return self._mergeConcatenate(tableParts, aiResultParts, strategy)

    def _mergeStructureIntelligent(
        self,
        structureParts: List[ContentPart],
        aiResultParts: List[ContentPart],
        strategy: MergeStrategy
    ) -> List[ContentPart]:
        """Intelligent merging for structured content."""
        # For now, use concatenate strategy
        # This could be enhanced with structure-aware merging
        return self._mergeConcatenate(structureParts, aiResultParts, strategy)