gateway/modules/services/serviceExtraction/mainServiceExtraction.py

1659 lines
87 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Any, Dict, List, Optional, Union
import uuid
import logging
import time
import asyncio
import base64
import json
from .subRegistry import ExtractorRegistry, ChunkerRegistry
from .subPipeline import runExtraction
from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy, ExtractionOptions, PartResult, DocumentIntent
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallResponse, AiCallRequest, AiCallOptions, OperationTypeEnum, AiModelCall
from modules.aicore.aicoreModelRegistry import modelRegistry
from modules.aicore.aicoreModelSelector import modelSelector
from modules.shared.jsonUtils import stripCodeFences
logger = logging.getLogger(__name__)
class ExtractionService:
_sharedExtractorRegistry: Optional[ExtractorRegistry] = None
_sharedChunkerRegistry: Optional[ChunkerRegistry] = None
def __init__(self, services: Optional[Any] = None):
self.services = services
if ExtractionService._sharedExtractorRegistry is None:
ExtractionService._sharedExtractorRegistry = ExtractorRegistry()
if ExtractionService._sharedChunkerRegistry is None:
ExtractionService._sharedChunkerRegistry = ChunkerRegistry()
self._extractorRegistry = ExtractionService._sharedExtractorRegistry
self._chunkerRegistry = ExtractionService._sharedChunkerRegistry
modelRegistry.ensureConnectorsRegistered()
# Verify required internal model is available (used for pricing in extractContent)
modelDisplayName = "Internal Document Extractor"
model = modelRegistry.getModel(modelDisplayName)
if model is None or model.calculatepriceCHF is None:
raise RuntimeError(f"FATAL: Required internal model '{modelDisplayName}' is not available. Check connector registration.")
def extractContent(
self,
documents: List[ChatDocument],
options: ExtractionOptions,
operationId: Optional[str] = None,
parentOperationId: Optional[str] = None
) -> List[ContentExtracted]:
"""
Extract content from a list of ChatDocument objects.
Args:
documents: List of ChatDocument objects to extract content from
options: Extraction options including maxSize, chunkAllowed, mergeStrategy, etc.
operationId: Optional operation ID for progress logging (parent operation)
parentOperationId: Optional parent operation ID for hierarchical logging
Returns:
List of ContentExtracted objects, one per input document
"""
results: List[ContentExtracted] = []
# Lazy import to avoid circular deps and heavy init at module import
from modules.interfaces.interfaceDbManagement import getInterface
dbInterface = getInterface()
totalDocs = len(documents)
for i, doc in enumerate(documents):
logger.info(f"=== DOCUMENT {i + 1}/{totalDocs}: {doc.fileName} ===")
logger.info(f"Initial MIME type: {doc.mimeType}")
# Create child operation for this document if parent operationId is provided
docOperationId = None
if operationId:
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
docOperationId = f"{operationId}_doc_{i}"
# Use parentOperationId if provided, otherwise use operationId as parent
parentId = parentOperationId if parentOperationId else operationId
self.services.chat.progressLogStart(
docOperationId,
"Extracting Document",
f"Document {i + 1}/{totalDocs}",
doc.fileName[:50] + "..." if len(doc.fileName) > 50 else doc.fileName,
parentOperationId=parentId # Correct parent reference for ChatLog hierarchy
)
# Start timing for this document
startTime = time.time()
try:
if docOperationId:
self.services.chat.progressLogUpdate(docOperationId, 0.1, "Loading document data")
# Resolve raw bytes for this document using interface
documentBytes = dbInterface.getFileData(doc.fileId)
if not documentBytes:
if docOperationId:
self.services.chat.progressLogFinish(docOperationId, False)
raise ValueError(f"No file data found for fileId={doc.fileId}")
if docOperationId:
self.services.chat.progressLogUpdate(docOperationId, 0.2, "Running extraction pipeline")
# Convert ChatDocument to the format expected by runExtraction
documentData = {
"id": doc.id,
"bytes": documentBytes,
"fileName": doc.fileName,
"mimeType": doc.mimeType
}
ec = runExtraction(
extractorRegistry=self._extractorRegistry,
chunkerRegistry=self._chunkerRegistry,
documentBytes=documentData["bytes"],
fileName=documentData["fileName"],
mimeType=documentData["mimeType"],
options=options
)
if docOperationId:
self.services.chat.progressLogUpdate(docOperationId, 0.7, f"Extracted {len(ec.parts)} parts")
# Log content parts metadata
logger.debug(f"Content parts: {len(ec.parts)}")
for j, part in enumerate(ec.parts):
logger.debug(f" Part {j + 1}/{len(ec.parts)}: {part.typeGroup} ({part.mimeType}) - {len(part.data) if part.data else 0} chars")
if part.metadata:
logger.debug(f" Metadata: {part.metadata}")
# Attach complete metadata to parts according to ContentPart Metadaten-Schema
for p in ec.parts:
# Ensure metadata dict exists
if not p.metadata:
p.metadata = {}
# Required metadata fields (from concept)
if "documentId" not in p.metadata:
p.metadata["documentId"] = documentData["id"] or str(uuid.uuid4())
if "documentMimeType" not in p.metadata:
p.metadata["documentMimeType"] = documentData["mimeType"]
if "originalFileName" not in p.metadata:
p.metadata["originalFileName"] = documentData["fileName"]
# ContentFormat: Set based on typeGroup and mimeType
# Default to "extracted" for text content, but can be overridden by caller
if "contentFormat" not in p.metadata:
# Default: extracted text content
p.metadata["contentFormat"] = "extracted"
# Intent: Default to "extract" for extracted content
if "intent" not in p.metadata:
p.metadata["intent"] = "extract"
# ExtractionPrompt: Use from options if available
if "extractionPrompt" not in p.metadata and options and options.prompt:
p.metadata["extractionPrompt"] = options.prompt
# UsageHint: Provide default hint
if "usageHint" not in p.metadata:
p.metadata["usageHint"] = f"Use extracted content from {documentData['fileName']}"
# SourceAction: Mark as from extraction service
if "sourceAction" not in p.metadata:
p.metadata["sourceAction"] = "extraction.extractContent"
# Write debug file for each text part extracted (without AI)
for j, part in enumerate(ec.parts):
if part.typeGroup == "text" and part.data and self.services and hasattr(self.services, 'utils') and hasattr(self.services.utils, 'writeDebugFile'):
try:
debug_content = {
"partIndex": j + 1,
"partId": part.id,
"typeGroup": part.typeGroup,
"mimeType": part.mimeType or "text/plain",
"label": part.label,
"dataLength": len(part.data),
"metadata": part.metadata.copy() if part.metadata else {},
"data": part.data # Full extracted text
}
debug_json = json.dumps(debug_content, indent=2, ensure_ascii=False)
# Use document name and part index for filename
doc_name_safe = documentData["fileName"].replace(" ", "_").replace("/", "_").replace("\\", "_")[:50]
debug_filename = f"extraction_text_part_{j+1}_{doc_name_safe}.txt"
self.services.utils.writeDebugFile(debug_json, debug_filename)
logger.info(f"Wrote debug file for extracted text part {j+1}/{len(ec.parts)}: {debug_filename}")
except Exception as e:
logger.warning(f"Failed to write debug file for text part {j+1}: {str(e)}")
# Log chunking information
chunkedParts = [p for p in ec.parts if p.metadata.get("chunk", False)]
if chunkedParts:
logger.debug(f"=== CHUNKING RESULTS ===")
logger.debug(f"Total parts: {len(ec.parts)}")
logger.debug(f"Chunked parts: {len(chunkedParts)}")
for chunk in chunkedParts:
logger.debug(f" Chunk: {chunk.label} - {len(chunk.data)} chars (parent: {chunk.parentId})")
else:
logger.debug(f"No chunking needed - {len(ec.parts)} parts fit within size limits")
if docOperationId:
self.services.chat.progressLogUpdate(docOperationId, 0.9, f"Processing complete: {len(ec.parts)} parts extracted")
# Calculate timing and emit stats
endTime = time.time()
processingTime = endTime - startTime
bytesSent = len(documentBytes)
bytesReceived = sum(len(part.data) if part.data else 0 for part in ec.parts)
# Emit stats for extraction operation
# Use internal extraction model for pricing
modelDisplayName = "Internal Document Extractor"
model = modelRegistry.getModel(modelDisplayName)
# Hard fail if model is missing; caller must ensure connectors are registered
if model is None or model.calculatepriceCHF is None:
if docOperationId:
self.services.chat.progressLogFinish(docOperationId, False)
raise RuntimeError(f"Pricing model not available: {modelDisplayName}")
priceCHF = model.calculatepriceCHF(processingTime, bytesSent, bytesReceived)
# Create AiCallResponse with real calculation
# Use model.name for the response (API identifier), not displayName
aiResponse = AiCallResponse(
content="", # No content for extraction stats needed
modelName=model.name,
provider=model.connectorType,
priceCHF=priceCHF,
processingTime=processingTime,
bytesSent=bytesSent,
bytesReceived=bytesReceived,
errorCount=0
)
self.services.chat.storeWorkflowStat(
self.services.workflow,
aiResponse,
f"extraction.process.{doc.mimeType}"
)
# Write extraction results to debug file
try:
from modules.shared.debugLogger import writeDebugFile
# json is already imported at module level
# Create summary of extraction results for debug
extractionSummary = {
"documentName": doc.fileName,
"documentMimeType": doc.mimeType,
"partsCount": len(ec.parts),
"parts": []
}
for part in ec.parts:
partSummary = {
"typeGroup": part.typeGroup,
"mimeType": part.mimeType,
"label": part.label,
"dataLength": len(part.data) if part.data else 0,
"metadata": part.metadata
}
# Include data preview for small parts (first 500 chars)
if part.data and len(part.data) <= 500:
partSummary["dataPreview"] = part.data[:500]
elif part.data:
partSummary["dataPreview"] = f"[Large data: {len(part.data)} chars - truncated]"
extractionSummary["parts"].append(partSummary)
writeDebugFile(json.dumps(extractionSummary, indent=2, ensure_ascii=False), f"extraction_result_{doc.fileName}.txt")
except Exception as e:
logger.debug(f"Failed to write extraction debug file: {str(e)}")
results.append(ec)
# Finish document operation successfully
if docOperationId:
self.services.chat.progressLogFinish(docOperationId, True)
except Exception as e:
logger.error(f"Error extracting content from document {i + 1}/{totalDocs} ({doc.fileName}): {str(e)}")
if docOperationId:
try:
self.services.chat.progressLogFinish(docOperationId, False)
except:
pass # Don't fail on progress logging errors
# Continue with next document instead of failing completely
# This allows parallel processing to continue even if one document fails
continue
return results
async def processDocumentsPerChunk(
self,
documents: List[ChatDocument],
prompt: str,
aiObjects: Any,
options: Optional[AiCallOptions] = None,
operationId: Optional[str] = None,
parentOperationId: Optional[str] = None
) -> str:
"""
Process documents with model-aware chunking and merge results.
NEW: Uses model-aware chunking in AI call phase instead of extraction phase.
Args:
documents: List of ChatDocument objects to process
prompt: AI prompt for processing
aiObjects: AiObjects instance for making AI calls
options: AI call options
operationId: Optional operation ID for progress tracking
parentOperationId: Optional parent operation ID for hierarchical logging
Returns:
Merged AI results as string with preserved document structure
"""
if not documents:
return ""
# Create operationId if not provided
if not operationId:
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"ai_text_extract_{workflowId}_{int(time.time())}"
self.services.chat.progressLogStart(
operationId,
"AI Text Extract",
"Document Processing",
f"Processing {len(documents)} documents",
parentOperationId=parentOperationId # Use parentOperationId if provided
)
try:
# Build extraction options using Pydantic model
mergeStrategy = MergeStrategy(
useIntelligentMerging=True,
prompt=prompt,
groupBy="typeGroup",
orderBy="id",
mergeType="concatenate"
)
extractionOptions = ExtractionOptions(
prompt=prompt,
processDocumentsIndividually=True,
mergeStrategy=mergeStrategy
)
logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars")
# Extract content WITHOUT chunking
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.1, f"Extracting content from {len(documents)} documents")
# Pass operationId as parentOperationId for hierarchical logging
# Correct hierarchy: parentOperationId -> operationId -> docOperationId
extractionResult = self.extractContent(documents, extractionOptions, operationId=operationId, parentOperationId=operationId)
if not isinstance(extractionResult, list):
if operationId:
self.services.chat.progressLogFinish(operationId, False)
return "[Error: No extraction results]"
# Process parts (not chunks) with model-aware AI calls
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.3, f"Processing {len(extractionResult)} extracted content parts")
# Use operationId as parentOperationId for child operations
# Correct hierarchy: parentOperationId -> operationId -> partOperationId
processParentOperationId = operationId
partResults = await self._processPartsWithMapping(extractionResult, prompt, aiObjects, options, operationId, processParentOperationId)
# Merge results using existing merging system
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.9, f"Merging {len(partResults)} part results")
mergedContent = self.mergePartResults(partResults, options)
# Save merged extraction content to debug
self.services.utils.writeDebugFile(mergedContent or '', "extraction_merged_text")
if operationId:
self.services.chat.progressLogFinish(operationId, True)
return mergedContent
except Exception as e:
logger.error(f"Error in processDocumentsPerChunk: {str(e)}")
if operationId:
self.services.chat.progressLogFinish(operationId, False)
raise
async def _processPartsWithMapping(
self,
extractionResult: List[ContentExtracted],
prompt: str,
aiObjects: Any,
options: Optional[AiCallOptions] = None,
operationId: Optional[str] = None,
parentOperationId: Optional[str] = None
) -> List[PartResult]:
"""Process content parts with model-aware chunking and proper mapping."""
# Collect all parts that need processing
partsToProcess = []
partIndex = 0
for ec in extractionResult:
for part in ec.parts:
if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
# Skip empty container parts
if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
logger.debug(f"Skipping empty container part: mimeType={part.mimeType}")
continue
partsToProcess.append({
'part': part,
'part_index': partIndex,
'document_id': ec.id
})
partIndex += 1
logger.info(f"Processing {len(partsToProcess)} parts with model-aware chunking")
totalParts = len(partsToProcess)
# Process parts in parallel
processedCount = [0] # Use list to allow modification in nested function
async def processSinglePart(partInfo: Dict) -> PartResult:
part = partInfo['part']
part_index = partInfo['part_index']
documentId = partInfo['document_id']
start_time = time.time()
# Create separate operation for each part with parent reference
partOperationId = None
if operationId:
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
partOperationId = f"{operationId}_part_{part_index}"
self.services.chat.progressLogStart(
partOperationId,
"Content Processing",
f"Part {part_index + 1}",
f"Type: {part.typeGroup}",
parentOperationId=parentOperationId
)
try:
# Create AI call request with content part
request = AiCallRequest(
prompt=prompt,
context="", # Context is in the content part
options=options,
contentParts=[part] # Pass as list for unified processing
)
# Update progress - initiating
if partOperationId:
self.services.chat.progressLogUpdate(partOperationId, 0.3, "Initiating")
# Call AI with model-aware chunking (no progress callback - handled by parent operation)
response = await aiObjects.call(request)
# Update progress - completed
if partOperationId:
self.services.chat.progressLogUpdate(partOperationId, 0.9, "Completed")
self.services.chat.progressLogFinish(partOperationId, True)
processing_time = time.time() - start_time
return PartResult(
originalPart=part,
aiResult=response.content,
partIndex=part_index,
documentId=documentId,
processingTime=processing_time,
metadata={
"success": True,
"partSize": len(part.data) if part.data else 0,
"resultSize": len(response.content),
"typeGroup": part.typeGroup,
"modelName": response.modelName,
"priceCHF": response.priceCHF
}
)
except Exception as e:
processing_time = time.time() - start_time
logger.warning(f"Error processing part {part_index}: {str(e)}")
return PartResult(
originalPart=part,
aiResult=f"[Error processing part: {str(e)}]",
partIndex=part_index,
documentId=documentId,
processingTime=processing_time,
metadata={
"success": False,
"error": str(e),
"partSize": len(part.data) if part.data else 0,
"typeGroup": part.typeGroup
}
)
# Process parts with concurrency control
maxConcurrent = 5
if options and hasattr(options, 'maxConcurrentParts'):
maxConcurrent = options.maxConcurrentParts
semaphore = asyncio.Semaphore(maxConcurrent)
async def processWithSemaphore(partInfo):
async with semaphore:
return await processSinglePart(partInfo)
tasks = [processWithSemaphore(part_info) for part_info in partsToProcess]
partResults = await asyncio.gather(*tasks, return_exceptions=True)
# Handle exceptions
processedResults = []
for i, result in enumerate(partResults):
if isinstance(result, Exception):
part_info = partsToProcess[i]
processedResults.append(PartResult(
originalPart=part_info['part'],
aiResult=f"[Error in parallel processing: {str(result)}]",
partIndex=part_info['part_index'],
documentId=part_info['document_id'],
processingTime=0.0,
metadata={"success": False, "error": str(result)}
))
elif result is not None:
processedResults.append(result)
logger.info(f"Completed processing {len(processedResults)} parts")
return processedResults
def _convertToContentParts(
self, partResults: Union[List[PartResult], List[AiCallResponse]], originalContentParts: Optional[List[ContentPart]] = None
) -> List[ContentPart]:
"""Convert part results to ContentParts (internal helper for consolidation).
Handles both PartResult (from extraction workflow) and AiCallResponse (from content parts processing).
Args:
partResults: List of PartResult or AiCallResponse objects
originalContentParts: Optional list of original ContentPart objects to preserve typeGroup and metadata
"""
content_parts = []
if not partResults:
return content_parts
# Detect input type and convert accordingly
if isinstance(partResults[0], PartResult):
# Existing logic for PartResult (from processDocumentsPerChunk)
# Phase 7: Add originalIndex for explicit ordering
for i, part_result in enumerate(partResults):
content_part = ContentPart(
id=part_result.originalPart.id,
parentId=part_result.originalPart.parentId,
label=part_result.originalPart.label,
typeGroup=part_result.originalPart.typeGroup, # Use original typeGroup
mimeType=part_result.originalPart.mimeType,
data=part_result.aiResult, # Use AI result as data
metadata={
**part_result.originalPart.metadata,
"aiResult": True,
"originalIndex": i, # Phase 7: Explicit order index
"partIndex": part_result.partIndex,
"processingOrder": i, # Phase 7: Processing order
"documentId": part_result.documentId,
"processingTime": part_result.processingTime,
"success": part_result.metadata.get("success", False)
}
)
content_parts.append(content_part)
elif isinstance(partResults[0], AiCallResponse):
# Logic from interfaceAiObjects (from content parts processing)
# Phase 7: Add originalIndex for explicit ordering
# REQUIRED: originalContentParts must be provided for AiCallResponse path to preserve typeGroup
if not originalContentParts:
raise ValueError("originalContentParts is required when merging AiCallResponse objects. All callers must provide the original ContentPart objects to preserve typeGroup.")
for i, result in enumerate(partResults):
if result.content:
# Handle one-to-many relationships (e.g., chunking: 1 contentPart -> N chunkResults)
# If we have fewer originalContentParts than partResults, use the first one for all
if i < len(originalContentParts):
originalPart = originalContentParts[i]
else:
# One-to-many: use first originalContentPart for remaining results
originalPart = originalContentParts[0]
originalTypeGroup = originalPart.typeGroup or "text"
originalMimeType = originalPart.mimeType or "text/plain"
originalLabel = originalPart.label or f"ai_result_{i}"
content_part = ContentPart(
id=str(uuid.uuid4()),
parentId=None,
label=originalLabel,
typeGroup=originalTypeGroup, # Preserve original typeGroup from originalContentParts
mimeType=originalMimeType,
data=result.content,
metadata={
"aiResult": True,
"originalIndex": i, # Phase 7: Explicit order index
"processingOrder": i, # Phase 7: Processing order
"modelName": result.modelName,
"priceCHF": result.priceCHF,
"processingTime": result.processingTime,
"bytesSent": result.bytesSent,
"bytesReceived": result.bytesReceived
}
)
content_parts.append(content_part)
return content_parts
def mergePartResults(
self,
partResults: Union[List[PartResult], List[AiCallResponse]],
options: Optional[AiCallOptions] = None,
originalContentParts: Optional[List[ContentPart]] = None
) -> str:
"""Unified merge for both PartResult and AiCallResponse.
Consolidated from both interfaceAiObjects.py and existing serviceExtraction method.
Args:
partResults: List of PartResult or AiCallResponse objects to merge
options: Optional AiCallOptions for merge strategy
originalContentParts: Optional list of original ContentPart objects to preserve typeGroup
"""
if not partResults:
return ""
# Convert to ContentParts using unified helper, preserving original typeGroup
content_parts = self._convertToContentParts(partResults, originalContentParts)
# Determine merge strategy based on input type
if isinstance(partResults[0], PartResult):
# Phase 7: Use originalIndex for explicit ordering
# Use strategy for extraction workflow (group by document, order by originalIndex)
merge_strategy = MergeStrategy(
useIntelligentMerging=True,
groupBy="documentId", # Group by document
orderBy="originalIndex", # Phase 7: Order by originalIndex instead of partIndex
mergeType="concatenate"
)
else:
# Default strategy for content parts workflow
merge_strategy = MergeStrategy(
useIntelligentMerging=True,
groupBy="typeGroup",
orderBy="id",
mergeType="concatenate"
)
# Check if this is an elements response format (elements array structure)
# This is used for section content generation where multiple ContentParts are processed
isElementsResponse = self._isElementsResponse(content_parts)
if isElementsResponse:
# Merge JSON elements responses intelligently (merge tables, combine elements)
logger.info(f"Detected 'elements' JSON response format - merging {len(content_parts)} JSON responses")
merged_json = self._mergeElementsResponses(content_parts)
merged_json_str = json.dumps(merged_json, indent=2, ensure_ascii=False)
logger.info(f"Successfully merged 'elements' JSON responses into single unified JSON ({len(merged_json_str)} chars)")
return merged_json_str
# Check if this is a JSON extraction response format (extracted_content structure)
# If so, merge JSON structures intelligently before applying regular merging
isJsonExtractionResponse = self._isJsonExtractionResponse(content_parts)
if isJsonExtractionResponse:
# Merge JSON extraction responses intelligently
logger.info(f"Detected JSON extraction response format - merging {len(content_parts)} JSON responses")
merged_json = self._mergeJsonExtractionResponses(content_parts, originalContentParts)
merged_json_str = json.dumps(merged_json, indent=2, ensure_ascii=False)
logger.info(f"Successfully merged JSON extraction responses into single unified JSON ({len(merged_json_str)} chars)")
return merged_json_str
# Apply regular merging for non-JSON extraction responses
merged_parts = applyMerging(content_parts, merge_strategy)
# Phase 6: Enhanced format with metadata preservation
# CRITICAL: Don't add SOURCE markers for internal use - metadata is already preserved in ContentPart objects
# SOURCE markers should ONLY be added when content is returned directly to user for display/debugging
# For extraction content used in generation pipelines, metadata is in ContentPart.metadata, not in text markers
# Check if this is a generation response by looking at operationType or content structure
isGenerationResponse = False
if options and hasattr(options, 'operationType'):
# Generation responses use DATA_GENERATE operation type
isGenerationResponse = options.operationType == OperationTypeEnum.DATA_GENERATE
# Also check if content looks like JSON (starts with { or [)
if not isGenerationResponse and merged_parts:
firstPartData = merged_parts[0].data if merged_parts[0].data else ""
if isinstance(firstPartData, str) and firstPartData.strip().startswith(('{', '[')):
# Check if it's a complete JSON structure (not extracted content)
# Generation responses are complete JSON, extraction responses are text content
try:
# json is already imported at module level
json.loads(firstPartData.strip())
# If it parses as JSON and has "documents" key, it's likely a generation response
parsed = json.loads(firstPartData.strip())
if isinstance(parsed, dict) and "documents" in parsed:
isGenerationResponse = True
except:
pass
# ROOT CAUSE FIX: Never add SOURCE markers - metadata is preserved in ContentPart.metadata
# SOURCE markers pollute content and cause issues when content is used in generation pipelines
# If traceability is needed, use ContentPart.metadata fields (documentId, documentMimeType, label, etc.)
content_sections = []
for part in merged_parts:
# Always return clean content without SOURCE markers
# Metadata is available in ContentPart.metadata for traceability
content_sections.append(part.data if part.data else "")
final_content = "\n\n".join(content_sections)
logger.info(f"Merged {len(partResults)} parts using unified merging system with metadata preservation (generationResponse={isGenerationResponse})")
return final_content.strip()
def _isJsonExtractionResponse(self, content_parts: List[ContentPart]) -> bool:
"""Check if contentParts contain JSON extraction responses (extracted_content format)."""
if not content_parts:
return False
# Check first part to see if it's JSON extraction response format
firstPartData = content_parts[0].data if content_parts[0].data else ""
if not isinstance(firstPartData, str):
return False
# Strip markdown code fences (```json ... ```) before checking
strippedData = stripCodeFences(firstPartData.strip())
# Check if it starts with JSON object/array
if not strippedData.startswith(('{', '[')):
return False
try:
parsed = json.loads(strippedData)
# Check if it has the extraction response structure: {"extracted_content": {...}}
if isinstance(parsed, dict) and "extracted_content" in parsed:
return True
except:
pass
return False
def _isElementsResponse(self, content_parts: List[ContentPart]) -> bool:
"""Check if contentParts contain JSON responses with an 'elements' array (e.g., section content)."""
if not content_parts:
return False
firstPartData = content_parts[0].data if content_parts[0].data else ""
if not isinstance(firstPartData, str):
return False
strippedData = stripCodeFences(firstPartData.strip())
if not strippedData.startswith(('{', '[')):
return False
try:
parsed = json.loads(strippedData)
if isinstance(parsed, dict) and "elements" in parsed and isinstance(parsed["elements"], list):
return True
except:
pass
return False
def _mergeElementsResponses(self, content_parts: List[ContentPart]) -> Dict[str, Any]:
"""Merge multiple JSON responses with an 'elements' array into one unified response.
Specifically designed to merge tables within the 'elements' array.
Empty tables (no rows) are ignored if a table with the same headers already has data.
"""
merged_elements = []
table_headers_map: Dict[str, List[Dict[str, Any]]] = {} # headers_tuple -> [table_contents]
for part in content_parts:
if not part.data:
continue
# Handle multiple JSON blocks in a single response (separated by ---)
partDataBlocks = part.data.split('---')
for blockData in partDataBlocks:
if not blockData.strip():
continue
try:
strippedData = stripCodeFences(blockData.strip())
if not strippedData:
continue
parsed = json.loads(strippedData)
if isinstance(parsed, dict) and "elements" in parsed and isinstance(parsed["elements"], list):
for element in parsed["elements"]:
if isinstance(element, dict) and element.get("type") == "table" and "content" in element:
table_content = element["content"]
headers = table_content.get("headers", [])
rows = table_content.get("rows", [])
if headers:
headers_key = tuple(headers)
# If table has no rows, only add it if no table with these headers exists yet
if not rows:
if headers_key not in table_headers_map:
# No table with these headers exists - keep empty table for now
table_headers_map[headers_key] = []
# If a table with these headers already exists (with or without data), skip empty table
continue
# Table has rows - add to merge map
if headers_key not in table_headers_map:
table_headers_map[headers_key] = []
table_headers_map[headers_key].append(table_content)
else:
# Keep non-table elements as is, but avoid duplicates if possible
if element not in merged_elements:
merged_elements.append(element)
except Exception as e:
logger.warning(f"Failed to parse JSON elements response from part {part.id}: {str(e)}")
continue
# Merge tables by headers - combine rows from tables with same headers
for headers_key, tables in table_headers_map.items():
if not tables:
# Only empty tables with these headers - skip them
continue
all_rows = []
for table_content in tables:
rows = table_content.get("rows", [])
all_rows.extend(rows)
# Only add table if it has rows
if all_rows:
merged_elements.append({
"type": "table",
"content": {
"headers": list(headers_key),
"rows": all_rows
}
})
return {"elements": merged_elements}
def _mergeJsonExtractionResponses(self, content_parts: List[ContentPart], originalContentParts: Optional[List[ContentPart]] = None) -> Dict[str, Any]:
"""Merge multiple JSON extraction responses into one unified response.
Merges:
- Tables: Combines all table rows, preserves headers (duplicates preserved)
- Text: Combines all text blocks
- Headings: Combines all headings arrays
- Lists: Combines all list items
- Images: Combines all image descriptions
"""
merged = {
"extracted_content": {
"text": "",
"tables": [],
"headings": [],
"lists": [],
"images": []
}
}
# Track table headers to merge tables with same structure
table_headers_map: Dict[str, List[Dict[str, Any]]] = {} # headers_tuple -> [tables]
all_text_parts = []
all_headings = []
all_lists = []
all_images = []
# Collect per-part extracted data for debug file
per_part_extracted_data = []
# Track original parts and their extracted data
original_parts_extracted_data = []
for part_idx, part in enumerate(content_parts, 1):
logger.info(f"=== Processing ContentPart {part_idx}/{len(content_parts)}: id={part.id}, label={part.label}, typeGroup={part.typeGroup} ===")
if not part.data:
logger.warning(f"ContentPart {part.id} has no data, skipping")
continue
# Handle multiple JSON blocks in a single response (separated by ---)
# Split by --- to handle multiple JSON blocks per ContentPart
partDataBlocks = part.data.split('---')
logger.debug(f"ContentPart {part.id}: Found {len(partDataBlocks)} JSON block(s) (split by ---)")
for block_idx, blockData in enumerate(partDataBlocks, 1):
if not blockData.strip():
continue
try:
# Strip markdown code fences before parsing
strippedData = stripCodeFences(blockData.strip())
if not strippedData:
logger.debug(f"ContentPart {part.id}, Block {block_idx}: Empty after stripping code fences")
continue
parsed = json.loads(strippedData)
if not isinstance(parsed, dict) or "extracted_content" not in parsed:
logger.debug(f"ContentPart {part.id}, Block {block_idx}: Not a valid extraction response format")
continue
extracted = parsed["extracted_content"]
# Find corresponding original part (if available)
original_part = None
if originalContentParts and part_idx <= len(originalContentParts):
original_part = originalContentParts[part_idx - 1]
elif originalContentParts and len(originalContentParts) > 0:
# Handle one-to-many (chunking) - use first original part
original_part = originalContentParts[0]
# Store extracted data for this part/block for debug file
part_extracted = {
"contentPartId": part.id,
"contentPartLabel": part.label,
"contentPartTypeGroup": part.typeGroup,
"blockIndex": block_idx,
"extracted_content": extracted.copy() # Store full extracted content
}
per_part_extracted_data.append(part_extracted)
# Store original part extracted data
if original_part:
# Extract text from extracted_content for display
extracted_text = extracted.get("text", "") if isinstance(extracted.get("text"), str) else ""
if not extracted_text and extracted.get("tables"):
# If no text but has tables, create a text representation
table_texts = []
for table in extracted.get("tables", []):
if isinstance(table, dict):
headers = table.get("headers", [])
rows = table.get("rows", [])
if headers and rows:
table_texts.append(f"Table: {', '.join(headers)}\nRows: {len(rows)}")
extracted_text = "\n".join(table_texts) if table_texts else ""
original_part_data = {
"id": original_part.id,
"typeGroup": original_part.typeGroup,
"mimeType": original_part.mimeType or "text/plain",
"label": original_part.label,
"dataLength": len(extracted_text),
"metadata": {
"documentId": original_part.metadata.get("documentId") if original_part.metadata else None,
"documentMimeType": original_part.metadata.get("documentMimeType") if original_part.metadata else None,
"originalFileName": original_part.metadata.get("originalFileName") if original_part.metadata else None,
},
"data": extracted_text, # Full extracted text
"extracted_content": extracted.copy() # Full extracted content structure
}
original_parts_extracted_data.append(original_part_data)
# Log extracted content summary
extracted_summary = {
"text": len(extracted.get("text", "")) if extracted.get("text") else 0,
"tables": len(extracted.get("tables", [])) if isinstance(extracted.get("tables"), list) else 0,
"headings": len(extracted.get("headings", [])) if isinstance(extracted.get("headings"), list) else 0,
"lists": len(extracted.get("lists", [])) if isinstance(extracted.get("lists"), list) else 0,
"images": len(extracted.get("images", [])) if isinstance(extracted.get("images"), list) else 0,
}
logger.info(f"ContentPart {part.id}, Block {block_idx} extracted: text={extracted_summary['text']} chars, tables={extracted_summary['tables']}, headings={extracted_summary['headings']}, lists={extracted_summary['lists']}, images={extracted_summary['images']}")
# Log table details
if extracted_summary['tables'] > 0:
for table_idx, table in enumerate(extracted.get("tables", []), 1):
if isinstance(table, dict):
headers = table.get("headers", [])
rows = table.get("rows", [])
logger.info(f" Table {table_idx}: headers={headers}, rows={len(rows) if isinstance(rows, list) else 0}")
# Log list details
if extracted_summary['lists'] > 0:
for list_idx, list_item in enumerate(extracted.get("lists", []), 1):
if isinstance(list_item, dict):
list_type = list_item.get("type", "unknown")
items = list_item.get("items", [])
logger.info(f" List {list_idx}: type={list_type}, items={len(items) if isinstance(items, list) else 0}")
# Merge text
if "text" in extracted and extracted["text"]:
text_content = extracted["text"].strip()
if text_content:
all_text_parts.append(text_content)
# Merge tables - group by headers to merge compatible tables
if "tables" in extracted and isinstance(extracted["tables"], list):
for table in extracted["tables"]:
if not isinstance(table, dict) or "headers" not in table or "rows" not in table:
continue
headers = table["headers"]
rows = table["rows"]
if not headers or not rows:
continue
# Use headers as key for grouping
headers_key = tuple(headers)
if headers_key not in table_headers_map:
table_headers_map[headers_key] = []
table_headers_map[headers_key].append(table)
# Merge headings
if "headings" in extracted and isinstance(extracted["headings"], list):
for heading in extracted["headings"]:
if isinstance(heading, dict) and "text" in heading:
all_headings.append(heading)
# Merge lists
if "lists" in extracted and isinstance(extracted["lists"], list):
for list_item in extracted["lists"]:
if isinstance(list_item, dict) and "items" in list_item:
all_lists.append(list_item)
# Merge images
if "images" in extracted and isinstance(extracted["images"], list):
for image in extracted["images"]:
if isinstance(image, dict) and "description" in image:
all_images.append(image)
except Exception as e:
logger.warning(f"Failed to parse JSON extraction response block from part {part.id}: {str(e)}")
continue
# Combine text parts
if all_text_parts:
merged["extracted_content"]["text"] = "\n\n".join(all_text_parts)
# Merge tables by headers - combine rows from tables with same headers
for headers_key, tables in table_headers_map.items():
# Collect all rows from tables with same headers
all_rows = []
for table in tables:
rows = table.get("rows", [])
all_rows.extend(rows)
# Create merged table
if all_rows:
merged["extracted_content"]["tables"].append({
"headers": list(headers_key),
"rows": all_rows
})
# Add headings
if all_headings:
merged["extracted_content"]["headings"] = all_headings
# Add lists - keep them separate (like headings) to preserve document structure
if all_lists:
merged["extracted_content"]["lists"] = all_lists
# Add images
if all_images:
merged["extracted_content"]["images"] = all_images
logger.info(f"=== Merging Summary ===")
logger.info(f"Total ContentParts processed: {len(content_parts)}")
logger.info(f"Text parts collected: {len(all_text_parts)}")
logger.info(f"Table groups (by headers): {len(table_headers_map)}")
logger.info(f"Headings collected: {len(all_headings)}")
logger.info(f"Lists collected: {len(all_lists)}")
logger.info(f"Images collected: {len(all_images)}")
# Log table merging details
for headers_key, tables in table_headers_map.items():
total_rows = sum(len(table.get("rows", [])) for table in tables)
logger.info(f" Table group with headers {list(headers_key)}: {len(tables)} table(s), {total_rows} total rows")
logger.info(f"Merged JSON extraction responses: {len(table_headers_map)} table groups, {len(all_text_parts)} text parts, {len(all_headings)} headings, {len(all_lists)} lists, {len(all_images)} images")
# Write per-part extracted data to debug file
if per_part_extracted_data and self.services and hasattr(self.services, 'utils') and hasattr(self.services.utils, 'writeDebugFile'):
try:
debug_content = {
"summary": {
"totalContentParts": len(content_parts),
"totalExtractedBlocks": len(per_part_extracted_data),
"mergedResult": {
"textParts": len(all_text_parts),
"tableGroups": len(table_headers_map),
"headings": len(all_headings),
"lists": len(all_lists),
"images": len(all_images)
}
},
"perPartExtractedData": per_part_extracted_data
}
debug_json = json.dumps(debug_content, indent=2, ensure_ascii=False)
self.services.utils.writeDebugFile(debug_json, "content_extraction_per_part")
logger.info(f"Wrote per-part extracted data to debug file: {len(per_part_extracted_data)} blocks from {len(content_parts)} content parts")
except Exception as e:
logger.warning(f"Failed to write per-part extracted data to debug file: {str(e)}")
# Write original parts extracted data in extraction_result format
if original_parts_extracted_data and self.services and hasattr(self.services, 'utils') and hasattr(self.services.utils, 'writeDebugFile'):
try:
# Get document info from first original part if available
document_name = None
document_mime_type = None
if originalContentParts and len(originalContentParts) > 0:
first_part = originalContentParts[0]
if first_part.metadata:
document_name = first_part.metadata.get("originalFileName")
document_mime_type = first_part.metadata.get("documentMimeType")
# Format similar to extraction_result file
extraction_result_format = {
"documentName": document_name or "Unknown",
"documentMimeType": document_mime_type or "application/octet-stream",
"partsCount": len(original_parts_extracted_data),
"parts": []
}
for part_data in original_parts_extracted_data:
# Format each part similar to extraction_result format
formatted_part = {
"typeGroup": part_data["typeGroup"],
"mimeType": part_data["mimeType"],
"label": part_data["label"],
"dataLength": part_data["dataLength"],
"metadata": part_data["metadata"],
"data": part_data["data"], # Full extracted text
"extracted_content": part_data["extracted_content"] # Full structure
}
extraction_result_format["parts"].append(formatted_part)
result_json = json.dumps(extraction_result_format, indent=2, ensure_ascii=False)
self.services.utils.writeDebugFile(result_json, "content_extraction_original_parts")
logger.info(f"Wrote original parts extracted data to debug file: {len(original_parts_extracted_data)} original parts")
except Exception as e:
logger.warning(f"Failed to write original parts extracted data to debug file: {str(e)}")
return merged
async def chunkContentPartForAi(self, contentPart, model, options, prompt: str = "") -> List[Dict[str, Any]]:
"""Chunk a content part based on model capabilities, accounting for prompt, system message overhead, and maxTokens output.
Moved from interfaceAiObjects.py - model-aware chunking for AI processing.
Complementary to existing size-based chunking in extraction pipeline.
"""
# Calculate model-specific chunk sizes
modelContextTokens = model.contextLength # Total context in tokens
modelMaxOutputTokens = model.maxTokens # Maximum output tokens
# CRITICAL: Use same conservative token factor as in processContentPartWithFallback
# Real-world observation: Our calculation says 94k tokens, but API says 217k tokens (2.3x difference!)
TOKEN_SAFETY_FACTOR = 2.2 # Conservative: accounts for JSON tokenization and API overhead
# Reserve tokens for:
# 1. Prompt (user message) - use conservative factor
promptSize = len(prompt.encode('utf-8')) if prompt else 0
promptTokens = promptSize / TOKEN_SAFETY_FACTOR
# 2. System message wrapper ("Context from documents:\n")
systemMessageTokens = 10 # ~40 bytes = 10 tokens
# 3. Max output tokens (model will reserve space for completion)
outputTokens = modelMaxOutputTokens
# 4. JSON structure and message overhead (~100 tokens)
messageOverheadTokens = 100
# Total reserved tokens = input overhead + output reservation
totalReservedTokens = promptTokens + systemMessageTokens + messageOverheadTokens + outputTokens
# Available tokens for content = context length - reserved tokens
# Use 60% of available (same conservative margin as in processContentPartWithFallback)
availableContentTokens = int((modelContextTokens - totalReservedTokens) * 0.60)
# Ensure we have at least some space
if availableContentTokens < 100:
logger.warning(f"Very limited space for content: {availableContentTokens} tokens available. Model: {model.name}, contextLength: {modelContextTokens}, maxTokens: {modelMaxOutputTokens}, prompt: {promptTokens:.0f} tokens")
availableContentTokens = max(100, int(modelContextTokens * 0.1)) # Fallback to 10% of context
# Convert tokens to bytes using conservative factor (reverse: bytes = tokens * factor)
availableContentBytes = int(availableContentTokens * TOKEN_SAFETY_FACTOR)
logger.info(f"Chunking calculation for {model.name}: contextLength={modelContextTokens} tokens, maxTokens={modelMaxOutputTokens} tokens, prompt={promptTokens:.0f} tokens est., reserved={totalReservedTokens:.0f} tokens est., available={availableContentTokens} tokens est. ({availableContentBytes} bytes), factor={TOKEN_SAFETY_FACTOR}")
# Use 50% of available content bytes for text chunks (very conservative to ensure chunks fit)
# This ensures that even with token counting inaccuracies, chunks will fit
textChunkSize = int(availableContentBytes * 0.5)
structureChunkSize = int(availableContentBytes * 0.5) # CRITICAL: Also set for StructureChunker (JSON content)
tableChunkSize = int(availableContentBytes * 0.5) # Also set for TableChunker
imageChunkSize = int(availableContentBytes * 0.6) # 60% for image chunks
# Build chunking options - include ALL chunk size options for different chunkers
chunkingOptions = {
"textChunkSize": textChunkSize,
"structureChunkSize": structureChunkSize, # CRITICAL: Required for StructureChunker (JSON)
"tableChunkSize": tableChunkSize, # Required for TableChunker
"imageChunkSize": imageChunkSize,
"maxSize": availableContentBytes,
"chunkAllowed": True
}
logger.info(f"Chunking options: textChunkSize={textChunkSize} bytes, structureChunkSize={structureChunkSize} bytes, tableChunkSize={tableChunkSize} bytes, imageChunkSize={imageChunkSize} bytes, contentPartSize={len(contentPart.data.encode('utf-8')) if contentPart.data else 0} bytes")
# Get appropriate chunker (uses existing ChunkerRegistry ✅)
chunker = self._chunkerRegistry.resolve(contentPart.typeGroup)
if not chunker:
logger.warning(f"No chunker found for typeGroup: {contentPart.typeGroup}")
return []
# Chunk the content part
try:
contentSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0
logger.info(f"Chunking {contentPart.typeGroup} part: contentSize={contentSize} bytes, textChunkSize={textChunkSize} bytes, structureChunkSize={structureChunkSize} bytes")
chunks = chunker.chunk(contentPart, chunkingOptions)
logger.info(f"Created {len(chunks)} chunks for {contentPart.typeGroup} part (contentSize={contentSize} bytes)")
if chunks:
for i, chunk in enumerate(chunks):
chunkSize = len(chunk.get('data', '').encode('utf-8')) if chunk.get('data') else 0
logger.info(f" Chunk {i+1}/{len(chunks)}: {chunkSize} bytes")
return chunks
except Exception as e:
logger.error(f"Chunking failed for {contentPart.typeGroup}: {str(e)}")
return []
async def processContentPartWithFallback(self, contentPart, prompt: str, options, failoverModelList, aiObjects, progressCallback=None) -> AiCallResponse:
"""Process a single content part with model-aware chunking and fallback.
Moved from interfaceAiObjects.py - orchestrates chunking and merging.
Calls aiObjects._callWithModel() for actual AI calls.
"""
lastError = None
# Check if this is an image - Vision models need special handling
isImage = (contentPart.typeGroup == "image") or (contentPart.mimeType and contentPart.mimeType.startswith("image/"))
# Determine the correct operation type based on content type
actualOperationType = options.operationType
if isImage:
actualOperationType = OperationTypeEnum.IMAGE_ANALYSE
# Get vision-capable models for images
availableModels = modelRegistry.getAvailableModels()
visionFailoverList = modelSelector.getFailoverModelList(prompt, "", AiCallOptions(operationType=actualOperationType), availableModels)
if visionFailoverList:
logger.debug(f"Using {len(visionFailoverList)} vision-capable models for image processing")
failoverModelList = visionFailoverList
for attempt, model in enumerate(failoverModelList):
try:
logger.info(f"Processing content part with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})")
# Special handling for images with Vision models
if isImage and hasattr(model, 'functionCall'):
try:
if not contentPart.data:
raise ValueError("Image content part has no data")
mimeType = contentPart.mimeType or "image/jpeg"
if not mimeType.startswith("image/"):
raise ValueError(f"Invalid mimeType for image: {mimeType}")
# Prepare base64 data
if isinstance(contentPart.data, str):
try:
base64.b64decode(contentPart.data, validate=True)
base64Data = contentPart.data
except Exception as e:
raise ValueError(f"Invalid base64 data in contentPart: {str(e)}")
elif isinstance(contentPart.data, bytes):
base64Data = base64.b64encode(contentPart.data).decode('utf-8')
else:
raise ValueError(f"Unsupported data type for image: {type(contentPart.data)}")
imageDataUrl = f"data:{mimeType};base64,{base64Data}"
modelCall = AiModelCall(
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt or ""},
{
"type": "image_url",
"image_url": {"url": imageDataUrl}
}
]
}
],
model=model,
options=AiCallOptions(operationType=actualOperationType)
)
modelResponse = await model.functionCall(modelCall)
if not modelResponse.success:
raise ValueError(f"Model call failed: {modelResponse.error}")
logger.info(f"✅ Image content part processed successfully with model: {model.name}")
processingTime = getattr(modelResponse, 'processingTime', None) or 0.0
return AiCallResponse(
content=modelResponse.content,
modelName=model.name,
provider=model.connectorType,
priceCHF=0.0,
processingTime=processingTime,
bytesSent=0,
bytesReceived=0,
errorCount=0
)
except Exception as e:
lastError = e
logger.warning(f"❌ Image processing failed with model {model.name}: {str(e)}")
if attempt < len(failoverModelList) - 1:
logger.info(f"🔄 Trying next fallback model for image processing...")
continue
else:
logger.error(f"💥 All {len(failoverModelList)} models failed for image processing")
raise
# For non-image parts, check if part fits in model context
partSize = len(contentPart.data.encode('utf-8')) if contentPart.data else 0
modelContextTokens = model.contextLength
modelMaxOutputTokens = model.maxTokens
promptTokens = len(prompt.encode('utf-8')) / 4 if prompt else 0
systemMessageTokens = 10
outputTokens = modelMaxOutputTokens
messageOverheadTokens = 100
totalReservedTokens = promptTokens + systemMessageTokens + messageOverheadTokens + outputTokens
availableContentTokens = int((modelContextTokens - totalReservedTokens) * 0.8)
if availableContentTokens < 100:
availableContentTokens = max(100, int(modelContextTokens * 0.1))
availableContentBytes = availableContentTokens * 4
# Also check prompt size - prompt + content together must fit
promptSize = len(prompt.encode('utf-8')) if prompt else 0
# CRITICAL: Token counting approximation is VERY inaccurate for JSON/content
# Real-world observation: Our calculation says 94k tokens, but API says 217k tokens (2.3x difference!)
# This happens because:
# 1. JSON/structured content tokenizes differently (more tokens per byte)
# 2. API has message structure overhead (system prompts, message wrappers)
# 3. Tokenizer differences between our approximation and actual API tokenizer
# Use conservative factor: 1 token ≈ 2.2 bytes (instead of 4) to account for these differences
TOKEN_SAFETY_FACTOR = 2.2 # Conservative: accounts for JSON tokenization and API overhead
promptTokens = promptSize / TOKEN_SAFETY_FACTOR
contentTokens = partSize / TOKEN_SAFETY_FACTOR
totalTokens = promptTokens + contentTokens
# CRITICAL: Use very conservative margin (60%) because:
# 1. Token counting approximation is inaccurate - real tokens can be 2-3x more
# 2. API has additional overhead (message structure, system prompts, etc.)
# 3. Anthropic API is strict about the 200k limit
# 4. We've seen cases where our calculation says "fits" but API says "too long"
maxTotalTokens = int(modelContextTokens * 0.60)
logger.info(f"Size check for {model.name}: partSize={partSize} bytes ({contentTokens:.0f} tokens est.), promptSize={promptSize} bytes ({promptTokens:.0f} tokens est.), total={totalTokens:.0f} tokens est., modelContext={modelContextTokens} tokens, maxTotal={maxTotalTokens} tokens (60% margin, conservative factor={TOKEN_SAFETY_FACTOR})")
# CRITICAL: Always check totalTokens first - if prompt + content exceeds limit, MUST chunk
# Token counting approximation may differ significantly from API, so use very conservative margin
if totalTokens > maxTotalTokens:
logger.warning(f"⚠️ Total tokens ({totalTokens:.0f} est.) exceed model limit ({maxTotalTokens}), chunking required. Prompt: {promptTokens:.0f} tokens est., Content: {contentTokens:.0f} tokens est.")
elif partSize > availableContentBytes:
logger.warning(f"⚠️ Content part ({contentTokens:.0f} tokens est.) exceeds available space ({availableContentBytes/TOKEN_SAFETY_FACTOR:.0f} tokens est.), chunking required")
# If either condition fails, chunk the content
# CRITICAL: IMAGE_GENERATE operations should NOT use chunking - they generate images from prompts, not process content chunks
if (totalTokens > maxTotalTokens or partSize > availableContentBytes) and options.operationType != OperationTypeEnum.IMAGE_GENERATE:
# Part too large or total exceeds limit - chunk it (but not for image generation)
chunks = await self.chunkContentPartForAi(contentPart, model, options, prompt)
if not chunks:
raise ValueError(f"Failed to chunk content part for model {model.name}")
logger.info(f"Starting to process {len(chunks)} chunks with model {model.name}")
if progressCallback:
progressCallback(0.0, f"Starting to process {len(chunks)} chunks")
chunkResults = []
for idx, chunk in enumerate(chunks):
chunkNum = idx + 1
chunkData = chunk.get('data', '')
logger.info(f"Processing chunk {chunkNum}/{len(chunks)} with model {model.name}")
if progressCallback:
progressCallback(chunkNum / len(chunks), f"Processing chunk {chunkNum}/{len(chunks)}")
try:
chunkResponse = await aiObjects._callWithModel(model, prompt, chunkData, options)
chunkResults.append(chunkResponse)
except Exception as chunkError:
logger.error(f"Error processing chunk {chunkNum}/{len(chunks)}: {str(chunkError)}")
# Continue with other chunks even if one fails
continue
# Merge chunk results
if not chunkResults:
raise ValueError(f"All chunks failed for content part")
# Pass original contentPart to preserve typeGroup for all chunks (one-to-many: 1 part -> N chunks)
mergedContent = self.mergePartResults(chunkResults, options, [contentPart])
return AiCallResponse(
content=mergedContent,
modelName=model.name,
provider=model.connectorType,
priceCHF=sum(r.priceCHF for r in chunkResults),
processingTime=sum(r.processingTime for r in chunkResults),
bytesSent=sum(r.bytesSent for r in chunkResults),
bytesReceived=sum(r.bytesReceived for r in chunkResults),
errorCount=sum(r.errorCount for r in chunkResults)
)
else:
# Part fits - call AI directly via aiObjects interface
logger.info(f"✅ Content part fits within model limits, processing directly")
response = await aiObjects._callWithModel(model, prompt, contentPart.data, options)
logger.info(f"✅ Content part processed successfully with model: {model.name}")
return response
except Exception as e:
lastError = e
error_msg = str(e) if str(e) else f"{type(e).__name__}"
logger.warning(f"❌ Model {model.name} failed for content part: {error_msg}", exc_info=True)
if attempt < len(failoverModelList) - 1:
logger.info(f"🔄 Trying next failover model...")
continue
else:
logger.error(f"💥 All {len(failoverModelList)} models failed for content part")
break
# All models failed
return self._createErrorResponse(f"All models failed: {str(lastError)}", 0, 0)
def _createErrorResponse(self, errorMsg: str, inputBytes: int, outputBytes: int) -> AiCallResponse:
"""Create an error response."""
return AiCallResponse(
content=errorMsg,
modelName="error",
priceCHF=0.0,
processingTime=0.0,
bytesSent=inputBytes,
bytesReceived=outputBytes,
errorCount=1
)
async def processContentPartsWithAi(
self,
request: AiCallRequest,
aiObjects, # Pass interface for AI calls
progressCallback=None
) -> AiCallResponse:
"""Process content parts with model-aware chunking and AI calls in parallel.
Moved from interfaceAiObjects.callWithContentParts() - entry point for content parts processing.
Uses parallel processing similar to section generation for better performance.
SPECIAL CASE: For DATA_EXTRACT operations, processes all contentParts together in ONE call
to enable proper merging (e.g., merging tables from multiple PDFs into one table).
"""
prompt = request.prompt
options = request.options
contentParts = request.contentParts
# Get failover models
availableModels = modelRegistry.getAvailableModels()
failoverModelList = modelSelector.getFailoverModelList(prompt, "", options, availableModels)
if not failoverModelList:
return self._createErrorResponse("No suitable models found", 0, 0)
totalParts = len(contentParts)
if totalParts == 0:
return self._createErrorResponse("No content parts to process", 0, 0)
# NOTE: For DATA_EXTRACT operations, the extraction prompt explicitly asks the AI to merge
# all contentParts into ONE unified JSON response. Even though we process parts separately,
# each response should contain merged content. The mergePartResults will concatenate responses,
# but the new prompt format (flat extracted_content structure) is designed for easier merging.
# DEFAULT: Process parts in parallel
# Thread-safe counter for progress tracking
completedCount = [0] # Use list to allow modification in nested function
# Process parts in parallel with concurrency control
maxConcurrent = 5
if options and hasattr(options, 'maxConcurrentParts'):
maxConcurrent = options.maxConcurrentParts
semaphore = asyncio.Semaphore(maxConcurrent)
async def processSinglePart(contentPart, partIndex: int) -> AiCallResponse:
"""Process a single content part with progress logging."""
async with semaphore:
partLabel = contentPart.label or f"Part {partIndex+1}"
partType = contentPart.typeGroup or "unknown"
# Log start of processing
if progressCallback:
progressCallback(0.1 + (partIndex / totalParts) * 0.8, f"Processing {partLabel} ({partType}) - {partIndex+1}/{totalParts}")
try:
# Process the part
partResult = await self.processContentPartWithFallback(
contentPart, prompt, options, failoverModelList, aiObjects, None # Don't pass progressCallback to avoid double logging
)
# Write debug files for generation phase (section content generation)
# Check for DATA_GENERATE or DATA_ANALYSE (used for section generation)
isGenerationPhase = False
if options and hasattr(options, 'operationType'):
isGenerationPhase = (options.operationType == OperationTypeEnum.DATA_GENERATE or
options.operationType == OperationTypeEnum.DATA_ANALYSE)
if isGenerationPhase:
if self.services and hasattr(self.services, 'utils') and hasattr(self.services.utils, 'writeDebugFile'):
try:
# Create debug filename with contentPart ID or label
partId = contentPart.id[:8] if contentPart.id else f"part_{partIndex+1}"
partLabelSafe = (contentPart.label or f"part_{partIndex+1}").replace(" ", "_").replace("/", "_").replace("\\", "_")[:30]
debugPrefix = f"generation_contentPart_{partId}_{partLabelSafe}"
# Write prompt
self.services.utils.writeDebugFile(prompt, f"{debugPrefix}_prompt")
# Write response
responseContent = partResult.content if partResult.content else ""
self.services.utils.writeDebugFile(responseContent, f"{debugPrefix}_response")
logger.debug(f"Wrote debug files for contentPart {partId} (generation): {debugPrefix}_prompt, {debugPrefix}_response")
except Exception as debugError:
logger.warning(f"Failed to write debug file for contentPart {contentPart.id}: {str(debugError)}")
# Update completed count and log progress
completedCount[0] += 1
if progressCallback:
progressCallback(0.1 + (completedCount[0] / totalParts) * 0.8, f"Completed {partLabel} ({partType}) - {completedCount[0]}/{totalParts}")
return partResult
except Exception as e:
# Update completed count even on error
completedCount[0] += 1
logger.error(f"Error processing part {partIndex+1} ({partLabel}): {str(e)}")
if progressCallback:
progressCallback(0.1 + (completedCount[0] / totalParts) * 0.8, f"Error processing {partLabel} ({partType}) - {completedCount[0]}/{totalParts}")
# Return error response
return self._createErrorResponse(f"Error processing part: {str(e)}", 0, 0)
# Create tasks for all parts
tasks = [processSinglePart(contentPart, i) for i, contentPart in enumerate(contentParts)]
# Execute all tasks in parallel with error handling
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results and handle exceptions
allResults = []
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Exception processing part {i+1}: {str(result)}")
allResults.append(self._createErrorResponse(f"Exception: {str(result)}", 0, 0))
elif result is not None:
allResults.append(result)
# Merge all results using unified mergePartResults, preserving original typeGroup
mergedContent = self.mergePartResults(allResults, options, contentParts)
return AiCallResponse(
content=mergedContent,
modelName="multiple",
priceCHF=sum(r.priceCHF for r in allResults),
processingTime=sum(r.processingTime for r in allResults),
bytesSent=sum(r.bytesSent for r in allResults),
bytesReceived=sum(r.bytesReceived for r in allResults),
errorCount=sum(r.errorCount for r in allResults)
)
# Module-level function for use by subPipeline and ExtractionService
def applyMerging(parts: List[ContentPart], strategy: MergeStrategy) -> List[ContentPart]:
"""Apply merging strategy to parts with intelligent token-aware merging.
Moved from interfaceAiObjects.py to resolve dependency violations.
Can be used as module-level function or called from ExtractionService methods.
"""
logger.debug(f"applyMerging called with {len(parts)} parts")
# Import merging dependencies (now local imports ✅)
from .merging.mergerText import TextMerger
from .merging.mergerTable import TableMerger
from .merging.mergerDefault import DefaultMerger
from .subMerger import IntelligentTokenAwareMerger
# Check if intelligent merging is enabled
if strategy.useIntelligentMerging:
modelCapabilities = strategy.capabilities or {}
subMerger = IntelligentTokenAwareMerger(modelCapabilities)
# Use intelligent merging for all parts
merged = subMerger.mergeChunksIntelligently(parts, strategy.prompt or "")
# Calculate and log optimization stats
stats = subMerger.calculateOptimizationStats(parts, merged)
logger.info(f"🧠 Intelligent merging stats: {stats}")
logger.debug(f"Intelligent merging: {stats['original_ai_calls']}{stats['optimized_ai_calls']} calls ({stats['reduction_percent']}% reduction)")
return merged
# Fallback to traditional merging
textMerger = TextMerger()
tableMerger = TableMerger()
defaultMerger = DefaultMerger()
# Group by typeGroup
textParts = [p for p in parts if p.typeGroup == "text"]
tableParts = [p for p in parts if p.typeGroup == "table"]
structureParts = [p for p in parts if p.typeGroup == "structure"]
otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")]
logger.debug(f"Grouped - text: {len(textParts)}, table: {len(tableParts)}, structure: {len(structureParts)}, other: {len(otherParts)}")
merged: List[ContentPart] = []
if textParts:
textMerged = textMerger.merge(textParts, strategy)
logger.debug(f"TextMerger merged {len(textParts)} parts into {len(textMerged)} parts")
merged.extend(textMerged)
if tableParts:
tableMerged = tableMerger.merge(tableParts, strategy)
logger.debug(f"TableMerger merged {len(tableParts)} parts into {len(tableMerged)} parts")
merged.extend(tableMerged)
if structureParts:
# For now, treat structure like text
structureMerged = textMerger.merge(structureParts, strategy)
logger.debug(f"StructureMerger merged {len(structureParts)} parts into {len(structureMerged)} parts")
merged.extend(structureMerged)
if otherParts:
otherMerged = defaultMerger.merge(otherParts, strategy)
logger.debug(f"DefaultMerger merged {len(otherParts)} parts into {len(otherMerged)} parts")
merged.extend(otherMerged)
logger.debug(f"applyMerging returning {len(merged)} parts")
return merged