from typing import Any, Dict, List
import logging

from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
from .subUtils import makeId
from .subRegistry import ExtractorRegistry, ChunkerRegistry
from .merging.mergerText import TextMerger
from .merging.mergerTable import TableMerger
from .merging.mergerDefault import DefaultMerger
from .subMerger import IntelligentTokenAwareMerger

logger = logging.getLogger(__name__)


def _mergeParts(parts: List[ContentPart], mergeStrategy: Dict[str, Any]) -> List[ContentPart]:
    """Merge parts based on the provided strategy."""
    if not parts or not mergeStrategy:
        return parts

    groupBy = mergeStrategy.get("groupBy", "typeGroup")
    orderBy = mergeStrategy.get("orderBy", "id")

    # Group parts by the specified field
    groups: Dict[Any, List[ContentPart]] = {}
    for part in parts:
        key = getattr(part, groupBy, "unknown")
        groups.setdefault(key, []).append(part)

    # Merge each group
    merged_parts: List[ContentPart] = []
    for group_parts in groups.values():
        if len(group_parts) == 1:
            merged_parts.extend(group_parts)
            continue

        # Sort by the orderBy field if specified
        if orderBy:
            group_parts.sort(key=lambda p: getattr(p, orderBy, ""))

        # Pick a merger based on the group's type
        type_group = group_parts[0].typeGroup
        if type_group == "text":
            merger = TextMerger()
        elif type_group == "table":
            merger = TableMerger()
        else:
            merger = DefaultMerger()

        merged_parts.extend(merger.merge(group_parts, mergeStrategy))

    return merged_parts


def runExtraction(extractorRegistry: ExtractorRegistry,
                  chunkerRegistry: ChunkerRegistry,
                  documentBytes: bytes,
                  fileName: str,
                  mimeType: str,
                  options: Dict[str, Any]) -> ContentExtracted:
    extractor = extractorRegistry.resolve(mimeType, fileName)
    if extractor is None:
        # Fallback: wrap the document in a single binary part
        part = ContentPart(
            id=makeId(),
            parentId=None,
            label="file",
            typeGroup="binary",
            mimeType=mimeType or "application/octet-stream",
            data="",
            metadata={"warning": "No extractor registered"},
        )
        return ContentExtracted(id=makeId(), parts=[part])

    parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})

    # Apply chunking and size limiting
    parts = poolAndLimit(parts, chunkerRegistry, options)

    # Optional merge step, preserving chunks
    mergeStrategy = options.get("mergeStrategy", {})
    if mergeStrategy:
        # Chunks stay separate so they can be processed independently
        non_chunk_parts = [p for p in parts if not p.metadata.get("chunk", False)]
        chunk_parts = [p for p in parts if p.metadata.get("chunk", False)]
        logger.debug(f"runExtraction: preserving {len(chunk_parts)} chunks from merging "
                     f"({len(non_chunk_parts)} non-chunk parts)")

        # Merge only when there are many small text parts
        if non_chunk_parts:
            text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"]
            if len(text_parts) > 5:
                logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency")
                non_chunk_parts = _mergeParts(non_chunk_parts, mergeStrategy)

        # Recombine; chunks stay separate
        parts = non_chunk_parts + chunk_parts
        logger.debug(f"runExtraction: final parts after merging: {len(parts)} (chunks: {len(chunk_parts)})")

    return ContentExtracted(id=makeId(), parts=parts)
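
# Usage sketch (illustrative, not part of the pipeline): the option keys below
# are the ones this module reads; the file name, mime type, and byte budget
# are made-up example values.
def _exampleRunExtraction(extractorRegistry: ExtractorRegistry,
                          chunkerRegistry: ChunkerRegistry,
                          documentBytes: bytes) -> ContentExtracted:
    """Minimal sketch of a runExtraction call with chunking and merging enabled."""
    return runExtraction(
        extractorRegistry,
        chunkerRegistry,
        documentBytes=documentBytes,
        fileName="report.pdf",        # hypothetical input
        mimeType="application/pdf",   # hypothetical input
        options={
            "maxSize": 200_000,       # byte budget enforced by poolAndLimit
            "chunkAllowed": True,     # let oversized parts be chunked
            "mergeStrategy": {"groupBy": "typeGroup", "orderBy": "id"},
        },
    )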

def poolAndLimit(parts: List[ContentPart],
                 chunkerRegistry: ChunkerRegistry,
                 options: Dict[str, Any]) -> List[ContentPart]:
    maxSize = int(options.get("maxSize", 0) or 0)
    chunkAllowed = bool(options.get("chunkAllowed", False))
    mergeStrategy = options.get("mergeStrategy", {})

    if maxSize <= 0:
        # No size limit; still apply merging if a strategy was provided
        if mergeStrategy:
            return _applyMerging(parts, mergeStrategy)
        return parts

    # First pass: keep parts until the size budget is exhausted
    current = 0
    kept: List[ContentPart] = []
    remaining: List[ContentPart] = []
    logger.debug(f"Starting poolAndLimit with {len(parts)} parts, maxSize={maxSize}")

    for i, p in enumerate(parts):
        size = int(p.metadata.get("size", 0) or 0)
        # Log the first 50 characters of the part's content for debugging
        content_preview = p.data[:50].replace('\n', '\\n') if p.data else ""
        logger.debug(f"Part {i}: {p.typeGroup} - {size} bytes - '{content_preview}...' (current: {current})")
        if current + size <= maxSize:
            kept.append(p)
            current += size
            logger.debug(f"Part {i} kept (total: {current})")
        else:
            remaining.append(p)
            logger.debug(f"Part {i} moved to remaining")

    logger.debug(f"Kept: {len(kept)}, Remaining: {len(remaining)}")

    # Second pass: chunk the overflow if chunking is allowed
    if remaining and chunkAllowed:
        logger.debug(f"Chunking {len(remaining)} remaining parts "
                     f"(maxSize={maxSize}, used={current} bytes)")
        for p in remaining:
            if p.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
                logger.debug(f"Chunking {p.typeGroup} part with {len(p.data)} chars")
                chunks = chunkerRegistry.resolve(p.typeGroup).chunk(p, options)
                logger.debug(f"Created {len(chunks)} chunks")
                for ch in chunks:
                    chSize = int(ch.get("size", 0) or 0)
                    # Keep every chunk: chunks are processed separately, so
                    # they are not counted against maxSize here
                    kept.append(ContentPart(
                        id=makeId(),
                        parentId=p.id,
                        label=f"chunk_{ch.get('order', 0)}",
                        typeGroup=p.typeGroup,
                        mimeType=p.mimeType,
                        data=ch.get("data", ""),
                        metadata={"size": chSize, "chunk": True, **ch.get("metadata", {})},
                    ))
                    logger.debug(f"Added chunk {ch.get('order', 0)}: {chSize} bytes")

    # Apply the merging strategy if provided, preserving chunks
    if mergeStrategy:
        # Chunks stay separate so they can be processed independently
        non_chunk_parts = [p for p in kept if not p.metadata.get("chunk", False)]
        chunk_parts = [p for p in kept if p.metadata.get("chunk", False)]
        logger.debug(f"Preserving {len(chunk_parts)} chunks from merging")

        # Merge only when there are many small text parts
        if non_chunk_parts:
            text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"]
            if len(text_parts) > 5:
                logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency")
                non_chunk_parts = _applyMerging(non_chunk_parts, mergeStrategy)

        # Recombine; chunks stay separate
        kept = non_chunk_parts + chunk_parts
        logger.debug(f"Final parts after merging: {len(kept)} (chunks: {len(chunk_parts)})")

        # Re-check the size budget after merging
        totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept)
        if totalSize > maxSize and mergeStrategy.get("maxSize"):
            kept = _applySizeLimit(kept, maxSize)

    logger.debug(f"poolAndLimit returning {len(kept)} parts")
    return kept
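
# Contract sketch (assumption: chunkers registered in ChunkerRegistry return
# dicts carrying the "data"/"size"/"order"/"metadata" keys read by poolAndLimit
# above; this class is illustrative, not a registered chunker). A minimal
# stand-in showing that shape:
class _FixedWidthTextChunker:
    """Splits a text part into fixed-width chunks in the expected dict shape."""

    def __init__(self, width: int = 1000) -> None:
        self.width = width

    def chunk(self, part: ContentPart, options: Dict[str, Any]) -> List[Dict[str, Any]]:
        pieces = [part.data[i:i + self.width] for i in range(0, len(part.data), self.width)]
        return [
            {
                "data": piece,
                "size": len(piece.encode("utf-8")),   # byte size, as poolAndLimit expects
                "order": n,                           # becomes the chunk label suffix
                "metadata": {"sourceLabel": part.label},
            }
            for n, piece in enumerate(pieces)
        ]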

def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
    """Apply the merging strategy, optionally using intelligent token-aware merging."""
    logger.debug(f"_applyMerging called with {len(parts)} parts")

    # Intelligent merging path
    if strategy.get("useIntelligentMerging", False):
        model_capabilities = strategy.get("modelCapabilities", {})
        subMerger = IntelligentTokenAwareMerger(model_capabilities)
        merged = subMerger.merge_chunks_intelligently(parts, strategy.get("prompt", ""))

        # Calculate and log optimization stats
        stats = subMerger.calculate_optimization_stats(parts, merged)
        logger.info(f"🧠 Intelligent merging stats: {stats}")
        logger.debug(f"Intelligent merging: {stats['original_ai_calls']} → "
                     f"{stats['optimized_ai_calls']} calls "
                     f"({stats['reduction_percent']}% reduction)")
        return merged

    # Fallback: traditional per-type merging
    textMerger = TextMerger()
    tableMerger = TableMerger()
    defaultMerger = DefaultMerger()

    # Group by typeGroup
    textParts = [p for p in parts if p.typeGroup == "text"]
    tableParts = [p for p in parts if p.typeGroup == "table"]
    structureParts = [p for p in parts if p.typeGroup == "structure"]
    otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")]
    logger.debug(f"Grouped - text: {len(textParts)}, table: {len(tableParts)}, "
                 f"structure: {len(structureParts)}, other: {len(otherParts)}")

    merged: List[ContentPart] = []
    if textParts:
        textMerged = textMerger.merge(textParts, strategy)
        logger.debug(f"TextMerger merged {len(textParts)} parts into {len(textMerged)} parts")
        merged.extend(textMerged)
    if tableParts:
        tableMerged = tableMerger.merge(tableParts, strategy)
        logger.debug(f"TableMerger merged {len(tableParts)} parts into {len(tableMerged)} parts")
        merged.extend(tableMerged)
    if structureParts:
        # For now, treat structure like text
        structureMerged = textMerger.merge(structureParts, strategy)
        logger.debug(f"StructureMerger merged {len(structureParts)} parts into {len(structureMerged)} parts")
        merged.extend(structureMerged)
    if otherParts:
        otherMerged = defaultMerger.merge(otherParts, strategy)
        logger.debug(f"DefaultMerger merged {len(otherParts)} parts into {len(otherMerged)} parts")
        merged.extend(otherMerged)

    logger.debug(f"_applyMerging returning {len(merged)} parts")
    return merged


def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]:
    """Apply the size limit by prioritizing parts and truncating text if necessary."""
    # Sort by priority: text first, then tables, then everything else
    priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3,
                      "binary": 4, "metadata": 5, "container": 6}
    sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99))

    kept: List[ContentPart] = []
    current_size = 0
    for part in sorted_parts:
        part_size = int(part.metadata.get("size", 0) or 0)
        if current_size + part_size <= maxSize:
            kept.append(part)
            current_size += part_size
        elif part.typeGroup == "text" and part_size > 0:
            # Try to truncate text parts to the remaining byte budget
            remaining_size = maxSize - current_size
            if remaining_size > 1000:  # Only truncate if meaningful space remains
                # Truncate by bytes rather than characters so multi-byte UTF-8
                # text cannot overshoot the budget; drop any split trailing char
                truncated_bytes = part.data.encode('utf-8')[:remaining_size]
                truncated_data = truncated_bytes.decode('utf-8', errors='ignore')
                truncated_part = ContentPart(
                    id=makeId(),
                    parentId=part.parentId,
                    label=f"{part.label}_truncated",
                    typeGroup=part.typeGroup,
                    mimeType=part.mimeType,
                    data=truncated_data,
                    metadata={**part.metadata,
                              "size": len(truncated_data.encode('utf-8')),
                              "truncated": True},
                )
                kept.append(truncated_part)
                break
    return kept
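
# Worked example (illustrative values; ContentPart fields as used throughout
# this module): with a 10-byte budget, the 6-byte text part is kept whole and
# the 6-byte table part is dropped, because text outranks table in
# `priority_order` and the leftover 4 bytes sit below the 1000-byte
# truncation floor.
def _exampleSizeLimit() -> List[ContentPart]:
    text = ContentPart(id=makeId(), parentId=None, label="intro", typeGroup="text",
                       mimeType="text/plain", data="hello!", metadata={"size": 6})
    table = ContentPart(id=makeId(), parentId=None, label="t1", typeGroup="table",
                        mimeType="text/csv", data="a,b\n1,2", metadata={"size": 6})
    return _applySizeLimit([table, text], maxSize=10)  # -> [text] only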

def applyAiIfRequested(extracted: ContentExtracted, options: Dict[str, Any]) -> ContentExtracted:
    """
    Apply AI processing if requested in the options.
    This is a placeholder for the actual AI integration.
    """
    prompt = options.get("prompt")
    operationType = options.get("operationType", "general")

    if not prompt:
        return extracted

    # Placeholder AI processing based on operationType
    if operationType == "analyse_content":
        # Tag text-like parts with analysis metadata
        for part in extracted.parts:
            if part.typeGroup in ("text", "table", "structure"):
                part.metadata["ai_processed"] = True
                part.metadata["operation_type"] = operationType
    elif operationType == "generate_plan":
        # Tag text parts with plan-generation metadata
        for part in extracted.parts:
            if part.typeGroup == "text":
                part.metadata["ai_processed"] = True
                part.metadata["operation_type"] = operationType
    elif operationType == "generate_content":
        # Tag every part with content-generation metadata
        for part in extracted.parts:
            part.metadata["ai_processed"] = True
            part.metadata["operation_type"] = operationType

    return extracted
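
# Usage sketch (the prompt text is illustrative; "analyse_content" is one of
# the operation types handled above):
def _exampleApplyAi(extracted: ContentExtracted) -> ContentExtracted:
    """Tag text-like parts with analysis metadata via the placeholder AI step."""
    return applyAiIfRequested(extracted, {
        "prompt": "Summarise the key findings.",  # hypothetical prompt
        "operationType": "analyse_content",
    })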