gateway/modules/services/serviceExtraction/subPipeline.py
2025-12-15 21:55:26 +01:00

48 lines
1.8 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import List
import logging
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, ExtractionOptions, MergeStrategy
from .subUtils import makeId
from .subRegistry import ExtractorRegistry, ChunkerRegistry
logger = logging.getLogger(__name__)
# REMOVED: _mergeParts function - unused, functionality replaced by applyMerging in interfaceAiObjects.py
def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: ExtractionOptions) -> ContentExtracted:
extractor = extractorRegistry.resolve(mimeType, fileName)
if extractor is None:
# fallback: single binary part
part = ContentPart(
id=makeId(),
parentId=None,
label="file",
typeGroup="binary",
mimeType=mimeType or "application/octet-stream",
data="",
metadata={"warning": "No extractor registered"}
)
return ContentExtracted(id=makeId(), parts=[part])
parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType})
# REMOVED: poolAndLimit(parts, chunkerRegistry, options)
# REMOVED: Chunking logic - now handled in AI call phase
# Apply merging strategy if provided (preserve existing logic)
if options.mergeStrategy:
# Use module-level applyMerging function
from .mainServiceExtraction import applyMerging
parts = applyMerging(parts, options.mergeStrategy)
return ContentExtracted(id=makeId(), parts=parts)
# REMOVED: poolAndLimit function - chunking now handled in AI call phase
# REMOVED: applyMerging function - moved to interfaceAiObjects.py for proper interface-level access