gateway/modules/methods/methodDocument.py
2025-07-11 23:13:42 +02:00

187 lines
9 KiB
Python

"""
Document processing method module.
Handles document operations using the document service.
"""
import logging
from typing import Dict, Any, List, Optional
import uuid
from datetime import datetime, UTC
from modules.chat.methodBase import MethodBase, ActionResult, action
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MethodDocument(MethodBase):
    """Document method implementation for document operations"""

    # Instructions appended to the caller's prompt for formats that have a
    # dedicated wording. Any other non-".txt" extension gets the generic
    # instruction built in _formatInstruction().
    _FORMAT_INSTRUCTIONS: Dict[str, str] = {
        ".csv": "\n\nCRITICAL: Deliver the result as pure CSV data without any markdown formatting, code blocks, or additional text. Output only the CSV content with proper headers and data rows. Do not include ```csv or ``` markers.",
        ".json": "\n\nCRITICAL: Deliver the result as pure JSON data without any markdown formatting, code blocks, or additional text. Output only the JSON content. Do not include ```json or ``` markers.",
        ".xml": "\n\nCRITICAL: Deliver the result as pure XML data without any markdown formatting, code blocks, or additional text. Output only the XML content. Do not include ```xml or ``` markers.",
    }

    def __init__(self, serviceCenter: Any):
        """Initialize the document method"""
        # serviceCenter is handed unchanged to MethodBase.__init__.
        # NOTE(review): self.service (used by extract) is presumably set up
        # there -- confirm against MethodBase.
        super().__init__(serviceCenter)
        self.name = "document"
        self.description = "Handle document operations like extraction and analysis"

    @staticmethod
    def _selectOutputFormat(expectedDocumentFormats: List[Dict[str, Any]]) -> tuple:
        """Return (extension, mimeType) taken from the first expected format, or the .txt defaults."""
        if not expectedDocumentFormats:
            logger.info("No expected format specified, using default .txt format")
            return ".txt", "text/plain"

        # Only the first expected format is honoured.
        expected_format = expectedDocumentFormats[0]
        # `or` (rather than a .get default) also covers keys that are present
        # but None/empty, which would otherwise break .upper() later on.
        output_extension = expected_format.get("extension") or ".txt"
        output_mime_type = expected_format.get("mimeType") or "text/plain"
        logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
        logger.info(f"Expected document formats: {expectedDocumentFormats}")
        return output_extension, output_mime_type

    @classmethod
    def _formatInstruction(cls, output_extension: str) -> str:
        """Return the prompt suffix that pins the AI output to the given extension ('' for .txt)."""
        if output_extension == ".txt":
            return ""
        specific = cls._FORMAT_INSTRUCTIONS.get(output_extension)
        if specific is not None:
            return specific
        label = output_extension.upper()
        return f"\n\nCRITICAL: Deliver the result as pure {label} data without any markdown formatting, code blocks, or additional text. Output only the {label} content. Do not include any markdown markers."

    @staticmethod
    def _contentToText(extracted_content: Any) -> str:
        """Flatten one extraction result into text: joined item data, the string itself, or str(obj)."""
        contents = getattr(extracted_content, "contents", None)
        if contents:
            # One line per content item that carries non-empty data.
            return "".join(
                item.data + "\n"
                for item in contents
                if getattr(item, "data", None)
            )
        if isinstance(extracted_content, str):
            return extracted_content
        # Fallback: convert to string representation
        return str(extracted_content)

    @action
    async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Extract specific content from document with ai prompt and return it in the specified format
        Parameters:
        documentList (str): Reference to the document list to extract content from
        aiPrompt (str): AI prompt for content extraction
        includeMetadata (bool, optional): Whether to include metadata (default: True)
        expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        # NOTE(review): the docstring above is kept verbatim because action
        # docstrings may be consumed by the surrounding framework -- confirm
        # before rewording it.
        #
        # Result: on success, data["documents"] holds one entry per document
        # whose file data could be loaded, each with "documentName",
        # "documentData" (content, originalFilename, fileInfos, timestamp,
        # documentCount) and "mimeType". Any failure, including an unexpected
        # exception, is reported as a result with success=False and an error
        # message instead of being raised.
        try:
            documentList = parameters.get("documentList")
            aiPrompt = parameters.get("aiPrompt")
            includeMetadata = parameters.get("includeMetadata", True)
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not documentList:
                return self._createResult(
                    success=False,
                    data={},
                    error="Document list reference is required"
                )
            if not aiPrompt:
                return self._createResult(
                    success=False,
                    data={},
                    error="AI prompt is required"
                )

            chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return self._createResult(
                    success=False,
                    data={},
                    error="No documents found for the provided reference"
                )

            # Determine output format based on expected formats, then extend
            # the prompt so the AI answers in that format.
            output_extension, output_mime_type = self._selectOutputFormat(expectedDocumentFormats)
            enhanced_prompt = aiPrompt + self._formatInstruction(output_extension)

            # Extract content from all documents. Each result is stored
            # together with the document and file info it belongs to, so a
            # skipped document can never shift later results onto the wrong
            # source document.
            processed = []
            for chatDocument in chatDocuments:
                fileId = chatDocument.fileId
                file_data = self.service.getFileData(fileId)
                # Tolerate a missing info record; the lookups below already
                # provide defaults for name and mime type.
                file_info = self.service.getFileInfo(fileId) or {}
                if not file_data:
                    logger.warning(f"File not found or empty for fileId: {fileId}")
                    continue
                extracted_content = await self.service.extractContentFromFileData(
                    prompt=enhanced_prompt,  # Use enhanced prompt instead of original
                    fileData=file_data,
                    filename=file_info.get('name', 'document'),
                    mimeType=file_info.get('mimeType', 'application/octet-stream'),
                    base64Encoded=False,
                    documentId=chatDocument.id
                )
                processed.append((chatDocument, file_info, extracted_content))

            if not processed:
                return self._createResult(
                    success=False,
                    data={},
                    error="No content could be extracted from any documents"
                )

            # Process each document individually and create separate output files
            output_documents = []
            for chatDocument, file_info, extracted_content in processed:
                text_content = self._contentToText(extracted_content)

                # Create output filename based on original filename (text
                # after the last dot is dropped).
                original_filename = chatDocument.filename
                base_name = original_filename.rsplit('.', 1)[0]
                # One timestamp per document so the filename and the
                # "timestamp" field describe the same instant.
                now = datetime.now(UTC)
                output_filename = f"{base_name}_extracted_{now.strftime('%Y%m%d_%H%M%S')}{output_extension}"

                # Create result data for this document
                result_data = {
                    "documentCount": 1,
                    "content": text_content,
                    "originalFilename": original_filename,
                    "fileInfos": [file_info] if includeMetadata else None,
                    "timestamp": now.isoformat()
                }
                logger.info(f"Created output document: {output_filename} with {len(text_content)} characters")
                # NOTE(review): this writes extracted document content to the
                # INFO log -- confirm that is acceptable for the deployment.
                logger.info(f"Content preview: {text_content[:200]}...")
                output_documents.append({
                    "documentName": output_filename,
                    "documentData": result_data,
                    "mimeType": output_mime_type
                })

            return self._createResult(
                success=True,
                data={
                    "documents": output_documents
                }
            )
        except Exception as e:
            # Top-level boundary of the action: log with traceback and report
            # the failure through the result object rather than raising.
            logger.exception(f"Error extracting content: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )