gateway/modules/services/serviceAi/subDocumentProcessing.py

463 lines
20 KiB
Python

import json
import logging
import re
import time
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
from modules.datamodels.datamodelExtraction import ContentExtracted, PartResult, ExtractionOptions, MergeStrategy
# ExtractionOptions refers to OperationTypeEnum through a forward reference.
# Hand Pydantic the enum via the parent namespace and rebuild the model so the
# reference is resolved at runtime. This is best effort: any failure is logged
# as a warning and module loading continues.
try:
    # Kept inside the guard so an import failure is reported as a warning
    # instead of aborting the module load.
    from modules.datamodels.datamodelAi import OperationTypeEnum
    setattr(
        ExtractionOptions,
        "__pydantic_parent_namespace__",
        dict(OperationTypeEnum=OperationTypeEnum),
    )
    ExtractionOptions.model_rebuild()
except Exception as _e:
    logging.getLogger(__name__).warning(f"ExtractionOptions forward-ref rebuild skipped: {_e}")
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
logger = logging.getLogger(__name__)
class SubDocumentProcessing:
    """Document processing operations including chunking, processing, and merging.

    Extracts content parts from chat documents through the extraction service,
    sends every part to the AI layer and merges the per-part answers into a
    single text or JSON document.
    """

    # Number of parts processed concurrently when the call options do not
    # supply a usable ``maxConcurrentParts`` value.
    DEFAULT_MAX_CONCURRENT_PARTS = 5

    # Part type groups that are forwarded to the AI layer.
    PROCESSABLE_TYPE_GROUPS = ("text", "table", "structure", "image", "container", "binary")

    def __init__(self, services, aiObjects):
        """Initialize document processing service.

        Args:
            services: Service center instance for accessing other services
            aiObjects: Initialized AiObjects instance
        """
        self.services = services
        self.aiObjects = aiObjects
        # Created on first access through the ``extractionService`` property.
        self._extractionService = None

    @property
    def extractionService(self):
        """Lazy initialization of extraction service."""
        if self._extractionService is None:
            logger.info("Lazy initializing ExtractionService...")
            self._extractionService = ExtractionService(self.services)
        return self._extractionService

    async def processDocumentsPerChunk(
        self,
        documents: List['ChatDocument'],
        prompt: str,
        options: Optional['AiCallOptions'] = None,
        operationId: Optional[str] = None
    ) -> str:
        """
        Process documents with model-aware chunking and merge results.

        Content is extracted WITHOUT chunking; chunking is left to the AI call
        phase. Each extracted part is sent to the AI layer and the answers are
        merged into one string.

        Args:
            documents: List of ChatDocument objects to process
            prompt: AI prompt for processing
            options: AI call options
            operationId: Optional operation ID for progress tracking

        Returns:
            Merged AI results as string. An empty string when no documents are
            given, or "[Error: No extraction results]" when the extraction
            service does not return a list.

        Raises:
            Exception: Any error raised during extraction, AI processing or
                merging is logged, the progress log is finished as failed and
                the error is re-raised.
        """
        if not documents:
            return ""

        # Create operationId if not provided
        if not operationId:
            now = int(time.time())
            currentWorkflow = self.services.currentWorkflow
            workflowId = currentWorkflow.id if currentWorkflow else f"no-workflow-{now}"
            operationId = f"ai_text_extract_{workflowId}_{now}"
            # NOTE(review): the progress log is started only for ids generated
            # here; caller-supplied ids are presumably already started - confirm.
            self.services.workflow.progressLogStart(
                operationId,
                "AI Text Extract",
                "Document Processing",
                f"Processing {len(documents)} documents"
            )

        # From here on operationId is always set, so progress calls are unconditional.
        workflow = self.services.workflow
        try:
            # Build extraction options using Pydantic model
            mergeStrategy = MergeStrategy(
                useIntelligentMerging=True,
                prompt=prompt,
                groupBy="typeGroup",
                orderBy="id",
                mergeType="concatenate"
            )
            extractionOptions = ExtractionOptions(
                prompt=prompt,
                operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT,
                processDocumentsIndividually=True,
                mergeStrategy=mergeStrategy
            )
            logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}")

            # Extract content WITHOUT chunking
            workflow.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
            extractionResult = self.extractionService.extractContent(documents, extractionOptions)
            if not isinstance(extractionResult, list):
                workflow.progressLogFinish(operationId, False)
                return "[Error: No extraction results]"

            # Process parts (not chunks) with model-aware AI calls
            workflow.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
            partResults = await self._processPartsWithMapping(extractionResult, prompt, options, operationId)

            # Merge results using existing merging system
            workflow.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
            mergedContent = self._mergePartResults(partResults, options)

            # Save merged extraction content to debug
            self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
            workflow.progressLogFinish(operationId, True)
            return mergedContent
        except Exception as e:
            logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
            workflow.progressLogFinish(operationId, False)
            raise

    async def callAiText(
        self,
        prompt: str,
        documents: Optional[List['ChatDocument']],
        options: 'AiCallOptions',
        operationId: Optional[str] = None
    ) -> str:
        """
        Handle text calls with document processing through ExtractionService.

        UNIFIED PROCESSING: Always use per-chunk processing for consistency.

        Args:
            prompt: AI prompt for processing
            documents: Documents to process; None or an empty list yields ""
            options: AI call options
            operationId: Optional operation ID for progress tracking

        Returns:
            The result of ``processDocumentsPerChunk`` for the given arguments.
        """
        return await self.processDocumentsPerChunk(documents, prompt, options, operationId)

    @staticmethod
    def _resolveMaxConcurrent(options, default: int = DEFAULT_MAX_CONCURRENT_PARTS) -> int:
        """Return the concurrency limit from options, or ``default`` if unusable.

        Only a positive integer ``maxConcurrentParts`` is accepted. A value of 0
        would create a semaphore that never admits a task, and None or negative
        values would make ``asyncio.Semaphore`` raise.
        """
        value = getattr(options, 'maxConcurrentParts', None) if options else None
        if isinstance(value, int) and not isinstance(value, bool) and value >= 1:
            return value
        return default

    async def _processPartsWithMapping(
        self,
        extractionResult: List['ContentExtracted'],
        prompt: str,
        options: Optional['AiCallOptions'] = None,
        operationId: Optional[str] = None
    ) -> List['PartResult']:
        """Process content parts with model-aware chunking and proper mapping.

        Args:
            extractionResult: Extraction results whose ``parts`` are processed
            prompt: AI prompt sent with every part
            options: AI call options; ``maxConcurrentParts`` limits parallelism
            operationId: Optional operation ID for progress tracking

        Returns:
            One PartResult per processed part, in collection order. Failed
            parts are returned as PartResult with ``metadata["success"]`` False
            and an error text as ``aiResult``; they do not raise.
        """
        import asyncio

        # Collect all parts that need processing
        partsToProcess = []
        for extracted in extractionResult:
            for part in extracted.parts:
                if part.typeGroup not in self.PROCESSABLE_TYPE_GROUPS:
                    continue
                # Skip empty container parts
                if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
                    logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
                    continue
                partsToProcess.append({
                    'part': part,
                    'part_index': len(partsToProcess),
                    'document_id': extracted.id
                })

        totalParts = len(partsToProcess)
        logger.info(f"Processing {totalParts} parts with model-aware chunking")

        # Counts parts whose AI call has been started (drives the progress bar).
        startedCount = 0

        async def processSinglePart(partInfo: Dict) -> 'PartResult':
            nonlocal startedCount
            part = partInfo['part']
            partIndex = partInfo['part_index']
            documentId = partInfo['document_id']
            startTime = time.time()
            try:
                # Create AI call request with content part
                request = AiCallRequest(
                    prompt=prompt,
                    context="",  # Context is in the content part
                    options=options,
                    contentParts=[part]  # Pass as list for unified processing
                )
                # Update progress before AI call
                if operationId and totalParts > 0:
                    startedCount += 1
                    progress = 0.3 + (startedCount / totalParts * 0.6)  # Progress from 0.3 to 0.9
                    self.services.workflow.progressLogUpdate(operationId, progress, f"Processing part {startedCount}/{totalParts}")
                # Call AI with model-aware chunking
                response = await self.aiObjects.call(request)
                return PartResult(
                    originalPart=part,
                    aiResult=response.content,
                    partIndex=partIndex,
                    documentId=documentId,
                    processingTime=time.time() - startTime,
                    metadata={
                        "success": True,
                        "partSize": len(part.data) if part.data else 0,
                        "resultSize": len(response.content),
                        "typeGroup": part.typeGroup,
                        "modelName": response.modelName,
                        "priceUsd": response.priceUsd
                    }
                )
            except Exception as e:
                # A failing part must not abort the others: report it as a
                # failed PartResult instead of propagating the error.
                logger.warning(f"Error processing part {partIndex}: {str(e)}")
                return PartResult(
                    originalPart=part,
                    aiResult=f"[Error processing part: {str(e)}]",
                    partIndex=partIndex,
                    documentId=documentId,
                    processingTime=time.time() - startTime,
                    metadata={
                        "success": False,
                        "error": str(e),
                        "partSize": len(part.data) if part.data else 0,
                        "typeGroup": part.typeGroup
                    }
                )

        # Process parts with concurrency control
        semaphore = asyncio.Semaphore(self._resolveMaxConcurrent(options))

        async def processWithSemaphore(partInfo):
            async with semaphore:
                return await processSinglePart(partInfo)

        rawResults = await asyncio.gather(
            *(processWithSemaphore(partInfo) for partInfo in partsToProcess),
            return_exceptions=True
        )

        # Handle exceptions. BaseException is checked on purpose: gather() can
        # hand back asyncio.CancelledError, which is not an Exception subclass
        # and would otherwise be returned as if it were a PartResult.
        processedResults = []
        for partInfo, result in zip(partsToProcess, rawResults):
            if isinstance(result, BaseException):
                processedResults.append(PartResult(
                    originalPart=partInfo['part'],
                    aiResult=f"[Error in parallel processing: {str(result)}]",
                    partIndex=partInfo['part_index'],
                    documentId=partInfo['document_id'],
                    processingTime=0.0,
                    metadata={"success": False, "error": str(result)}
                ))
            elif result is not None:
                processedResults.append(result)

        logger.info(f"Completed processing {len(processedResults)} parts")
        return processedResults

    def _mergeAiResultParts(self, partResults: List['PartResult']) -> list:
        """Convert PartResults to ContentParts and run them through the merger.

        Each ContentPart keeps the identity fields of the original part, carries
        the AI result as ``data`` and gets bookkeeping metadata. The error text
        of a failed part is copied into the metadata so later stages can
        report it.

        Returns:
            Whatever ``_applyMerging`` returns for the converted parts, grouped
            by ``documentId`` and ordered by ``partIndex``.
        """
        from modules.datamodels.datamodelExtraction import ContentPart
        from modules.services.serviceExtraction.subPipeline import _applyMerging

        contentParts = []
        for partResult in partResults:
            original = partResult.originalPart
            resultMetadata = partResult.metadata or {}
            metadata = {
                **(original.metadata or {}),
                "aiResult": True,
                "partIndex": partResult.partIndex,
                "documentId": partResult.documentId,
                "processingTime": partResult.processingTime,
                "success": resultMetadata.get("success", False)
            }
            if "error" in resultMetadata:
                metadata["error"] = resultMetadata["error"]
            contentParts.append(ContentPart(
                id=original.id,
                parentId=original.parentId,
                label=original.label,
                typeGroup=original.typeGroup,  # Use original typeGroup
                mimeType=original.mimeType,
                data=partResult.aiResult,  # Use AI result as data
                metadata=metadata
            ))

        mergeStrategy = MergeStrategy(
            useIntelligentMerging=True,
            groupBy="documentId",  # Group by document
            orderBy="partIndex",  # Order by part index
            mergeType="concatenate"
        )
        return _applyMerging(contentParts, mergeStrategy)

    def _mergePartResults(
        self,
        partResults: List['PartResult'],
        options: Optional['AiCallOptions'] = None
    ) -> str:
        """Merge part results using existing sophisticated merging system.

        Args:
            partResults: Per-part AI results to merge
            options: Unused; kept for interface compatibility

        Returns:
            The merged part data joined by blank lines and stripped, or an
            empty string when there are no part results.
        """
        if not partResults:
            return ""
        mergedParts = self._mergeAiResultParts(partResults)
        finalContent = "\n\n".join([part.data for part in mergedParts])
        logger.info(f"Merged {len(partResults)} parts using existing sophisticated merging system")
        return finalContent.strip()

    @staticmethod
    def _buildErrorSection(part, text: str, error: str) -> Dict[str, Any]:
        """Build the placeholder section that stands in for an unusable part."""
        return {
            "id": f"error_section_{part.id}",
            "title": "Error Section",
            "content_type": "paragraph",
            "elements": [{
                "text": text
            }],
            "order": part.metadata.get("partIndex", 0),
            "metadata": {
                "source_document": part.metadata.get("documentId", "unknown"),
                "part_id": part.id,
                "error": error
            }
        }

    @staticmethod
    def _sectionOrder(section: Dict[str, Any]):
        """Return a sortable order for a section; non-numeric orders count as 0.

        Sections come from model-generated JSON, so "order" may be missing,
        None or a string; mixing such values would make sorting raise.
        """
        order = section.get("order", 0)
        return order if isinstance(order, (int, float)) else 0

    def _convertPartResultsToJson(
        self,
        partResults: List['PartResult'],
        options: Optional['AiCallOptions'] = None
    ) -> Dict[str, Any]:
        """Convert part results to JSON format using existing sophisticated merging system.

        Args:
            partResults: Per-part AI results whose data is expected to be JSON
            options: Unused; kept for interface compatibility

        Returns:
            A document dict with "metadata" and "sections" (plus "summary" and
            "tags"). If a part contains a multi-file response (a "documents"
            key), that response is returned directly as
            ``{"metadata": ..., "documents": ...}``. Failed parts and parts
            with invalid JSON are represented by error sections.
        """
        if not partResults:
            return {"metadata": {"title": "Empty Document"}, "sections": []}

        mergedParts = self._mergeAiResultParts(partResults)

        allSections = []
        documentTitles = []
        for part in mergedParts:
            if not part.metadata.get("success", False):
                # Handle error parts
                errorText = part.metadata.get('error', 'Unknown error')
                allSections.append(
                    self._buildErrorSection(part, f"Error in part {part.id}: {errorText}", errorText)
                )
                continue

            try:
                # Parse JSON from AI result
                partJson = json.loads(part.data)
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse JSON from part {part.id}: {str(e)}")
                # Create a fallback section for invalid JSON
                allSections.append(
                    self._buildErrorSection(part, f"Error parsing part {part.id}: {str(e)}", str(e))
                )
                continue

            if not isinstance(partJson, dict):
                # Valid JSON that is not an object carries no sections or title.
                continue

            # Check if this is a multi-file response (has "documents" key)
            if "documents" in partJson:
                logger.debug(f"Processing multi-file response from part {part.id} with {len(partJson['documents'])} documents")
                # Return multi-file response directly
                return {
                    "metadata": partJson.get("metadata", {"title": "Merged Document"}),
                    "documents": partJson["documents"]
                }

            # Extract sections from single-file response
            sections = partJson.get("sections")
            if isinstance(sections, list):
                for section in sections:
                    if not isinstance(section, dict):
                        logger.warning(f"Skipping non-object section in part {part.id}")
                        continue
                    # Add part context to section
                    sectionMetadata = section.get("metadata")
                    if not isinstance(sectionMetadata, dict):
                        sectionMetadata = {}
                    sectionMetadata["source_part"] = part.id
                    sectionMetadata["source_document"] = part.metadata.get("documentId", "unknown")
                    sectionMetadata["part_index"] = part.metadata.get("partIndex", 0)
                    section["metadata"] = sectionMetadata
                    allSections.append(section)

            # Extract document title
            documentMetadata = partJson.get("metadata")
            if isinstance(documentMetadata, dict):
                title = documentMetadata.get("title", "")
                if title and title not in documentTitles:
                    documentTitles.append(title)

        # Sort sections by order
        allSections.sort(key=self._sectionOrder)

        # Create merged document with sections
        mergedDocument = {
            "metadata": {
                "title": documentTitles[0] if documentTitles else "Merged Document",
                "extraction_method": "model_aware_chunking_with_merging",
                "version": "2.0"
            },
            "sections": allSections,
            "summary": "Merged document using sophisticated merging system",
            "tags": ["merged", "ai_generated", "model_aware", "sophisticated_merging"]
        }
        logger.info(f"Converted {len(partResults)} parts to JSON format using existing sophisticated merging system")
        return mergedDocument