gateway/modules/services/serviceExtraction/subPipeline.py

from typing import Any, Dict, List
from modules.datamodels.datamodelExtraction import ExtractedContent, ContentPart
from .utils import makeId
from .subRegistry import ExtractorRegistry, ChunkerRegistry
from .merging.text_merger import TextMerger
from .merging.table_merger import TableMerger
from .merging.default_merger import DefaultMerger


def runExtraction(
    extractorRegistry: ExtractorRegistry,
    chunkerRegistry: ChunkerRegistry,
    documentBytes: bytes,
    fileName: str,
    mimeType: str,
    options: Dict[str, Any],
) -> ExtractedContent:
    """Resolve an extractor for the document, run it, and apply an optional merge step."""
    extractor = extractorRegistry.resolve(mimeType, fileName)
    if extractor is None:
        # Fallback: wrap the whole document in a single binary part
        part = ContentPart(
            id=makeId(),
            parentId=None,
            label="file",
            typeGroup="binary",
            mimeType=mimeType or "application/octet-stream",
            data="",
            metadata={"warning": "No extractor registered"}
        )
        return ExtractedContent(id=makeId(), parts=[part])
    parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})
    # Optional merge step
    mergeStrategy = options.get("mergeStrategy", {})
    if mergeStrategy:
        parts = _applyMerging(parts, mergeStrategy)
    return ExtractedContent(id=makeId(), parts=parts)
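
# Illustrative caller-side sketch (not executed here): the registries are assumed to be
# built elsewhere in the service, and the file/option values below are placeholders.
# Only option keys actually read in this module are shown.
#
#   extracted = runExtraction(
#       extractorRegistry,
#       chunkerRegistry,
#       documentBytes=fileBytes,
#       fileName="document.bin",
#       mimeType="application/octet-stream",
#       options={"maxSize": 0, "chunkAllowed": False, "mergeStrategy": {}},
#   )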


def poolAndLimit(parts: List[ContentPart], chunkerRegistry: ChunkerRegistry, options: Dict[str, Any]) -> List[ContentPart]:
    """Fit parts into the optional size budget, chunking and merging as the options allow."""
    maxSize = int(options.get("maxSize", 0) or 0)
    chunkAllowed = bool(options.get("chunkAllowed", False))
    mergeStrategy = options.get("mergeStrategy", {})
    if maxSize <= 0:
        # No size budget; still apply merging if a strategy is provided
        if mergeStrategy:
            return _applyMerging(parts, mergeStrategy)
        return parts
    # First, keep whole parts while they fit within the size limit
    current = 0
    kept: List[ContentPart] = []
    remaining: List[ContentPart] = []
    for p in parts:
        size = int(p.metadata.get("size", 0) or 0)
        if current + size <= maxSize:
            kept.append(p)
            current += size
        else:
            remaining.append(p)
    # If parts were left over and chunking is allowed, split them and keep what still fits
    if remaining and chunkAllowed:
        for p in remaining:
            if p.typeGroup in ("text", "table", "structure"):
                chunks = chunkerRegistry.resolve(p.typeGroup).chunk(p, options)
                for ch in chunks:
                    chSize = int(ch.get("size", 0) or 0)
                    if current + chSize <= maxSize:
                        kept.append(ContentPart(
                            id=makeId(),
                            parentId=p.id,
                            label=f"chunk_{ch.get('order', 0)}",
                            typeGroup=p.typeGroup,
                            mimeType=p.mimeType,
                            data=ch.get("data", ""),
                            metadata={"size": chSize, "chunk": True}
                        ))
                        current += chSize
                    else:
                        break
    # Apply the merging strategy if provided
    if mergeStrategy:
        kept = _applyMerging(kept, mergeStrategy)
        # Re-check size after merging
        totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept)
        if totalSize > maxSize and mergeStrategy.get("maxSize"):
            # Apply the size limit again to the merged parts
            kept = _applySizeLimit(kept, maxSize)
    return kept
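
# Assumed chunker contract, inferred from the ch.get(...) calls above (a sketch, not a
# guaranteed interface): ChunkerRegistry.resolve(typeGroup) returns an object whose
# chunk(part, options) produces plain dicts, e.g.
#
#   [{"data": "first slice ...", "size": 1200, "order": 0},
#    {"data": "second slice ...", "size": 1180, "order": 1}]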


def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
    """Apply merging strategy to parts."""
    textMerger = TextMerger()
    tableMerger = TableMerger()
    defaultMerger = DefaultMerger()
    # Group by typeGroup
    textParts = [p for p in parts if p.typeGroup == "text"]
    tableParts = [p for p in parts if p.typeGroup == "table"]
    structureParts = [p for p in parts if p.typeGroup == "structure"]
    otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")]
    merged: List[ContentPart] = []
    if textParts:
        merged.extend(textMerger.merge(textParts, strategy))
    if tableParts:
        merged.extend(tableMerger.merge(tableParts, strategy))
    if structureParts:
        # For now, treat structure like text
        merged.extend(textMerger.merge(structureParts, strategy))
    if otherParts:
        merged.extend(defaultMerger.merge(otherParts, strategy))
    return merged
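
# Assumed merger contract, inferred from the merged.extend(...) calls above: each merger's
# merge(parts, strategy) returns a List[ContentPart], so the per-group results can be
# concatenated directly, e.g.
#
#   mergedText = TextMerger().merge(textParts, strategy)   # -> List[ContentPart]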


def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]:
    """Apply size limit by prioritizing parts and truncating if necessary."""
    # Sort by priority: text first, then others
    priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3, "binary": 4, "metadata": 5, "container": 6}
    sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99))
    kept: List[ContentPart] = []
    current_size = 0
    for part in sorted_parts:
        part_size = int(part.metadata.get("size", 0) or 0)
        if current_size + part_size <= maxSize:
            kept.append(part)
            current_size += part_size
        else:
            # Try to truncate text parts
            if part.typeGroup == "text" and part_size > 0:
                remaining_size = maxSize - current_size
                if remaining_size > 1000:  # Only truncate if we have meaningful space
                    truncated_data = part.data[:remaining_size * 4]  # Rough character estimate
                    truncated_part = ContentPart(
                        id=makeId(),
                        parentId=part.parentId,
                        label=f"{part.label}_truncated",
                        typeGroup=part.typeGroup,
                        mimeType=part.mimeType,
                        data=truncated_data,
                        metadata={**part.metadata, "size": len(truncated_data.encode('utf-8')), "truncated": True}
                    )
                    kept.append(truncated_part)
                    break
    return kept


def applyAiIfRequested(extracted: ExtractedContent, options: Dict[str, Any]) -> ExtractedContent:
    """
    Apply AI processing if requested in options.
    This is a placeholder for actual AI integration.
    """
    prompt = options.get("prompt")
    operationType = options.get("operationType", "general")
    if not prompt:
        return extracted
    # Placeholder AI processing based on operationType
    if operationType == "analyse_content":
        # Add analysis metadata to parts
        for part in extracted.parts:
            if part.typeGroup in ("text", "table", "structure"):
                part.metadata["ai_processed"] = True
                part.metadata["operation_type"] = operationType
    elif operationType == "generate_plan":
        # Add plan generation metadata
        for part in extracted.parts:
            if part.typeGroup == "text":
                part.metadata["ai_processed"] = True
                part.metadata["operation_type"] = operationType
    elif operationType == "generate_content":
        # Add content generation metadata
        for part in extracted.parts:
            part.metadata["ai_processed"] = True
            part.metadata["operation_type"] = operationType
    return extracted
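
# End-to-end sketch of how the steps in this module compose (assumed caller-side wiring;
# registry construction and concrete option values live elsewhere in the gateway):
#
#   extracted = runExtraction(extractorRegistry, chunkerRegistry, data, fileName, mimeType, options)
#   limited = poolAndLimit(extracted.parts, chunkerRegistry, options)
#   final = applyAiIfRequested(ExtractedContent(id=makeId(), parts=limited), options)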