# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Extract Content action for Context operations.
Extracts content from documents (separate from AI calls).
"""
import logging
import time
from typing import Any, Dict, List, Optional

from modules.workflows.methods.methodBase import action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelDocref import DocumentReferenceList
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy, ContentExtracted, ContentPart

logger = logging.getLogger(__name__)

# Default prompt used whenever the caller does not supply one.
_DEFAULT_PROMPT = "Extract all content from the document"


def _defaultMergeStrategy() -> MergeStrategy:
    """Return the default merge strategy: concatenate parts, grouped by typeGroup, ordered by id."""
    return MergeStrategy(
        mergeType="concatenate",
        groupBy="typeGroup",
        orderBy="id"
    )


def _coerceDocumentList(documentListParam: Any) -> Optional[DocumentReferenceList]:
    """
    Normalize the raw documentList parameter into a DocumentReferenceList.

    Accepts an existing DocumentReferenceList, a single reference string,
    or a list of reference strings. Returns None for any other type so the
    caller can report a proper failure.
    """
    if isinstance(documentListParam, DocumentReferenceList):
        return documentListParam
    if isinstance(documentListParam, str):
        return DocumentReferenceList.from_string_list([documentListParam])
    if isinstance(documentListParam, list):
        return DocumentReferenceList.from_string_list(documentListParam)
    return None


def _resolveExtractionOptions(extractionOptionsParam: Any) -> ExtractionOptions:
    """
    Build ExtractionOptions from the raw parameter, falling back to defaults.

    - ExtractionOptions instance: used as-is.
    - dict: copied (the caller's dict is NOT mutated), missing "prompt" /
      "mergeStrategy" filled with defaults, then converted. On conversion
      failure a warning is logged and defaults are used.
    - Falsy or any other type: defaults (with a warning for invalid types).

    The default options request pure per-document content extraction.
    """
    if extractionOptionsParam:
        if isinstance(extractionOptionsParam, ExtractionOptions):
            return extractionOptionsParam
        if isinstance(extractionOptionsParam, dict):
            # Copy first so the caller's dict is never mutated as a side effect.
            optionsDict = dict(extractionOptionsParam)
            optionsDict.setdefault("prompt", _DEFAULT_PROMPT)
            optionsDict.setdefault("mergeStrategy", _defaultMergeStrategy())
            try:
                return ExtractionOptions(**optionsDict)
            except Exception as e:
                logger.warning(f"Failed to create ExtractionOptions from dict: {str(e)}, using defaults")
        else:
            logger.warning(f"Invalid extractionOptions type: {type(extractionOptionsParam)}, using defaults")
    # Default extraction options for pure content extraction (no AI processing)
    return ExtractionOptions(
        prompt=_DEFAULT_PROMPT,
        mergeStrategy=_defaultMergeStrategy(),
        processDocumentsIndividually=True
    )


def _neutralizeParts(neutralizationService: Any, parts: List[Any]) -> List[Any]:
    """
    Return a new list of parts with text data neutralized where possible.

    Dict entries are parsed into ContentPart first; on any parse,
    service, or result failure the ORIGINAL part is kept so extraction
    output is never lost to a neutralization problem (best-effort).
    """
    neutralizedParts = []
    for part in parts:
        # Normalize dicts into ContentPart; pass through anything else unchanged.
        if not isinstance(part, ContentPart):
            if isinstance(part, dict):
                try:
                    part = ContentPart(**part)
                except Exception as e:
                    logger.warning(f"Could not parse ContentPart: {str(e)}")
                    neutralizedParts.append(part)
                    continue
            else:
                neutralizedParts.append(part)
                continue
        # Nothing to neutralize without data.
        if not part.data:
            neutralizedParts.append(part)
            continue
        try:
            neutralizationResult = neutralizationService.processText(part.data)
            if neutralizationResult and 'neutralized_text' in neutralizationResult:
                # Build a fresh ContentPart carrying the neutralized data so the
                # original part object is left untouched.
                neutralizedParts.append(ContentPart(
                    id=part.id,
                    parentId=part.parentId,
                    label=part.label,
                    typeGroup=part.typeGroup,
                    mimeType=part.mimeType,
                    data=neutralizationResult['neutralized_text'],
                    metadata=part.metadata.copy() if part.metadata else {}
                ))
            else:
                logger.warning(f"Neutralization did not return neutralized_text for part {part.id}")
                neutralizedParts.append(part)
        except Exception as e:
            logger.error(f"Error neutralizing part {part.id}: {str(e)}")
            neutralizedParts.append(part)
    return neutralizedParts


def _buildDocumentName(originalDoc: Any, index: int, extractedId: Any) -> str:
    """
    Derive the result document name.

    Uses the original file's base name with an "_extracted_<id>.json" suffix
    when available, otherwise a generic zero-padded index-based name.
    """
    fileName = getattr(originalDoc, 'fileName', None) if originalDoc else None
    if fileName:
        baseName = fileName.rsplit('.', 1)[0] if '.' in fileName else fileName
        return f"{baseName}_extracted_{extractedId}.json"
    return f"document_{index + 1:03d}_extracted_{extractedId}.json"


@action
async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Extract raw content parts from documents without AI processing.

    This action performs pure content extraction WITHOUT AI/OCR processing.
    It returns ContentParts with different typeGroups:
    - "text": Extracted text from text-based formats (PDF text layers, Word docs, etc.)
    - "image": Images as base64-encoded data (NOT converted to text, no OCR)
    - "table": Tables as structured data
    - "structure": Structured content (JSON, etc.)
    - "container": Container elements (PDF pages, etc.)

    IMPORTANT:
    - Images are returned as base64 data, NOT as extracted text
    - No OCR is performed - images are preserved as visual elements
    - Text extraction only works for text-based formats (not images)
    - The extracted ContentParts can then be used by subsequent AI processing actions

    Parameters:
    - documentList (list, required): Document reference(s) to extract content from.
    - extractionOptions (dict, optional): Extraction options (if not provided, defaults are used).

    Returns:
    - ActionResult with ActionDocument containing ContentExtracted objects
    - ContentExtracted.parts contains List[ContentPart] with various typeGroups
    - Each ContentPart has a typeGroup indicating its type (text, image, table, etc.)
    """
    # Initialized before the try so the failure handler can always reference it.
    operationId = None
    try:
        # Build a unique operation id for the progress logger.
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"context_extract_{workflowId}_{int(time.time())}"

        # Validate and normalize the documentList parameter.
        documentListParam = parameters.get("documentList")
        if not documentListParam:
            return ActionResult.isFailure(error="documentList is required")
        documentList = _coerceDocumentList(documentListParam)
        if documentList is None:
            return ActionResult.isFailure(error=f"Invalid documentList type: {type(documentListParam)}")

        # Start progress tracking (optionally nested under a parent operation).
        parentOperationId = parameters.get('parentOperationId')
        self.services.chat.progressLogStart(
            operationId,
            "Extracting content from documents",
            "Content Extraction",
            f"Documents: {len(documentList.references)}",
            parentOperationId=parentOperationId
        )

        # Resolve the references into ChatDocuments.
        self.services.chat.progressLogUpdate(operationId, 0.2, "Loading documents")
        chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            self.services.chat.progressLogFinish(operationId, False)
            return ActionResult.isFailure(error="No documents found in documentList")
        logger.info(f"Extracting content from {len(chatDocuments)} documents")

        # Prepare extraction options (caller-supplied or defaults).
        self.services.chat.progressLogUpdate(operationId, 0.3, "Preparing extraction options")
        extractionOptions = _resolveExtractionOptions(parameters.get("extractionOptions"))

        # Call extraction service; operationId enables hierarchical per-document
        # progress logging inside the service.
        self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating")
        self.services.chat.progressLogUpdate(operationId, 0.5, f"Extracting content from {len(chatDocuments)} documents")
        extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId)

        # Check if neutralization is enabled and should be applied automatically.
        # bool() keeps the flag a real boolean for the result metadata below.
        neutralizationEnabled = False
        try:
            config = self.services.neutralization.getConfig()
            neutralizationEnabled = bool(config and config.enabled)
        except Exception as e:
            logger.debug(f"Could not check neutralization config: {str(e)}")

        # Neutralize extracted data if enabled (for dynamic mode: after
        # extraction, before AI processing).
        if neutralizationEnabled:
            self.services.chat.progressLogUpdate(operationId, 0.7, "Neutralizing extracted data")
            logger.info("Neutralization enabled - neutralizing extracted content data")
            for extracted in extractedResults:
                if extracted.parts:
                    extracted.parts = _neutralizeParts(self.services.neutralization, extracted.parts)
                    logger.info(f"Neutralized {len(extracted.parts)} content parts")

        # Build one ActionDocument per ContentExtracted result; results are in
        # the same order as the input documents, so map back by index.
        self.services.chat.progressLogUpdate(operationId, 0.8, "Building result documents")
        actionDocuments = []
        for i, extracted in enumerate(extractedResults):
            originalDoc = chatDocuments[i] if i < len(chatDocuments) else None
            documentName = _buildDocumentName(originalDoc, i, extracted.id)
            validationMetadata = {
                "actionType": "context.extractContent",
                "documentIndex": i,
                "extractedId": extracted.id,
                "partCount": len(extracted.parts) if extracted.parts else 0,
                "neutralized": neutralizationEnabled,
                "originalFileName": originalDoc.fileName if originalDoc and hasattr(originalDoc, 'fileName') else None
            }
            actionDocuments.append(ActionDocument(
                documentName=documentName,
                documentData=extracted,  # ContentExtracted object
                mimeType="application/json",
                validationMetadata=validationMetadata
            ))

        self.services.chat.progressLogFinish(operationId, True)
        return ActionResult.isSuccess(documents=actionDocuments)

    except Exception as e:
        # logger.exception keeps the traceback in the log output.
        logger.exception(f"Error in content extraction: {str(e)}")
        # Complete progress tracking with failure; best-effort only — never
        # let a progress-logging error mask the real failure. operationId may
        # still be None if the failure occurred before it was assigned.
        if operationId is not None:
            try:
                self.services.chat.progressLogFinish(operationId, False)
            except Exception:
                pass
        return ActionResult.isFailure(error=str(e))