# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Any, Dict, List, Optional, TYPE_CHECKING
import logging

from modules.datamodels.datamodelExtraction import ContentPart

if TYPE_CHECKING:
    from modules.datamodels.datamodelUdm import UdmDocument

logger = logging.getLogger(__name__)

# Lazily created by getExtractorRegistry(); None until the first call.
_extractorRegistrySingleton: Optional["ExtractorRegistry"] = None


def getExtractorRegistry() -> "ExtractorRegistry":
    """Shared ExtractorRegistry instance (avoid repeated auto-discovery e.g. per file in ZIP).

    The registry is built on first use and then reused. No locking is done
    here, so concurrent first calls could each construct a registry.
    """
    global _extractorRegistrySingleton
    if _extractorRegistrySingleton is None:
        _extractorRegistrySingleton = ExtractorRegistry()
    return _extractorRegistrySingleton


class Extractor:
    """
    Base class for all document extractors.

    Each extractor should implement:
    - detect(): Check if this extractor can handle the given file
    - extract(): Extract content from the file
    - getSupportedExtensions(): Return supported file extensions
    - getSupportedMimeTypes(): Return supported MIME types
    """

    def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
        """Check if this extractor can handle the given file.

        The base implementation always returns False.
        """
        return False

    def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> list[ContentPart]:
        """Extract content from the file bytes.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError

    def extractToUdm(
        self,
        fileBytes: bytes,
        context: Dict[str, Any],
        precomputedParts: Optional[List[ContentPart]] = None,
    ) -> "UdmDocument":
        """Build UDM from extracted parts (default: heuristic grouping).

        Override for format-specific trees.

        Args:
            fileBytes: Raw file content; only used when ``precomputedParts`` is None.
            context: Reads the optional keys ``extractionId``, ``mimeType`` and
                ``fileName``.
            precomputedParts: Already extracted parts. When given (even if empty),
                ``extract()`` is not called again.
        """
        # Imported lazily inside the method, as in the original layout.
        from modules.datamodels.datamodelUdm import _contentPartsToUdm, _mimeToUdmSourceType
        from modules.datamodels.datamodelExtraction import ContentExtracted
        from .subUtils import makeId

        parts = precomputedParts if precomputedParts is not None else self.extract(fileBytes, context)
        # A missing or falsy extractionId gets a freshly generated id.
        eid = context.get("extractionId") or makeId()
        extracted = ContentExtracted(id=eid, parts=parts)
        src = _mimeToUdmSourceType(context.get("mimeType", ""), context.get("fileName", ""))
        return _contentPartsToUdm(extracted, src, context.get("fileName", ""))

    def getSupportedExtensions(self) -> list[str]:
        """Return list of supported file extensions (including dots)."""
        return []

    def getSupportedMimeTypes(self) -> list[str]:
        """Return list of supported MIME types."""
        return []


class Chunker:
    """Base chunker; the default implementation produces no chunks."""

    def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]:
        """Split ``part`` into chunks. The base class returns an empty list."""
        return []


class ExtractorRegistry:
    """Maps MIME types and file extensions to Extractor instances.

    MIME types and extensions (stored without the leading dot) share a single
    key space. Extractors are auto-discovered when the registry is constructed.
    """

    def __init__(self):
        # Registry key (MIME type, or extension without dot) -> extractor.
        self._map: Dict[str, Extractor] = {}
        # Returned by resolve() when no key matches.
        self._fallback: Optional[Extractor] = None
        self._auto_discover_extractors()

    def _auto_discover_extractors(self):
        """Auto-discover and register all extractors from the extractors directory.

        Imports every ``extractor*.py`` module in the sibling ``extractors``
        directory, instantiates each public ``Extractor`` subclass found in it
        and registers it under its declared formats. Failures are logged and
        never raised, so a broken extractor module cannot prevent the registry
        from being constructed.
        """
        try:
            import importlib
            from pathlib import Path

            # Get the extractors directory
            extractors_dir = Path(__file__).parent / "extractors"
            if not extractors_dir.exists():
                logger.error(f"Extractors directory not found: {extractors_dir}")
                return

            registered_classes: List[str] = []
            # Sorted: glob order depends on the filesystem, and a later
            # registration overwrites an earlier one for the same key, so an
            # unsorted walk makes the winner of a contested key nondeterministic.
            for file_path in sorted(extractors_dir.glob("extractor*.py")):
                module_name = file_path.stem
                try:
                    # NOTE: the package path is hard-coded and must match this
                    # module's actual location.
                    module = importlib.import_module(
                        f".{module_name}",
                        package="modules.serviceCenter.services.serviceExtraction.extractors",
                    )

                    # Find all extractor classes in the module
                    for attr_name in dir(module):
                        attr = getattr(module, attr_name)
                        if (
                            isinstance(attr, type)
                            and issubclass(attr, Extractor)
                            and attr is not Extractor
                            and not attr_name.startswith('_')
                        ):
                            # Create instance and auto-register
                            self._auto_register_extractor(attr())
                            registered_classes.append(attr_name)
                except Exception as e:
                    # Also covers constructor failures: the remaining classes
                    # of this module are skipped, other modules still load.
                    logger.warning(f"Failed to import {module_name}: {str(e)}")
                    continue

            # Set fallback extractor
            try:
                from .extractors.extractorBinary import BinaryExtractor
                self.setFallback(BinaryExtractor())
            except Exception as e:
                logger.warning(f"Failed to set fallback extractor: {str(e)}")

            logger.info(f"ExtractorRegistry: Auto-discovered and registered {len(registered_classes)} extractor classes: {', '.join(registered_classes)}")
            logger.info(f"ExtractorRegistry: Total registered formats: {len(self._map)}")

        except Exception as e:
            # logger.exception attaches the traceback to the log record
            # instead of printing it to stderr.
            logger.exception(f"ExtractorRegistry: Failed to auto-discover extractors: {str(e)}")

    def _auto_register_extractor(self, extractor: Extractor):
        """Auto-register an extractor based on its declared supported formats.

        MIME types are registered as declared. Extensions are registered
        without the leading dot and lowercased, because ``resolve()`` looks up
        the lowercased extension of the file name.
        """
        try:
            # Register MIME types
            for mime_type in extractor.getSupportedMimeTypes():
                self.register(mime_type, extractor)

            # Register file extensions
            for ext in extractor.getSupportedExtensions():
                self.register(ext.lstrip('.').lower(), extractor)

        except Exception as e:
            logger.error(f"Failed to auto-register {extractor.__class__.__name__}: {str(e)}")

    def register(self, key: str, extractor: Extractor):
        """Register ``extractor`` under ``key``, replacing any previous entry."""
        self._map[key] = extractor

    def setFallback(self, extractor: Extractor):
        """Set the extractor returned by ``resolve()`` when nothing matches."""
        self._fallback = extractor

    def resolve(self, mimeType: str, fileName: str) -> Optional[Extractor]:
        """Find the extractor for a file.

        Lookup order:
        1. ``mimeType`` exactly as given.
        2. ``mimeType`` with parameters stripped and lowercased, so that e.g.
           ``"Text/Plain; charset=utf-8"`` finds ``"text/plain"``.
        3. The lowercased extension of ``fileName``.
        4. The fallback extractor (may be None).

        Empty or None ``mimeType`` / ``fileName`` are tolerated.
        """
        if mimeType in self._map:
            return self._map[mimeType]

        if mimeType:
            normalized = mimeType.split(";", 1)[0].strip().lower()
            if normalized and normalized in self._map:
                return self._map[normalized]

        # simple extension fallback
        if fileName and "." in fileName:
            ext = fileName.lower().rsplit(".", 1)[-1]
            if ext in self._map:
                return self._map[ext]
        return self._fallback

    def getExtensionToMimeMap(self) -> Dict[str, str]:
        """Build a map from file extension (without dot) to primary MIME type.

        Iterates all registered extractors and pairs each declared extension
        with the first MIME type declared by the same extractor.
        Specialized extractors (Pdf, Docx, …) are processed first so their
        mapping wins over broad extractors like TextExtractor.
        """
        # Collect unique extractor instances (same instance registered under many keys)
        uniqueExtractors: list[Extractor] = []
        seenIds: set[int] = set()
        for extractor in self._map.values():
            if id(extractor) not in seenIds:
                seenIds.add(id(extractor))
                uniqueExtractors.append(extractor)

        # Specialized (few extensions) first so they win over broad ones.
        # The sort is stable, so ties keep registration order.
        uniqueExtractors.sort(key=lambda e: len(e.getSupportedExtensions()))

        extMap: Dict[str, str] = {}
        for extractor in uniqueExtractors:
            extensions = extractor.getSupportedExtensions()
            mimeTypes = extractor.getSupportedMimeTypes()
            if not extensions or not mimeTypes:
                continue
            primaryMime = mimeTypes[0]
            for rawExt in extensions:
                # First writer wins: an earlier (more specialized) mapping is kept.
                extMap.setdefault(rawExt.lstrip('.').lower(), primaryMime)
        return extMap

    def getAllSupportedFormats(self) -> Dict[str, Dict[str, list[str]]]:
        """
        Get all supported formats from all registered extractors.

        Entries are keyed by registry key (the MIME type or dot-less extension
        an extractor is registered under), not by extractor name, so an
        extractor registered under several keys appears once per key. The
        fallback extractor is reported under the key "fallback".

        Returns:
            Dictionary with format information:
            {
                "extensions": {
                    "registry_key": [".ext1", ".ext2", ...]
                },
                "mime_types": {
                    "registry_key": ["mime/type1", "mime/type2", ...]
                }
            }
        """
        formats: Dict[str, Dict[str, list[str]]] = {"extensions": {}, "mime_types": {}}

        # Get formats from registered extractors. hasattr() is kept because
        # register() accepts any object, not only Extractor subclasses.
        for key, extractor in self._map.items():
            if hasattr(extractor, 'getSupportedExtensions'):
                extensions = extractor.getSupportedExtensions()
                if extensions:
                    formats["extensions"][key] = extensions

            if hasattr(extractor, 'getSupportedMimeTypes'):
                mime_types = extractor.getSupportedMimeTypes()
                if mime_types:
                    formats["mime_types"][key] = mime_types

        # Add fallback extractor info
        if self._fallback and hasattr(self._fallback, 'getSupportedExtensions'):
            formats["extensions"]["fallback"] = self._fallback.getSupportedExtensions()
        if self._fallback and hasattr(self._fallback, 'getSupportedMimeTypes'):
            formats["mime_types"]["fallback"] = self._fallback.getSupportedMimeTypes()

        return formats


class ChunkerRegistry:
    """Maps a content type group (e.g. "text", "table") to a Chunker."""

    def __init__(self):
        self._map: Dict[str, Chunker] = {}
        # Returned by resolve() for unknown type groups; produces no chunks.
        self._noop = Chunker()

        # Register default chunkers. All imports share one try block, so a
        # failure in any of them leaves no default chunker registered.
        try:
            from .chunking.chunkerText import TextChunker
            from .chunking.chunkerTable import TableChunker
            from .chunking.chunkerStructure import StructureChunker
            from .chunking.chunkerImage import ImageChunker

            self.register("text", TextChunker())
            self.register("table", TableChunker())
            self.register("structure", StructureChunker())
            self.register("image", ImageChunker())
            # Use text chunker for container, binary, and media stream content
            self.register("container", TextChunker())
            self.register("binary", TextChunker())
            self.register("audiostream", TextChunker())
            self.register("videostream", TextChunker())
        except Exception as e:
            # logger.exception attaches the traceback to the log record
            # instead of printing it to stderr.
            logger.exception(f"ChunkerRegistry: Failed to register chunkers: {str(e)}")

    def register(self, typeGroup: str, chunker: Chunker):
        """Register ``chunker`` for ``typeGroup``, replacing any previous entry."""
        self._map[typeGroup] = chunker

    def resolve(self, typeGroup: str) -> Chunker:
        """Return the chunker for ``typeGroup``, or the no-op chunker if unknown."""
        return self._map.get(typeGroup, self._noop)