"""Extraction pipeline: extract parts, pool/limit/chunk them, and merge them."""

from typing import Any, Dict, List
import logging
import os

from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
from .subUtils import makeId
from .subRegistry import ExtractorRegistry, ChunkerRegistry
from .merging.text_merger import TextMerger
from .merging.table_merger import TableMerger
from .merging.default_merger import DefaultMerger

logger = logging.getLogger(__name__)


def _mergeParts(parts: List[ContentPart], mergeStrategy: Dict[str, Any]) -> List[ContentPart]:
    """Merge parts based on the provided strategy.

    Parts are grouped by the attribute named in ``mergeStrategy["groupBy"]``
    (default ``"typeGroup"``). Groups with more than one member are sorted by
    the attribute named in ``mergeStrategy["orderBy"]`` (default ``"id"``) and
    handed to a merger chosen from the first member's ``typeGroup``.

    Args:
        parts: Parts to merge.
        mergeStrategy: Strategy options; also passed through to the merger.

    Returns:
        The input list unchanged when ``parts`` or ``mergeStrategy`` is empty,
        otherwise a new list with the merged parts.
    """
    if not parts or not mergeStrategy:
        return parts

    groupBy = mergeStrategy.get("groupBy", "typeGroup")
    orderBy = mergeStrategy.get("orderBy", "id")

    # Group parts by the specified field (insertion order is preserved).
    groups: Dict[Any, List[ContentPart]] = {}
    for part in parts:
        groups.setdefault(getattr(part, groupBy, "unknown"), []).append(part)

    merged_parts: List[ContentPart] = []
    for group_parts in groups.values():
        if len(group_parts) == 1:
            # Nothing to merge for a singleton group.
            merged_parts.extend(group_parts)
            continue

        # Sort by orderBy field if specified
        if orderBy:
            group_parts.sort(key=lambda p: getattr(p, orderBy, ""))

        # Use appropriate merger based on the type of the first member
        type_group = group_parts[0].typeGroup
        if type_group == "text":
            merger = TextMerger()
        elif type_group == "table":
            merger = TableMerger()
        else:
            merger = DefaultMerger()

        merged_parts.extend(merger.merge(group_parts, mergeStrategy))

    return merged_parts


def _dumpPartsForDebug(parts: List[ContentPart], fileName: str, mimeType: str) -> None:
    """Best-effort debug dump of parts and chunks into ``./test-chat/ai``.

    Writes one ``.txt`` file per non-empty text/table/structure part plus a
    summary file. Any failure is logged at debug level and otherwise ignored,
    so the dump never affects the extraction result.
    """
    # DEBUG: dump parts and chunks to files TODO TO REMOVE
    # NOTE(review): this writes extracted document content to disk on every
    # extraction; confirm it is not shipped to production.
    try:
        base_dir = "./test-chat/ai"
        os.makedirs(base_dir, exist_ok=True)

        # Generate timestamp for consistent naming.
        # Imported here on purpose: ``datetime.UTC`` needs Python 3.11+, and an
        # ImportError on older interpreters must only skip the dump.
        from datetime import datetime, UTC
        ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]

        # fileName becomes part of the dump file name; flatten path separators
        # so names such as "dir/report.pdf" neither break nor escape base_dir.
        safe_name = str(fileName).replace("/", "_").replace("\\", "_")

        summary_lines: List[str] = [
            f"fileName: {fileName}",
            f"mimeType: {mimeType}",
            f"totalParts: {len(parts)}",
        ]
        text_index = 0
        for idx, part in enumerate(parts):
            is_texty = part.typeGroup in ("text", "table", "structure")
            size = int(part.metadata.get("size", 0) or 0)
            is_chunk = bool(part.metadata.get("chunk", False))
            summary_lines.append(
                f"part[{idx}]: typeGroup={part.typeGroup}, label={part.label}, size={size}, chunk={is_chunk}"
            )
            if is_texty and getattr(part, "data", None):
                text_index += 1
                fname = f"{ts}_extract_{safe_name}_part_{idx:03d}_{'chunk' if is_chunk else 'full'}_{text_index:03d}.txt"
                fpath = os.path.join(base_dir, fname)
                with open(fpath, "w", encoding="utf-8") as f:
                    f.write(f"# typeGroup: {part.typeGroup}\n# label: {part.label}\n# chunk: {is_chunk}\n# size: {size}\n\n")
                    f.write(str(part.data))

        # Write summary file
        summary_fname = f"{ts}_extract_{safe_name}_summary.txt"
        summary_fpath = os.path.join(base_dir, summary_fname)
        with open(summary_fpath, "w", encoding="utf-8") as f:
            f.write("\n".join(summary_lines))
    except Exception as _e:
        # Deliberate best-effort: the dump is diagnostics only.
        logger.debug(f"Debug dump skipped: {_e}")


def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ContentExtracted:
    """Extract content parts from a document and post-process them.

    Steps: resolve an extractor for ``mimeType``/``fileName``, extract parts,
    apply ``poolAndLimit`` (size limiting and chunking), optionally merge the
    non-chunk parts via ``options["mergeStrategy"]``, and write a best-effort
    debug dump.

    Args:
        extractorRegistry: Registry used to resolve the extractor.
        chunkerRegistry: Registry passed on to ``poolAndLimit``.
        documentBytes: Raw document content.
        fileName: Name of the document.
        mimeType: MIME type of the document.
        options: Pipeline options (``mergeStrategy`` is read here; the whole
            dict is forwarded to the extractor and to ``poolAndLimit``).

    Returns:
        A ``ContentExtracted`` holding the resulting parts. When no extractor
        is registered, it holds a single empty ``binary`` part carrying a
        warning in its metadata.
    """
    extractor = extractorRegistry.resolve(mimeType, fileName)
    if extractor is None:
        # fallback: single binary part
        part = ContentPart(
            id=makeId(),
            parentId=None,
            label="file",
            typeGroup="binary",
            mimeType=mimeType or "application/octet-stream",
            data="",
            metadata={"warning": "No extractor registered"}
        )
        return ContentExtracted(id=makeId(), parts=[part])

    parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})

    # Apply chunking and size limiting
    parts = poolAndLimit(parts, chunkerRegistry, options)

    # Optional merge step - but preserve chunks.
    # ``or {}`` also covers an explicit ``"mergeStrategy": None``.
    mergeStrategy = options.get("mergeStrategy") or {}
    if mergeStrategy:
        # Don't merge chunks - they should stay separate for processing
        non_chunk_parts = [p for p in parts if not p.metadata.get("chunk", False)]
        chunk_parts = [p for p in parts if p.metadata.get("chunk", False)]
        logger.debug(f"runExtraction: Preserving {len(chunk_parts)} chunks from merging")

        if non_chunk_parts:
            non_chunk_parts = _mergeParts(non_chunk_parts, mergeStrategy)

        # Combine non-chunk parts with chunk parts (chunks stay separate)
        parts = non_chunk_parts + chunk_parts
        logger.debug(f"runExtraction: Final parts after merging: {len(parts)} (chunks: {len(chunk_parts)})")

    _dumpPartsForDebug(parts, fileName, mimeType)

    return ContentExtracted(id=makeId(), parts=parts)


def poolAndLimit(parts: List[ContentPart], chunkerRegistry: ChunkerRegistry, options: Dict[str, Any]) -> List[ContentPart]:
    """Keep parts that fit into ``options["maxSize"]`` and chunk the rest.

    Sizes are read from ``part.metadata["size"]``. Without a positive
    ``maxSize`` the parts are only merged (if a merge strategy is given).
    Otherwise parts are kept in order while they fit; parts that do not fit
    are chunked when ``options["chunkAllowed"]`` is truthy and their
    ``typeGroup`` is text/table/structure/image, and are dropped otherwise.
    Chunks are added without regard to ``maxSize``.

    Args:
        parts: Extracted parts.
        chunkerRegistry: Registry used to resolve a chunker per ``typeGroup``.
        options: Reads ``maxSize``, ``chunkAllowed`` and ``mergeStrategy``;
            the whole dict is passed to the chunker.

    Returns:
        The resulting list of parts (kept parts, then chunk parts).
    """
    maxSize = int(options.get("maxSize", 0) or 0)
    chunkAllowed = bool(options.get("chunkAllowed", False))
    # ``or {}`` guards against an explicit ``None``, which would otherwise
    # crash on ``mergeStrategy.get("maxSize")`` below.
    mergeStrategy = options.get("mergeStrategy") or {}

    if maxSize <= 0:
        # Still apply merging if strategy provided
        if mergeStrategy:
            return _applyMerging(parts, mergeStrategy)
        return parts

    # First, try to fit within size limit
    current = 0
    kept: List[ContentPart] = []
    remaining: List[ContentPart] = []
    for p in parts:
        size = int(p.metadata.get("size", 0) or 0)
        if current + size <= maxSize:
            kept.append(p)
            current += size
        else:
            remaining.append(p)

    # If we have remaining parts and chunking is allowed, try chunking
    if remaining and chunkAllowed:
        logger.debug("=== CHUNKING ACTIVATED ===")
        logger.debug(f"Remaining parts to chunk: {len(remaining)}")
        logger.debug(f"Max size limit: {maxSize} bytes")
        logger.debug(f"Current size used: {current} bytes")

        for p in remaining:
            if p.typeGroup not in ("text", "table", "structure", "image"):
                continue

            logger.debug(f"Chunking {p.typeGroup} part: {len(p.data)} chars")
            chunker = chunkerRegistry.resolve(p.typeGroup)
            if chunker is None:
                # NOTE(review): presumably resolve() can return None like the
                # extractor registry does; if it never does, this is a no-op.
                logger.warning(f"No chunker registered for typeGroup '{p.typeGroup}'; part skipped")
                continue

            chunks = chunker.chunk(p, options)
            logger.debug(f"Created {len(chunks)} chunks")

            chunks_added = 0
            for ch in chunks:
                chSize = int(ch.get("size", 0) or 0)
                # Add all chunks - don't limit by maxSize since they'll be processed separately
                kept.append(ContentPart(
                    id=makeId(),
                    parentId=p.id,
                    label=f"chunk_{ch.get('order', 0)}",
                    typeGroup=p.typeGroup,
                    mimeType=p.mimeType,
                    data=ch.get("data", ""),
                    metadata={
                        "size": chSize,
                        "chunk": True,
                        # Chunk-provided metadata wins over the two keys above.
                        **(ch.get("metadata") or {})
                    }
                ))
                chunks_added += 1
                logger.debug(f"Added chunk {ch.get('order', 0)}: {chSize} bytes")

            logger.debug(f"Added {chunks_added} chunks from {p.typeGroup} part")

    # Apply merging strategy if provided, but preserve chunks
    if mergeStrategy:
        # Don't merge chunks - they should stay separate for processing
        non_chunk_parts = [p for p in kept if not p.metadata.get("chunk", False)]
        chunk_parts = [p for p in kept if p.metadata.get("chunk", False)]
        logger.debug(f"Preserving {len(chunk_parts)} chunks from merging")

        if non_chunk_parts:
            non_chunk_parts = _applyMerging(non_chunk_parts, mergeStrategy)

        # Combine non-chunk parts with chunk parts (chunks stay separate)
        kept = non_chunk_parts + chunk_parts
        logger.debug(f"Final parts after merging: {len(kept)} (chunks: {len(chunk_parts)})")

    # Re-check size after merging; the limit is only enforced when the merge
    # strategy carries a truthy "maxSize".
    totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept)
    if totalSize > maxSize and mergeStrategy.get("maxSize"):
        # Apply size limit to merged parts
        kept = _applySizeLimit(kept, maxSize)

    return kept


def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
    """Apply merging strategy to parts.

    Parts are bucketed by ``typeGroup`` and merged per bucket. The output
    order is text, table, structure, then everything else.

    Args:
        parts: Parts to merge.
        strategy: Strategy options passed through to each merger.

    Returns:
        A new list with the merged parts of all non-empty buckets.
    """
    textMerger = TextMerger()
    tableMerger = TableMerger()
    defaultMerger = DefaultMerger()

    # Group by typeGroup
    textParts = [p for p in parts if p.typeGroup == "text"]
    tableParts = [p for p in parts if p.typeGroup == "table"]
    structureParts = [p for p in parts if p.typeGroup == "structure"]
    otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")]

    merged: List[ContentPart] = []
    if textParts:
        merged.extend(textMerger.merge(textParts, strategy))
    if tableParts:
        merged.extend(tableMerger.merge(tableParts, strategy))
    if structureParts:
        # For now, treat structure like text
        merged.extend(textMerger.merge(structureParts, strategy))
    if otherParts:
        merged.extend(defaultMerger.merge(otherParts, strategy))

    return merged


def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]:
    """Apply size limit by prioritizing parts and truncating if necessary.

    Parts are visited in priority order (text, table, structure, image,
    binary, metadata, container, then unknown types) and kept while their
    ``metadata["size"]`` fits into ``maxSize``. At the first part that does
    not fit, a text part is truncated into the remaining budget when more
    than 1000 bytes are left; processing then stops.

    Args:
        parts: Candidate parts.
        maxSize: Size budget in bytes.

    Returns:
        The kept parts, possibly ending with one truncated text part whose
        metadata has ``"truncated": True`` and an updated ``"size"``.
    """
    # Sort by priority: text first, then others
    priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3, "binary": 4, "metadata": 5, "container": 6}
    sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99))

    kept: List[ContentPart] = []
    current_size = 0

    for part in sorted_parts:
        part_size = int(part.metadata.get("size", 0) or 0)
        if current_size + part_size <= maxSize:
            kept.append(part)
            current_size += part_size
            continue

        # Try to truncate text parts
        if part.typeGroup == "text" and part_size > 0:
            remaining_size = maxSize - current_size
            if remaining_size > 1000:  # Only truncate if we have meaningful space
                # Sizes are UTF-8 byte counts, so cut on bytes (not on a
                # character estimate) to stay within the budget.
                # errors="ignore" drops a multi-byte character split by the cut.
                truncated_data = part.data.encode('utf-8')[:remaining_size].decode('utf-8', errors='ignore')
                truncated_part = ContentPart(
                    id=makeId(),
                    parentId=part.parentId,
                    label=f"{part.label}_truncated",
                    typeGroup=part.typeGroup,
                    mimeType=part.mimeType,
                    data=truncated_data,
                    metadata={**part.metadata, "size": len(truncated_data.encode('utf-8')), "truncated": True}
                )
                kept.append(truncated_part)
        # NOTE(review): stop at the first part that does not fit. The original
        # formatting was lost, so confirm this is the intended break level.
        break

    return kept


def applyAiIfRequested(extracted: ContentExtracted, options: Dict[str, Any]) -> ContentExtracted:
    """
    Apply AI processing if requested in options.
    This is a placeholder for actual AI integration.

    When ``options["prompt"]`` is set, the metadata of the eligible parts is
    tagged in place with ``ai_processed`` and ``operation_type``. Which parts
    are eligible depends on ``options["operationType"]`` (default
    ``"general"``): ``analyse_content`` tags text/table/structure parts,
    ``generate_plan`` tags text parts, ``generate_content`` tags all parts;
    any other value tags nothing.

    Args:
        extracted: Extraction result whose parts may be tagged.
        options: Reads ``prompt`` and ``operationType``.

    Returns:
        The same ``extracted`` object.
    """
    prompt = options.get("prompt")
    operationType = options.get("operationType", "general")

    if not prompt:
        return extracted

    # Placeholder AI processing based on operationType
    if operationType == "analyse_content":
        targets = [p for p in extracted.parts if p.typeGroup in ("text", "table", "structure")]
    elif operationType == "generate_plan":
        targets = [p for p in extracted.parts if p.typeGroup == "text"]
    elif operationType == "generate_content":
        targets = list(extracted.parts)
    else:
        targets = []

    for part in targets:
        part.metadata["ai_processed"] = True
        part.metadata["operation_type"] = operationType

    return extracted