gateway/modules/methods/methodDocument.py
2025-08-16 23:32:36 +02:00

705 lines
32 KiB
Python

"""
Document processing method module.
Handles document operations using the document service.
"""
import csv
import html
import io
import json
import logging
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional

from modules.chat.methodBase import MethodBase, ActionResult, action
logger = logging.getLogger(__name__)
class MethodDocument(MethodBase):
"""Document method implementation for document operations"""
def __init__(self, serviceCenter: Any):
    """
    Set up the document method.

    Parameters:
    serviceCenter (Any): Passed unchanged to the MethodBase initializer.
    """
    super().__init__(serviceCenter)
    # Description and name attributes of this method.
    self.description = "Handle document operations like extraction and analysis"
    self.name = "document"
@action
async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Extract specific content from documents with an AI prompt and return it in the specified format.

    Every document that yields file data produces one output document. Documents
    without file data are skipped; each remaining result stays paired with the
    document (and file info) it was extracted from.

    Parameters:
    documentList (str): Reference to the document list to extract content from
    aiPrompt (str): AI prompt for content extraction
    expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description.
        Formats are matched to documents by their position in the resolved document list;
        when there are fewer formats than documents, the last format is reused.
    includeMetadata (bool, optional): Whether to include metadata (default: True)

    Returns:
    ActionResult: On success, data["documents"] holds one entry per extracted document
    (documentName, documentData, mimeType). On failure, error describes the problem.
    """
    try:
        documentList = parameters.get("documentList")
        aiPrompt = parameters.get("aiPrompt")
        expectedDocumentFormats = parameters.get("expectedDocumentFormats") or []
        includeMetadata = parameters.get("includeMetadata", True)

        if not documentList:
            return self._createResult(
                success=False,
                data={},
                error="Document list reference is required"
            )
        if not aiPrompt:
            return self._createResult(
                success=False,
                data={},
                error="AI prompt is required"
            )

        chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            return self._createResult(
                success=False,
                data={},
                error="No documents found for the provided reference"
            )

        # Phase 1: extract content from all documents using AI.
        # Each result is stored together with its source document and file info so
        # that a skipped document can never shift the pairing of later results.
        extracted = []
        for index, chatDocument in enumerate(chatDocuments):
            fileId = chatDocument.fileId
            file_data = self.service.getFileData(fileId)
            # A missing file info record must not abort the whole extraction.
            file_info = self.service.getFileInfo(fileId) or {}
            if not file_data:
                logger.warning(f"File not found or empty for fileId: {fileId}")
                continue
            extracted_content = await self.service.extractContentFromFileData(
                prompt=aiPrompt,
                fileData=file_data,
                filename=file_info.get('name', 'document'),
                mimeType=file_info.get('mimeType', 'application/octet-stream'),
                base64Encoded=False,
                documentId=chatDocument.id
            )
            extracted.append((index, chatDocument, extracted_content, file_info))

        if not extracted:
            return self._createResult(
                success=False,
                data={},
                error="No content could be extracted from any documents"
            )

        # Phase 2: process each document individually with its own format conversion.
        output_documents = []
        for index, chatDocument, extracted_content, file_info in extracted:
            position = index + 1

            # Extract text content from this document
            contents = getattr(extracted_content, 'contents', None)
            if contents:
                # Collect the data of every content item that carries any
                text_parts = [
                    content_item.data
                    for content_item in contents
                    if hasattr(content_item, 'data') and content_item.data
                ]
                text_content = "\n".join(text_parts)
            elif isinstance(extracted_content, str):
                text_content = extracted_content
            else:
                text_content = str(extracted_content)

            # Get the expected format for this document; if there are fewer
            # formats than documents, the last format covers the remainder.
            target_format = None
            if expectedDocumentFormats:
                target_format = expectedDocumentFormats[min(index, len(expectedDocumentFormats) - 1)]

            # Plain text is the default output unless a conversion is requested.
            final_content = text_content
            final_mime_type = "text/plain"
            final_extension = ".txt"
            if not target_format:
                logger.info(f"Document {position}: No expected format specified, using plain text")
            else:
                target_extension = target_format.get("extension", ".txt")
                target_mime_type = target_format.get("mimeType", "text/plain")
                if target_extension in (".txt", ".text") and target_mime_type == "text/plain":
                    logger.info(f"Document {position}: No format conversion needed, using plain text")
                else:
                    logger.info(f"Converting document {position} to format: {target_extension} ({target_mime_type})")
                    # Use AI to convert format
                    final_content = await self._convertContentToFormat(text_content, target_format)
                    final_mime_type = target_mime_type
                    final_extension = target_extension

            # Create output filename based on original filename and target format
            original_filename = chatDocument.filename
            # Fall back to a generic base name when the document has no filename.
            base_name = str(original_filename or "document").rsplit('.', 1)[0]
            output_filename = f"{base_name}_extracted_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}{final_extension}"

            # Create result data for this document
            result_data = {
                "documentCount": 1,
                "content": final_content,
                "originalFilename": original_filename,
                "fileInfos": [file_info] if includeMetadata else None,
                "timestamp": datetime.now(UTC).isoformat()
            }
            logger.info(f"Created output document: {output_filename} with {len(final_content)} characters")
            output_documents.append({
                "documentName": output_filename,
                "documentData": result_data,
                "mimeType": final_mime_type
            })

        return self._createResult(
            success=True,
            data={
                "documents": output_documents
            }
        )
    except Exception as e:
        logger.error(f"Error extracting content: {str(e)}")
        return self._createResult(
            success=False,
            data={},
            error=str(e)
        )
@action
async def generate(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Generate documents in specific formats from document references.
    This action reads the file data of each referenced document and converts it to the specified format using AI.

    Parameters:
    documentList (list): List of document references to extract content from
    expectedDocumentFormats (list): Expected document formats with extension, mimeType, description.
        Formats are matched to documents by position; when there are fewer formats
        than documents, the last format is reused.
    originalDocuments (list, optional): List of original document names (defaults to the documents' filenames or ids)
    includeMetadata (bool, optional): Accepted for compatibility; this action does not currently use it

    Returns:
    ActionResult: On success, data["documents"] holds one entry per generated document
    (documentName, documentData, mimeType). On failure, error describes the problem.
    """
    try:
        document_list = parameters.get("documentList", [])
        expected_document_formats = parameters.get("expectedDocumentFormats", [])
        original_documents = parameters.get("originalDocuments", [])

        if not document_list:
            return self._createResult(
                success=False,
                data={},
                error="Document list is required for generation"
            )
        if not expected_document_formats:
            return self._createResult(
                success=False,
                data={},
                error="Expected document formats specification is required"
            )

        # Get chat documents for original documents list
        chat_documents = self.service.getChatDocumentsFromDocumentList(document_list)
        if not chat_documents:
            return self._createResult(
                success=False,
                data={},
                error="No documents found for the provided documentList reference"
            )
        # Logged only after the guard so an empty/None lookup result reports the
        # proper error instead of failing on len().
        logger.info(f"Found {len(chat_documents)} chat documents")

        # Update original documents list if not provided
        if not original_documents:
            original_documents = [
                getattr(doc, 'filename', None) or str(doc.id)
                for doc in chat_documents
            ]

        # Process each document individually with its own format conversion
        output_documents = []
        for i, chat_document in enumerate(chat_documents):
            position = i + 1

            # ChatDocument is just a reference, so we need to get file data using fileId
            file_id = getattr(chat_document, 'fileId', None)
            if not file_id:
                logger.warning(f"Document {position} has no fileId, skipping")
                continue
            file_data = self.service.getFileData(file_id)
            if not file_data:
                logger.warning(f"Could not get file data for document {position}, skipping")
                continue
            if isinstance(file_data, bytes):
                content = file_data.decode('utf-8', errors='ignore')
            else:
                content = str(file_data)
            if not content:
                logger.warning(f"Could not extract content from document {position}, skipping")
                continue
            logger.info(f"Extracted content from document {position}: {len(content)} characters")

            # Get the expected format for this document; the last format is
            # reused when there are fewer formats than documents.
            target_format = expected_document_formats[min(i, len(expected_document_formats) - 1)]
            if not target_format:
                logger.warning(f"No expected format for document {position}, skipping")
                continue

            # Use AI to convert format
            formatted_content = await self._convertContentToFormat(content, target_format)
            if not formatted_content:
                logger.warning(f"Failed to format document {position}, skipping")
                continue

            target_extension = target_format.get("extension", ".txt")
            target_mime_type = target_format.get("mimeType", "text/plain")

            # Create output filename
            timestamp = datetime.now(UTC).strftime('%Y%m%d_%H%M%S')
            if i < len(original_documents):
                original_name = original_documents[i]
                # str() keeps non-string entries (e.g. None) from crashing the split.
                base_name = str(original_name).rsplit('.', 1)[0]
            else:
                original_name = f"document_{position}"
                base_name = original_name
            output_filename = f"{base_name}_generated_{timestamp}{target_extension}"

            # Create result data
            result_data = {
                "documentCount": 1,
                "content": formatted_content,
                "outputFormat": target_format,
                "originalDocument": original_name,
                "timestamp": datetime.now(UTC).isoformat()
            }
            logger.info(f"Generated document: {output_filename} with {len(formatted_content)} characters")
            output_documents.append({
                "documentName": output_filename,
                "documentData": result_data,
                "mimeType": target_mime_type
            })

        if not output_documents:
            return self._createResult(
                success=False,
                data={},
                error="No documents could be generated"
            )
        return self._createResult(
            success=True,
            data={
                "documents": output_documents
            }
        )
    except Exception as e:
        logger.error(f"Error generating document: {str(e)}")
        return self._createResult(
            success=False,
            data={},
            error=str(e)
        )
async def _convertContentToFormat(self, content: str, target_format: Dict[str, Any]) -> str:
    """
    Helper function to convert content to the specified format using AI.

    Parameters:
    content (str): Text to convert
    target_format (dict): Format description; "extension" (default ".txt") selects the
        prompt and "mimeType" (default "text/plain") is passed on to the fallback

    Returns:
    str: The stripped AI response with a surrounding markdown code fence removed, or
    the result of _generateFallbackFormattedContent when the AI returns nothing or
    any step raises.
    """
    # Bound before the try block so the except handler can always use them,
    # even when reading target_format itself is what failed.
    extension = ".txt"
    mime_type = "text/plain"
    try:
        extension = target_format.get("extension", ".txt")
        mime_type = target_format.get("mimeType", "text/plain")
        logger.info(f"Converting content to format: {extension} ({mime_type})")

        # Format label and requirement list per known extension. Only the prompt
        # that is actually needed gets built, so content is embedded once.
        known_formats = {
            ".csv": ("CSV", [
                "Output ONLY the CSV data without any markdown, code blocks, or additional text",
                "Use appropriate headers based on the content",
                "Ensure proper CSV formatting with commas and quotes where needed",
                "Make the data easily readable and importable into spreadsheet applications",
            ]),
            ".json": ("JSON", [
                "Output ONLY the JSON data without any markdown, code blocks, or additional text",
                "Structure the data logically with appropriate keys and values",
                "Ensure valid JSON syntax",
                "Make the data easily parseable and readable",
            ]),
            ".xml": ("XML", [
                "Output ONLY the XML data without any markdown, code blocks, or additional text",
                "Use appropriate XML tags and structure",
                "Ensure valid XML syntax",
                "Make the data easily parseable and readable",
            ]),
            ".html": ("HTML", [
                "Output ONLY the HTML data without any markdown, code blocks, or additional text",
                "Use appropriate HTML tags and structure",
                "Ensure valid HTML syntax",
                "Make the data easily readable in web browsers",
            ]),
            ".md": ("Markdown", [
                "Output ONLY the Markdown data without any code blocks or additional text",
                "Use appropriate Markdown syntax for headers, lists, emphasis, etc.",
                "Structure the content logically",
                "Make the data easily readable and convertible to other formats",
            ]),
        }

        if extension in known_formats:
            label, requirements = known_formats[extension]
            subject = f"a proper {label}"
        else:
            # Generic format conversion
            label = extension.upper()
            subject = label
            requirements = [
                f"Output ONLY the {label} data without any markdown, code blocks, or additional text",
                f"Use appropriate formatting for {label} files",
                "Ensure the output is valid and usable",
                "Make the data easily readable and importable",
            ]

        # The prompt starts and ends with a newline, hence the empty first/last parts.
        ai_prompt = "\n".join([
            "",
            f"Convert the following content into {subject} format.",
            "Requirements:",
            *(f"{number}. {text}" for number, text in enumerate(requirements, 1)),
            "Content to convert:",
            f"{content}",
            f"Generate ONLY the {label} data:",
            "",
        ])

        # Call AI to generate the formatted content
        logger.info(f"Calling AI for {extension} format conversion")
        formatted_content = await self.service.callAiTextBasic(ai_prompt, content)
        if not formatted_content or formatted_content.strip() == "":
            logger.warning("AI format conversion failed, using fallback")
            return self._generateFallbackFormattedContent(content, extension, mime_type)

        # Clean up the AI response
        formatted_content = formatted_content.strip()
        # Remove markdown code blocks if present
        if formatted_content.startswith("```") and formatted_content.endswith("```"):
            lines = formatted_content.split('\n')
            if len(lines) > 2:
                formatted_content = '\n'.join(lines[1:-1])
        return formatted_content
    except Exception as e:
        logger.error(f"Error in AI format conversion: {str(e)}")
        return self._generateFallbackFormattedContent(content, extension, mime_type)
def _generateFallbackFormattedContent(self, content: str, extension: str, mime_type: str) -> str:
"""
Generate fallback formatted content when AI conversion fails.
"""
try:
if extension == ".csv":
# Simple CSV fallback - split by lines and create basic CSV
lines = content.strip().split('\n')
if lines:
# Create a simple CSV with line numbers and content
csv_lines = ["Line,Content"]
for i, line in enumerate(lines, 1):
# Escape quotes and wrap in quotes if comma present
if ',' in line:
line = f'"{line.replace(chr(34), chr(34) + chr(34))}"'
csv_lines.append(f"{i},{line}")
return '\n'.join(csv_lines)
return "Line,Content\n1,No content available"
elif extension == ".json":
# Simple JSON fallback
content_escaped = content.replace('"', '\\"')
timestamp = datetime.now(UTC).isoformat()
return f'{{"content": "{content_escaped}", "format": "json", "timestamp": "{timestamp}"}}'
elif extension == ".xml":
# Simple XML fallback
timestamp = datetime.now(UTC).isoformat()
return f'<?xml version="1.0" encoding="UTF-8"?>\n<document>\n<content>{content}</content>\n<format>xml</format>\n<timestamp>{timestamp}</timestamp>\n</document>'
elif extension == ".html":
# Simple HTML fallback
timestamp = datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')
return f'<!DOCTYPE html>\n<html>\n<head><meta charset="UTF-8"><title>Generated Document</title></head>\n<body>\n<pre>{content}</pre>\n<p><em>Generated on {timestamp}</em></p>\n</body>\n</html>'
elif extension == ".md":
# Simple Markdown fallback
timestamp = datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')
return f"# Generated Document\n\n{content}\n\n---\n*Generated on {timestamp}*"
else:
# Generic fallback - return content as-is
return content
except Exception as e:
logger.error(f"Error in fallback format conversion: {str(e)}")
return content
@action
async def generateReport(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Generate a comprehensive, professional HTML report from multiple documents, consolidating and summarizing all findings using AI.

    Parameters:
    documentList (str): Reference to the document list to create the report from
    title (str, optional): Title for the report (default: "Summary Report")
    includeMetadata (bool, optional): Whether to include metadata (default: True)

    Returns:
    ActionResult: On success, data["documents"] holds a single text/html document
    whose documentData carries the report content, title, document count and
    timestamp. On failure, error describes the problem.
    """
    try:
        documentList = parameters.get("documentList")
        # A missing, None or empty title falls back to the default.
        title = parameters.get("title") or "Summary Report"
        includeMetadata = parameters.get("includeMetadata", True)

        if not documentList:
            return self._createResult(
                success=False,
                data={},
                error="Document list reference is required"
            )

        chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
        if not chatDocuments:
            return self._createResult(
                success=False,
                data={},
                error="No documents found for the provided reference"
            )
        # Logged only after the guard so an empty/None lookup result reports the
        # proper error instead of failing on len().
        logger.info(f"Retrieved {len(chatDocuments)} chat documents for report generation")

        # Generate HTML report
        html_content = await self._generateHtmlReport(chatDocuments, title, includeMetadata)

        # Create output filename
        timestamp = datetime.now(UTC).strftime('%Y%m%d_%H%M%S')
        output_filename = f"report_{timestamp}.html"
        result_data = {
            "documentCount": len(chatDocuments),
            "content": html_content,
            "title": title,
            "timestamp": datetime.now(UTC).isoformat()
        }
        logger.info(f"Generated HTML report: {output_filename} with {len(html_content)} characters")
        return self._createResult(
            success=True,
            data={
                "documents": [{
                    "documentName": output_filename,
                    "documentData": result_data,
                    "mimeType": "text/html"
                }]
            }
        )
    except Exception as e:
        logger.error(f"Error generating report: {str(e)}")
        return self._createResult(
            success=False,
            data={},
            error=str(e)
        )
async def _generateHtmlReport(self, chatDocuments: List[Any], title: str, includeMetadata: bool) -> str:
"""
Generate a comprehensive HTML report using AI from all input documents.
"""
try:
# Filter out empty documents and collect content
validDocuments = []
allContent = []
for doc in chatDocuments:
content = ""
logger.info(f"Processing document: type={type(doc)}")
# Get actual file content using the fileId reference
try:
file_data = self.service.getFileData(doc.fileId)
if file_data:
# Convert bytes to string
if isinstance(file_data, bytes):
content = file_data.decode('utf-8')
else:
content = str(file_data)
logger.info(f" Retrieved content from file: {len(content)} characters")
else:
logger.warning(f" No file data found for fileId: {doc.fileId}")
except Exception as e:
logger.error(f" Error retrieving file data: {str(e)}")
# Skip empty documents
if content:
validDocuments.append(doc)
allContent.append(f"Document: {doc.filename}\n{content}\n")
logger.info(f" Added document to valid documents list")
else:
logger.warning(f" Skipping document with no content")
if not validDocuments:
# If no valid documents, create a simple report
html = ["<html><head><meta charset='utf-8'><title>" + title + "</title></head><body>"]
html.append(f"<h1>{title}</h1>")
html.append(f"<p><b>Generated:</b> {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')}</p>")
html.append("<p><em>No content available in the provided documents.</em></p>")
html.append("</body></html>")
return '\n'.join(html)
# Create AI prompt for comprehensive report generation
combinedContent = "\n\n".join(allContent)
aiPrompt = f"""
Create a comprehensive, well-structured HTML report based on the following documents and content.
Report Title: {title}
Requirements:
1. Create a professional, well-formatted HTML report
2. Include an executive summary at the beginning
3. Organize information logically with clear sections
4. Highlight key findings and insights
5. Include relevant data, statistics, and conclusions
6. Use proper HTML formatting with headers, lists, and styling
7. Make it readable and professional
Document Content:
{combinedContent}
Generate a complete HTML report that integrates all the information into a cohesive, professional document.
"""
# Call AI to generate the report
logger.info(f"Generating AI report for {len(validDocuments)} documents")
aiReport = await self.service.callAiTextBasic(aiPrompt, combinedContent)
# If AI call fails, fall back to basic HTML
if not aiReport or aiReport.strip() == "":
logger.warning("AI report generation failed, using fallback HTML")
return self._generateFallbackHtmlReport(validDocuments, title, includeMetadata)
# Clean up the AI response and ensure it's valid HTML
if not aiReport.strip().startswith('<html'):
# Wrap the AI content in proper HTML structure
html = ["<html><head><meta charset='utf-8'><title>" + title + "</title></head><body>"]
html.append(f"<h1>{title}</h1>")
html.append(f"<p><b>Generated:</b> {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')}</p>")
html.append(f"<p><b>Total Documents Analyzed:</b> {len(validDocuments)}</p>")
html.append("<hr>")
html.append(aiReport)
html.append("</body></html>")
return '\n'.join(html)
else:
# AI returned complete HTML, use it directly
return aiReport
except Exception as e:
logger.error(f"Error generating AI report: {str(e)}")
# Fall back to basic HTML report
return self._generateFallbackHtmlReport(chatDocuments, title, includeMetadata)
def _generateFallbackHtmlReport(self, chatDocuments: List[Any], title: str, includeMetadata: bool) -> str:
"""
Generate a basic HTML report as fallback when AI generation fails.
"""
html = ["<html><head><meta charset='utf-8'><title>" + title + "</title></head><body>"]
html.append(f"<h1>{title}</h1>")
html.append(f"<p><b>Generated:</b> {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')}</p>")
html.append(f"<p><b>Total Documents:</b> {len(chatDocuments)}</p>")
for i, doc in enumerate(chatDocuments, 1):
html.append(f"<h2>Document {i}: {doc.filename}</h2>")
if includeMetadata:
html.append("<ul>")
html.append(f"<li><b>ID:</b> {doc.id}</li>")
html.append(f"<li><b>File ID:</b> {doc.fileId}</li>")
html.append(f"<li><b>Filename:</b> {doc.filename}</li>")
if hasattr(doc, 'createdAt'):
html.append(f"<li><b>Created:</b> {doc.createdAt}</li>")
html.append("</ul>")
# Add document content if available
content = ""
if hasattr(doc, 'fileId') and doc.fileId:
# ChatDocument is just a reference, so we need to get file data using fileId
try:
file_data = self.service.getFileData(doc.fileId)
if file_data:
if isinstance(file_data, bytes):
content = file_data.decode('utf-8')
else:
content = str(file_data)
except Exception as e:
logger.warning(f"Could not retrieve content for document {doc.filename}: {str(e)}")
if content:
html.append(f"<div style='white-space:pre-wrap; border:1px solid #ccc; padding:0.5em; margin-bottom:1em; background-color:#f9f9f9;'>{content}</div>")
else:
html.append("<p><em>No content available</em></p>")
html.append("</body></html>")
return '\n'.join(html)