# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Extract Content action for Context operations. Extracts content from documents (separate from AI calls). """ import logging import time from typing import Dict, Any from modules.workflows.methods.methodBase import action from modules.datamodels.datamodelChat import ActionResult, ActionDocument from modules.datamodels.datamodelDocref import DocumentReferenceList from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy logger = logging.getLogger(__name__) @action async def extractContent(self, parameters: Dict[str, Any]) -> ActionResult: """ Extract content from documents (separate from AI calls). This action performs pure content extraction without AI processing. The extracted ContentParts can then be used by subsequent AI processing actions. Parameters: - documentList (list, required): Document reference(s) to extract content from. - extractionOptions (dict, optional): Extraction options (if not provided, defaults are used). Returns: - ActionResult with ActionDocument containing ContentExtracted objects - ContentExtracted.parts contains List[ContentPart] (already chunked if needed) """ try: # Init progress logger workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" operationId = f"context_extract_{workflowId}_{int(time.time())}" # Extract documentList from parameters dict documentListParam = parameters.get("documentList") if not documentListParam: return ActionResult.isFailure(error="documentList is required") # Convert to DocumentReferenceList if needed if isinstance(documentListParam, DocumentReferenceList): documentList = documentListParam elif isinstance(documentListParam, str): documentList = DocumentReferenceList.from_string_list([documentListParam]) elif isinstance(documentListParam, list): documentList = DocumentReferenceList.from_string_list(documentListParam) else: return ActionResult.isFailure(error=f"Invalid documentList type: {type(documentListParam)}") # Start progress tracking parentOperationId = parameters.get('parentOperationId') self.services.chat.progressLogStart( operationId, "Extracting content from documents", "Content Extraction", f"Documents: {len(documentList.references)}", parentOperationId=parentOperationId ) # Get ChatDocuments from documentList self.services.chat.progressLogUpdate(operationId, 0.2, "Loading documents") chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList) if not chatDocuments: self.services.chat.progressLogFinish(operationId, False) return ActionResult.isFailure(error="No documents found in documentList") logger.info(f"Extracting content from {len(chatDocuments)} documents") # Prepare extraction options self.services.chat.progressLogUpdate(operationId, 0.3, "Preparing extraction options") extractionOptionsParam = parameters.get("extractionOptions") # Convert dict to ExtractionOptions object if needed, or create defaults if extractionOptionsParam: if isinstance(extractionOptionsParam, dict): # Convert dict to ExtractionOptions object extractionOptions = ExtractionOptions(**extractionOptionsParam) elif isinstance(extractionOptionsParam, ExtractionOptions): extractionOptions = extractionOptionsParam else: # Invalid type, use defaults extractionOptions = None else: extractionOptions = None # If extractionOptions not provided, create defaults if not extractionOptions: # Default extraction options for pure content extraction (no AI processing) extractionOptions = ExtractionOptions( prompt="Extract all content from the document", mergeStrategy=MergeStrategy( mergeType="concatenate", groupBy="typeGroup", orderBy="id" ), processDocumentsIndividually=True ) # Call extraction service with hierarchical progress logging self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating") self.services.chat.progressLogUpdate(operationId, 0.5, f"Extracting content from {len(chatDocuments)} documents") # Pass operationId for hierarchical per-document progress logging extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions, operationId=operationId) # Build ActionDocuments from ContentExtracted results self.services.chat.progressLogUpdate(operationId, 0.8, "Building result documents") actionDocuments = [] # Map extracted results back to original documents by index (results are in same order) for i, extracted in enumerate(extractedResults): # Get original document name if available originalDoc = chatDocuments[i] if i < len(chatDocuments) else None if originalDoc and hasattr(originalDoc, 'fileName') and originalDoc.fileName: # Use original filename with "extracted_" prefix baseName = originalDoc.fileName.rsplit('.', 1)[0] if '.' in originalDoc.fileName else originalDoc.fileName documentName = f"{baseName}_extracted_{extracted.id}.json" else: # Fallback to generic name with index documentName = f"document_{i+1:03d}_extracted_{extracted.id}.json" # Store ContentExtracted object in ActionDocument.documentData validationMetadata = { "actionType": "context.extractContent", "documentIndex": i, "extractedId": extracted.id, "partCount": len(extracted.parts) if extracted.parts else 0, "originalFileName": originalDoc.fileName if originalDoc and hasattr(originalDoc, 'fileName') else None } actionDoc = ActionDocument( documentName=documentName, documentData=extracted, # ContentExtracted object mimeType="application/json", validationMetadata=validationMetadata ) actionDocuments.append(actionDoc) self.services.chat.progressLogFinish(operationId, True) return ActionResult.isSuccess(documents=actionDocuments) except Exception as e: logger.error(f"Error in content extraction: {str(e)}") # Complete progress tracking with failure try: self.services.chat.progressLogFinish(operationId, False) except: pass # Don't fail on progress logging errors return ActionResult.isFailure(error=str(e))