gateway/modules/services/serviceExtraction/subPipeline.py

from typing import Any, Dict, List
import logging
import os
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart
from modules.shared.configuration import APP_CONFIG
from .subUtils import makeId
from .subRegistry import ExtractorRegistry, ChunkerRegistry
from .merging.mergerText import TextMerger
from .merging.mergerTable import TableMerger
from .merging.mergerDefault import DefaultMerger
from .subMerger import IntelligentTokenAwareMerger

logger = logging.getLogger(__name__)


def _mergeParts(parts: List[ContentPart], mergeStrategy: Dict[str, Any]) -> List[ContentPart]:
    """Merge parts based on the provided strategy.

    Parts are grouped by the ``groupBy`` field (default ``typeGroup``),
    sorted within each group by ``orderBy`` (default ``id``), and merged
    with a type-appropriate merger.
    """
    if not parts or not mergeStrategy:
        return parts
    groupBy = mergeStrategy.get("groupBy", "typeGroup")
    orderBy = mergeStrategy.get("orderBy", "id")

    # Group parts by the specified field
    groups: Dict[Any, List[ContentPart]] = {}
    for part in parts:
        key = getattr(part, groupBy, "unknown")
        groups.setdefault(key, []).append(part)

    # Merge each group
    merged_parts: List[ContentPart] = []
    for group_parts in groups.values():
        if len(group_parts) == 1:
            merged_parts.extend(group_parts)
            continue
        # Sort by the orderBy field if specified
        if orderBy:
            group_parts.sort(key=lambda p: getattr(p, orderBy, ""))
        # Use the appropriate merger for the group's type
        type_group = group_parts[0].typeGroup
        if type_group == "text":
            merger = TextMerger()
        elif type_group == "table":
            merger = TableMerger()
        else:
            merger = DefaultMerger()
        merged_parts.extend(merger.merge(group_parts, mergeStrategy))
    return merged_parts
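
# Example (illustrative sketch; field values are hypothetical):
#
#     parts = [
#         ContentPart(id="p1", parentId=None, label="para1", typeGroup="text",
#                     mimeType="text/plain", data="Hello", metadata={"size": 5}),
#         ContentPart(id="p2", parentId=None, label="para2", typeGroup="text",
#                     mimeType="text/plain", data="World", metadata={"size": 5}),
#     ]
#     merged = _mergeParts(parts, {"groupBy": "typeGroup", "orderBy": "id"})
#     # Both parts share typeGroup "text", so TextMerger combines them;
#     # a group with a single member passes through unchanged.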


def runExtraction(
    extractorRegistry: ExtractorRegistry,
    chunkerRegistry: ChunkerRegistry,
    documentBytes: bytes,
    fileName: str,
    mimeType: str,
    options: Dict[str, Any],
) -> ContentExtracted:
    """Extract parts from a document, then chunk, size-limit, and optionally merge them."""
    extractor = extractorRegistry.resolve(mimeType, fileName)
    if extractor is None:
        # Fallback: return a single binary part flagging the missing extractor
        part = ContentPart(
            id=makeId(),
            parentId=None,
            label="file",
            typeGroup="binary",
            mimeType=mimeType or "application/octet-stream",
            data="",
            metadata={"warning": "No extractor registered"}
        )
        return ContentExtracted(id=makeId(), parts=[part])

    parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})
    # Apply chunking and size limiting
    parts = poolAndLimit(parts, chunkerRegistry, options)

    # Optional merge step - chunks are preserved as-is
    mergeStrategy = options.get("mergeStrategy", {})
    if mergeStrategy:
        # Don't merge chunks - they must stay separate for downstream processing
        non_chunk_parts = [p for p in parts if not p.metadata.get("chunk", False)]
        chunk_parts = [p for p in parts if p.metadata.get("chunk", False)]
        logger.debug(f"runExtraction: non_chunk_parts={len(non_chunk_parts)}, chunk_parts={len(chunk_parts)} (chunks preserved from merging)")
        # Apply merging only when there are many small text parts
        if non_chunk_parts:
            text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"]
            if len(text_parts) > 5:
                logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency")
                non_chunk_parts = _mergeParts(non_chunk_parts, mergeStrategy)
        # Recombine; chunks stay separate
        parts = non_chunk_parts + chunk_parts
        logger.debug(f"runExtraction: final parts after merging: {len(parts)} (chunks: {len(chunk_parts)})")

    # DEBUG: dump parts and chunks to files - only if debug is enabled
    try:
        debug_enabled = APP_CONFIG.get("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
        if debug_enabled:
            base_dir = "./test-chat/ai"
            os.makedirs(base_dir, exist_ok=True)
            # Timestamp for consistent naming (lazy import: only needed on this debug path)
            from datetime import datetime, UTC
            ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
            # Strip any directory components so the dump file names stay valid paths
            safe_name = os.path.basename(fileName)
            # Build a summary file alongside the per-part dumps
            summary_lines: List[str] = [f"fileName: {fileName}", f"mimeType: {mimeType}", f"totalParts: {len(parts)}"]
            text_index = 0
            for idx, part in enumerate(parts):
                is_texty = part.typeGroup in ("text", "table", "structure")
                size = int(part.metadata.get("size", 0) or 0)
                is_chunk = bool(part.metadata.get("chunk", False))
                summary_lines.append(
                    f"part[{idx}]: typeGroup={part.typeGroup}, label={part.label}, size={size}, chunk={is_chunk}"
                )
                if is_texty and getattr(part, "data", None):
                    text_index += 1
                    fname = f"{ts}_extract_{safe_name}_part_{idx:03d}_{'chunk' if is_chunk else 'full'}_{text_index:03d}.txt"
                    fpath = os.path.join(base_dir, fname)
                    with open(fpath, "w", encoding="utf-8") as f:
                        f.write(f"# typeGroup: {part.typeGroup}\n# label: {part.label}\n# chunk: {is_chunk}\n# size: {size}\n\n")
                        f.write(str(part.data))
            # Write the summary file
            summary_fname = f"{ts}_extract_{safe_name}_summary.txt"
            summary_fpath = os.path.join(base_dir, summary_fname)
            with open(summary_fpath, "w", encoding="utf-8") as f:
                f.write("\n".join(summary_lines))
    except Exception as _e:
        logger.debug(f"Debug dump skipped: {_e}")
    return ContentExtracted(id=makeId(), parts=parts)
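
# Example (illustrative sketch; registry setup and option values are hypothetical):
#
#     extracted = runExtraction(
#         extractorRegistry, chunkerRegistry,
#         documentBytes=pdf_bytes, fileName="report.pdf", mimeType="application/pdf",
#         options={"maxSize": 100_000, "chunkAllowed": True,
#                  "mergeStrategy": {"groupBy": "typeGroup", "orderBy": "id"}},
#     )
#     for p in extracted.parts:
#         print(p.typeGroup, p.metadata.get("chunk", False), p.metadata.get("size"))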


def poolAndLimit(parts: List[ContentPart], chunkerRegistry: ChunkerRegistry, options: Dict[str, Any]) -> List[ContentPart]:
    """Fit parts into the ``maxSize`` budget, chunking the overflow when allowed."""
    maxSize = int(options.get("maxSize", 0) or 0)
    chunkAllowed = bool(options.get("chunkAllowed", False))
    mergeStrategy = options.get("mergeStrategy", {})
    if maxSize <= 0:
        # No size limit - still apply merging if a strategy was provided
        if mergeStrategy:
            return _applyMerging(parts, mergeStrategy)
        return parts

    # First, keep as many whole parts as fit within the size limit
    current = 0
    kept: List[ContentPart] = []
    remaining: List[ContentPart] = []
    logger.debug(f"Starting poolAndLimit with {len(parts)} parts, maxSize={maxSize}")
    for i, p in enumerate(parts):
        size = int(p.metadata.get("size", 0) or 0)
        # Show the first 50 characters of the content for debugging
        content_preview = p.data[:50].replace('\n', '\\n') if p.data else ""
        logger.debug(f"Part {i}: {p.typeGroup} - {size} bytes - '{content_preview}...' (current: {current})")
        if current + size <= maxSize:
            kept.append(p)
            current += size
            logger.debug(f"Part {i} kept (total: {current})")
        else:
            remaining.append(p)
            logger.debug(f"Part {i} moved to remaining")
    logger.debug(f"Kept: {len(kept)}, Remaining: {len(remaining)}")

    # If parts remain and chunking is allowed, split them into chunks
    if remaining and chunkAllowed:
        logger.debug(f"Chunking activated: {len(remaining)} remaining parts, maxSize={maxSize} bytes, {current} bytes used")
        for p in remaining:
            if p.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
                logger.debug(f"Chunking {p.typeGroup} part with {len(p.data)} chars")
                chunks = chunkerRegistry.resolve(p.typeGroup).chunk(p, options)
                logger.debug(f"Created {len(chunks)} chunks")
                chunks_added = 0
                for ch in chunks:
                    chSize = int(ch.get("size", 0) or 0)
                    # Add all chunks - don't limit by maxSize since they'll be processed separately
                    kept.append(ContentPart(
                        id=makeId(),
                        parentId=p.id,
                        label=f"chunk_{ch.get('order', 0)}",
                        typeGroup=p.typeGroup,
                        mimeType=p.mimeType,
                        data=ch.get("data", ""),
                        metadata={
                            "size": chSize,
                            "chunk": True,
                            **ch.get("metadata", {})
                        }
                    ))
                    chunks_added += 1
                    logger.debug(f"Added chunk {ch.get('order', 0)}: {chSize} bytes")
                logger.debug(f"Added {chunks_added} chunks from {p.typeGroup} part")

    # Apply the merging strategy if provided, but preserve chunks
    if mergeStrategy:
        # Don't merge chunks - they must stay separate for downstream processing
        non_chunk_parts = [p for p in kept if not p.metadata.get("chunk", False)]
        chunk_parts = [p for p in kept if p.metadata.get("chunk", False)]
        logger.debug(f"Preserving {len(chunk_parts)} chunks from merging")
        # Apply merging only when there are many small text parts
        if non_chunk_parts:
            text_parts = [p for p in non_chunk_parts if p.typeGroup == "text"]
            if len(text_parts) > 5:
                logger.info(f"🔧 Merging {len(text_parts)} small text parts for efficiency")
                non_chunk_parts = _applyMerging(non_chunk_parts, mergeStrategy)
        # Recombine; chunks stay separate
        kept = non_chunk_parts + chunk_parts
        logger.debug(f"Final parts after merging: {len(kept)} (chunks: {len(chunk_parts)})")
        # Re-check size after merging
        totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept)
        if totalSize > maxSize and mergeStrategy.get("maxSize"):
            # Apply the size limit to the merged parts
            kept = _applySizeLimit(kept, maxSize)
    logger.debug(f"poolAndLimit returning {len(kept)} parts")
    return kept
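
# Example (illustrative sketch; option values are hypothetical):
#
#     options = {"maxSize": 10_000, "chunkAllowed": True, "mergeStrategy": {}}
#     limited = poolAndLimit(parts, chunkerRegistry, options)
#     # Whole parts are kept until the 10 kB budget is exhausted; the
#     # overflow is split into chunk parts (metadata["chunk"] == True)
#     # that downstream steps process one at a time.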


def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
    """Apply the merging strategy to parts, with optional intelligent token-aware merging."""
    logger.debug(f"_applyMerging called with {len(parts)} parts")
    # Check if intelligent merging is enabled
    if strategy.get("useIntelligentMerging", False):
        model_capabilities = strategy.get("modelCapabilities", {})
        subMerger = IntelligentTokenAwareMerger(model_capabilities)
        # Use intelligent merging for all parts
        merged = subMerger.merge_chunks_intelligently(parts, strategy.get("prompt", ""))
        # Calculate and log optimization stats
        stats = subMerger.calculate_optimization_stats(parts, merged)
        logger.info(f"🧠 Intelligent merging stats: {stats}")
        logger.debug(f"Intelligent merging: {stats['original_ai_calls']} → {stats['optimized_ai_calls']} calls ({stats['reduction_percent']}% reduction)")
        return merged

    # Fall back to traditional merging
    textMerger = TextMerger()
    tableMerger = TableMerger()
    defaultMerger = DefaultMerger()
    # Group by typeGroup
    textParts = [p for p in parts if p.typeGroup == "text"]
    tableParts = [p for p in parts if p.typeGroup == "table"]
    structureParts = [p for p in parts if p.typeGroup == "structure"]
    otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")]
    logger.debug(f"Grouped - text: {len(textParts)}, table: {len(tableParts)}, structure: {len(structureParts)}, other: {len(otherParts)}")
    merged: List[ContentPart] = []
    if textParts:
        textMerged = textMerger.merge(textParts, strategy)
        logger.debug(f"TextMerger merged {len(textParts)} parts into {len(textMerged)} parts")
        merged.extend(textMerged)
    if tableParts:
        tableMerged = tableMerger.merge(tableParts, strategy)
        logger.debug(f"TableMerger merged {len(tableParts)} parts into {len(tableMerged)} parts")
        merged.extend(tableMerged)
    if structureParts:
        # For now, treat structure like text
        structureMerged = textMerger.merge(structureParts, strategy)
        logger.debug(f"StructureMerger merged {len(structureParts)} parts into {len(structureMerged)} parts")
        merged.extend(structureMerged)
    if otherParts:
        otherMerged = defaultMerger.merge(otherParts, strategy)
        logger.debug(f"DefaultMerger merged {len(otherParts)} parts into {len(otherMerged)} parts")
        merged.extend(otherMerged)
    logger.debug(f"_applyMerging returning {len(merged)} parts")
    return merged
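
# Example (illustrative sketch; the modelCapabilities fields are hypothetical):
#
#     strategy = {
#         "useIntelligentMerging": True,
#         "modelCapabilities": {"contextWindow": 128_000},
#         "prompt": "Summarise the document",
#     }
#     merged = _applyMerging(parts, strategy)
#     # With useIntelligentMerging set, IntelligentTokenAwareMerger packs the
#     # parts into fewer AI calls; otherwise parts are merged per typeGroup
#     # by the traditional Text/Table/Default mergers.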


def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]:
    """Apply the size limit by prioritizing parts and truncating text if necessary."""
    # Sort by priority: text first, then others
    priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3, "binary": 4, "metadata": 5, "container": 6}
    sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99))
    kept: List[ContentPart] = []
    current_size = 0
    for part in sorted_parts:
        part_size = int(part.metadata.get("size", 0) or 0)
        if current_size + part_size <= maxSize:
            kept.append(part)
            current_size += part_size
        else:
            # Try to truncate text parts
            if part.typeGroup == "text" and part_size > 0:
                remaining_size = maxSize - current_size
                if remaining_size > 1000:  # Only truncate if there is meaningful space left
                    # Truncate by encoded byte length so the part actually fits the
                    # remaining budget (sizes are tracked in UTF-8 bytes)
                    truncated_data = part.data.encode("utf-8")[:remaining_size].decode("utf-8", errors="ignore")
                    truncated_part = ContentPart(
                        id=makeId(),
                        parentId=part.parentId,
                        label=f"{part.label}_truncated",
                        typeGroup=part.typeGroup,
                        mimeType=part.mimeType,
                        data=truncated_data,
                        metadata={**part.metadata, "size": len(truncated_data.encode('utf-8')), "truncated": True}
                    )
                    kept.append(truncated_part)
                    break
    return kept
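
# Example (illustrative sketch): whole parts are kept in priority order
# until the budget runs out; an oversized text part is then truncated into
# the remaining budget when more than 1000 bytes of it remain.
#
#     limited = _applySizeLimit(parts, maxSize=5000)
#     assert sum(int(p.metadata.get("size", 0) or 0) for p in limited) <= 5000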


def applyAiIfRequested(extracted: ContentExtracted, options: Dict[str, Any]) -> ContentExtracted:
    """
    Apply AI processing if requested in the options.
    This is a placeholder for the actual AI integration.
    """
    prompt = options.get("prompt")
    operationType = options.get("operationType", "general")
    if not prompt:
        return extracted
    # Placeholder AI processing based on operationType
    if operationType == "analyse_content":
        # Add analysis metadata to text-like parts
        for part in extracted.parts:
            if part.typeGroup in ("text", "table", "structure"):
                part.metadata["ai_processed"] = True
                part.metadata["operation_type"] = operationType
    elif operationType == "generate_plan":
        # Add plan-generation metadata to text parts
        for part in extracted.parts:
            if part.typeGroup == "text":
                part.metadata["ai_processed"] = True
                part.metadata["operation_type"] = operationType
    elif operationType == "generate_content":
        # Add content-generation metadata to all parts
        for part in extracted.parts:
            part.metadata["ai_processed"] = True
            part.metadata["operation_type"] = operationType
    return extracted
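
# Example (illustrative sketch; the operation names are the ones handled above):
#
#     extracted = applyAiIfRequested(
#         extracted,
#         {"prompt": "Summarise this report", "operationType": "analyse_content"},
#     )
#     # Text, table, and structure parts are now tagged with
#     # metadata["ai_processed"] = True and metadata["operation_type"].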