gateway/modules/workflows/methods/methodDocument.py
2025-10-04 18:44:42 +02:00

403 lines
20 KiB
Python

"""
Document processing method module.
Handles document operations using the document service.
"""
import logging
import os
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
logger = logging.getLogger(__name__)
class MethodDocument(MethodBase):
    """Document method implementation for document operations"""

    def __init__(self, services):
        """Initialize the document method.

        Args:
            services: Service container handed to ``MethodBase``; the methods
                below use its ``workflow``, ``extraction`` and ``ai`` members.
        """
        super().__init__(services)
        self.name = "document"
        self.description = "Handle document operations like extraction and analysis"

    def _format_timestamp_for_filename(self) -> str:
        """Format current timestamp as YYYYMMDD-hhmmss for filenames."""
        return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")

    @action
    async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Extract and analyze content from existing documents using AI
        USE FOR: Analyzing documents, extracting specific information, summarizing content, finding patterns, data extraction
        DO NOT USE FOR: Creating new documents, generating reports, web research, email operations
        INPUT REQUIREMENTS: Requires documentList (existing documents) and prompt (what to extract)
        OUTPUT FORMAT: Plain text extracted content (.txt files)
        DEPENDENCIES: Requires existing documents in documentList parameter
        WORKFLOW POSITION: Use after documents are available, before generating reports
        Parameters:
        documentList (list): Document list reference(s) - List of document references to extract content from
        prompt (str): AI prompt for extraction - Specific prompt describing what content to extract and how to process it
        operationType (str, optional): Type of operation - Use 'extract_content', 'analyze_document', 'summarize_content', etc. (default: 'extract_content')
        processDocumentsIndividually (bool, optional): Process each document separately - Set to True for individual processing, False for batch processing (default: True)
        chunkAllowed (bool, optional): Allow content chunking - Set to True to allow AI service to chunk large content, False to process as-is (default: True)
        mergeStrategy (dict, optional): Strategy for merging results - Specify how to merge chunked content: groupBy, orderBy, mergeType (default: concatenate)
        expectedDocumentFormats (list, optional): Expected output formats - List of format specifications with extension, mimeType, description
        includeMetadata (bool, optional): Include document metadata - Set to True to include file metadata in results (default: True)
        """
        # NOTE(review): the docstring above is kept word for word because it
        # reads like a planner-facing action description (presumably consumed
        # through the @action decorator) -- confirm before rewording it.
        #
        # Returns a failure ActionResult when documentList/prompt are missing,
        # when no documents resolve, or when the extraction service fails or
        # yields nothing; otherwise a success ActionResult carrying one
        # text/plain ActionDocument per input document.
        try:
            documentList = parameters.get("documentList")
            if isinstance(documentList, str):
                # A single reference is accepted and promoted to a list.
                documentList = [documentList]
            prompt = parameters.get("prompt")

            if not documentList:
                return ActionResult.isFailure(
                    error="Document list reference is required"
                )
            if not prompt:
                return ActionResult.isFailure(
                    error="Prompt is required"
                )

            chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return ActionResult.isFailure(
                    error="No documents found for the provided reference"
                )

            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
            # Extraction options are built directly from the planner parameters.
            extraction_options = {
                "prompt": prompt,
                "operationType": parameters.get("operationType", "extract_content"),
                "processDocumentsIndividually": parameters.get("processDocumentsIndividually", True),
                "chunkAllowed": parameters.get("chunkAllowed", True),
                "mergeStrategy": parameters.get("mergeStrategy", {
                    "groupBy": "typeGroup",
                    "orderBy": "id",
                    "mergeType": "concatenate",
                }),
            }

            try:
                if expectedDocumentFormats:
                    extraction_options["expectedDocumentFormats"] = expectedDocumentFormats
                    extraction_options["prompt"] = _appendFormatInstructions(
                        prompt, expectedDocumentFormats
                    )
                # The flag is only forwarded when metadata is explicitly disabled.
                if not parameters.get("includeMetadata", True):
                    extraction_options["includeMetadata"] = False
                all_extracted_content = self.services.extraction.extractContent(
                    documents=chatDocuments,
                    options=extraction_options
                )
            except Exception as e:
                # Report the real cause instead of falling through to the
                # generic "nothing extracted" message.
                logger.error(f"Extraction failed: {str(e)}")
                return ActionResult.isFailure(
                    error=f"Extraction failed: {str(e)}"
                )

            # "or []" keeps the log line from raising when the service returns None.
            logger.info(
                "Extraction completed: %d documents processed",
                len(all_extracted_content or []),
            )
            if not all_extracted_content:
                return ActionResult.isFailure(
                    error="No content could be extracted from any documents"
                )

            # Format conversion is left to the extraction service; every
            # output document is emitted as plain text.
            final_mime_type = "text/plain"
            final_extension = ".txt"

            action_documents = []
            for i, chatDocument in enumerate(chatDocuments):
                # NOTE(review): results are matched to documents by position;
                # this assumes the service returns them in input order -- confirm.
                try:
                    ec = all_extracted_content[i] if i < len(all_extracted_content) else None
                    final_content = "\n".join(_collectTextParts(ec))
                except Exception:
                    # Best effort: an unreadable result yields an empty document.
                    final_content = ""

                base_name = _outputBaseName(getattr(chatDocument, "fileName", None), i)
                output_fileName = self._generateMeaningfulFileName(
                    base_name=f"{base_name}_extracted",
                    extension=final_extension.lstrip('.'),  # helper is given the bare extension
                    action_name="extract"
                )
                logger.info(f"Created output document: {output_fileName} with {len(final_content)} characters")
                action_documents.append(ActionDocument(
                    documentName=output_fileName,
                    documentData=final_content,
                    mimeType=final_mime_type
                ))

            return ActionResult.isSuccess(
                documents=action_documents
            )
        except Exception as e:
            logger.error(f"Error extracting content: {str(e)}")
            return ActionResult.isFailure(
                error=str(e)
            )

    @action
    async def generate(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Generate formatted documents and reports from source documents - creates actual files (Word, PDF, Excel, etc.)
        USE FOR: Creating formatted documents, reports, presentations, spreadsheets, structured outputs, professional documents
        DO NOT USE FOR: Simple text analysis, Q&A, web research, email operations
        INPUT REQUIREMENTS: Requires documentList (source documents) and prompt (what kind of report to generate)
        OUTPUT FORMAT: Formatted documents (.html, .pdf, .docx, .txt, .md, .json, .csv, .xlsx)
        DEPENDENCIES: Requires existing documents in documentList parameter
        WORKFLOW POSITION: Use after document analysis, as final output generation step
        Parameters:
        documentList (list): Document list reference(s) - List of document references to include in report
        prompt (str): AI prompt for report generation - Specific prompt describing what kind of report to generate
        title (str): Report title - Title for the generated report (default: "Summary Report")
        outputFormat (str): Output format extension - Specify the desired output format: 'html', 'pdf', 'docx', 'txt', 'md', 'json', 'csv', 'xlsx' (default: 'html')
        operationType (str, optional): Type of operation - Use 'generate_report', 'analyze_documents', etc. (default: 'generate_report')
        processDocumentsIndividually (bool, optional): Process each document separately - Set to True for individual processing (default: True)
        chunkAllowed (bool, optional): Allow content chunking - Set to True to allow AI service to chunk large content (default: True)
        mergeStrategy (dict, optional): Strategy for merging results - Specify how to merge content for report generation (default: concatenate)
        includeMetadata (bool, optional): Include document metadata - Set to True to include file metadata in results (default: True)
        """
        # NOTE(review): the docstring above is kept word for word because it
        # reads like a planner-facing action description (presumably consumed
        # through the @action decorator) -- confirm before rewording it.
        #
        # NOTE(review): operationType, processDocumentsIndividually,
        # chunkAllowed and mergeStrategy are documented above but are not
        # forwarded anywhere; _extractContentWithPrompt uses fixed values.
        #
        # Returns a failure ActionResult when documentList/prompt are missing
        # or no documents resolve; otherwise a success ActionResult with a
        # single ActionDocument holding the rendered report.
        try:
            documentList = parameters.get("documentList")
            if isinstance(documentList, str):
                # A single reference is accepted and promoted to a list.
                documentList = [documentList]
            prompt = parameters.get("prompt")
            title = parameters.get("title", "Summary Report")
            # Fall back to 'html' for a missing/None/empty value and drop a
            # leading dot, matching how extract() passes a bare extension to
            # the file-name helper.
            outputFormat = str(parameters.get("outputFormat") or "html").strip().lstrip(".") or "html"
            includeMetadata = parameters.get("includeMetadata", True)

            if not documentList:
                return ActionResult.isFailure(
                    error="Document list reference is required"
                )
            if not prompt:
                return ActionResult.isFailure(
                    error="Prompt is required to specify what kind of report to generate"
                )

            chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
            # "or []" keeps the log line from raising when nothing is returned,
            # so the explicit failure below is what the caller sees.
            logger.info(
                "Retrieved %d chat documents for report generation",
                len(chatDocuments or []),
            )
            if not chatDocuments:
                return ActionResult.isFailure(
                    error="No documents found for the provided reference"
                )

            report_content, mime_type = await self._generateReport(
                chatDocuments, title, outputFormat, includeMetadata, prompt
            )

            output_fileName = self._generateMeaningfulFileName(
                base_name="report",
                extension=outputFormat,
                action_name="generate"
            )
            logger.info(f"Generated {outputFormat.upper()} report: {output_fileName} with {len(report_content)} characters")
            return ActionResult.isSuccess(
                documents=[ActionDocument(
                    documentName=output_fileName,
                    documentData=report_content,
                    mimeType=mime_type
                )]
            )
        except Exception as e:
            logger.error(f"Error generating report: {str(e)}")
            return ActionResult.isFailure(
                error=str(e)
            )

    async def _generateReport(self, chatDocuments: List[Any], title: str, outputFormat: str, includeMetadata: bool, prompt: str) -> tuple[str, str]:
        """
        Generate a report in the specified format using format-specific extraction:
        1. Get format-specific extraction prompt from renderer
        2. Extract content using AI with format-specific prompt
        3. Clean and return the formatted content

        Args:
            chatDocuments: Source documents handed to the extraction step.
            title: Report title passed to the prompt builder and renderer.
            outputFormat: Target format passed to the prompt builder and renderer.
            includeMetadata: Forwarded to the extraction step.
            prompt: User prompt describing the desired report.

        Returns:
            ``(content, mime_type)`` from the renderer. On any exception a
            short text containing the error message is returned with
            ``"text/plain"`` instead of raising.
        """
        try:
            # Imported lazily, as in the original code.
            from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
            generation_service = GenerationService(self.services)

            extraction_prompt = generation_service.getExtractionPrompt(
                output_format=outputFormat,
                user_prompt=prompt,
                title=title
            )
            extracted_content = await self._extractContentWithPrompt(
                chatDocuments, extraction_prompt, includeMetadata
            )
            rendered_content, mime_type = await generation_service.renderReport(
                extracted_content=extracted_content,
                output_format=outputFormat,
                title=title
            )
            return rendered_content, mime_type
        except Exception as e:
            logger.error(f"Error generating report: {str(e)}")
            # NOTE(review): the caller wraps this fallback in a success result
            # named with the requested extension -- confirm that is intended.
            fallback_content = f"# {title}\n\nError generating report: {str(e)}"
            return fallback_content, "text/plain"

    async def _extractContentWithPrompt(self, chatDocuments: List[Any], extraction_prompt: str, includeMetadata: bool) -> str:
        """
        Extract content from documents using a specific extraction prompt.

        The extraction service is run first; if it yields no readable parts a
        fixed "No readable content" text is returned. Otherwise the AI service
        is called with the prompt and the original documents, and its response
        is returned with surrounding whitespace and an enclosing Markdown code
        fence removed.

        Args:
            chatDocuments: Documents passed to both the extraction and AI services.
            extraction_prompt: Format-specific prompt.
            includeMetadata: When False, ``includeMetadata=False`` is added to
                the extraction options.

        Returns:
            The cleaned AI response, or a human-readable message when nothing
            could be extracted or an error occurred (errors are not raised).
        """
        try:
            logger.info(f"Extracting content with format-specific prompt for {len(chatDocuments)} documents")
            extraction_options = {
                "prompt": extraction_prompt,
                "operationType": "generate_report",
                "processDocumentsIndividually": True,
                "chunkAllowed": True,
                "mergeStrategy": {
                    "groupBy": "typeGroup",
                    "orderBy": "id",
                    "mergeType": "concatenate"
                }
            }
            if not includeMetadata:
                extraction_options["includeMetadata"] = False

            extracted_list = self.services.extraction.extractContent(
                documents=chatDocuments,
                options=extraction_options
            )
            if not extracted_list:
                logger.warning("No content extracted from documents")
                return "No readable content found in documents"

            all_extracted_content = []
            for ec in extracted_list:
                all_extracted_content.extend(_collectTextParts(ec))
            if not all_extracted_content:
                logger.warning("No readable content found in extracted results")
                return "No readable content found in documents"

            # NOTE: the combined text is only used as a readability check and
            # for logging; the AI call below receives the original documents.
            combined_content = "\n\n".join(all_extracted_content)
            if not combined_content.strip():
                logger.error("No content extracted from documents")
                raise RuntimeError("No content extracted from documents")

            logger.info(f"Calling AI service to process {len(combined_content)} characters with prompt")
            aiResponse = await self.services.ai.callAi(
                prompt=extraction_prompt,
                documents=chatDocuments,  # Pass the original ChatDocument objects
                options=AiCallOptions(operationType=OperationType.GENERATE_CONTENT)
            )
            if not aiResponse or not aiResponse.strip():
                logger.error("AI content generation failed")
                raise RuntimeError("AI content generation failed")

            content = _stripCodeFence(aiResponse.strip())
            logger.info(f"Successfully generated format-specific content: {len(content)} characters")
            return content
        except Exception as e:
            logger.error(f"Error extracting content with prompt: {str(e)}")
            # Return minimal fallback content
            return f"Error extracting content: {str(e)}"


# typeGroup values of extracted parts that are collected as readable text.
_TEXT_TYPE_GROUPS = ("text", "table", "structure")


def _collectTextParts(extracted: Any) -> List[Any]:
    """Return the non-empty ``data`` of all text-like parts of one extraction result.

    Results that are falsy or have no ``parts`` attribute yield an empty list.
    Parts that raise while being inspected are skipped (best effort).
    """
    if not extracted or not hasattr(extracted, "parts"):
        return []
    collected = []
    for part in extracted.parts:
        try:
            if part.typeGroup in _TEXT_TYPE_GROUPS and part.data:
                collected.append(part.data)
        except Exception:
            # A malformed part must not discard the remaining parts.
            continue
    return collected


def _appendFormatInstructions(prompt: str, expectedDocumentFormats: List[Dict[str, Any]]) -> str:
    """Return ``prompt`` extended with output-format instructions.

    Each format entry contributes its ``extension`` (default ``.txt``),
    ``mimeType`` (default ``text/plain``) and ``description`` (default empty).
    An empty format list returns the prompt unchanged.
    """
    extensions = []
    details = []
    for fmt in expectedDocumentFormats:
        extension = fmt.get("extension", ".txt")
        mime_type = fmt.get("mimeType", "text/plain")
        description = fmt.get("description", "")
        extensions.append(extension)
        details.append(f"- {extension} ({mime_type}): {description}")
    if not details:
        return prompt
    return (
        f"{prompt}\n\nPlease format the output as: {', '.join(extensions)}"
        "\nExpected formats:\n" + "\n".join(details)
    )


def _outputBaseName(fileName: Any, index: int) -> str:
    """Return ``fileName`` without its last extension.

    ``os.path.splitext`` keeps dot-files such as ``.bashrc`` intact. A missing
    or empty name falls back to ``document_<index + 1>`` so one unnamed
    document does not fail the whole action.
    """
    if not fileName:
        return f"document_{index + 1}"
    name = str(fileName)
    stem, _extension = os.path.splitext(name)
    return stem or name


def _stripCodeFence(content: str) -> str:
    """Remove one enclosing Markdown code fence from ``content``.

    Text that is not fenced, or that sits on a single line, is returned
    unchanged. The opening fence line (including any language tag) is dropped.
    Text that shares the final line with the closing fence is kept.
    """
    if not (content.startswith("```") and content.endswith("```")):
        return content
    lines = content.split("\n")
    if len(lines) < 2:
        return content
    body = lines[1:]
    # Cut the closing fence off the last line and keep anything before it.
    remainder = body[-1][:-3]
    if remainder.strip():
        body[-1] = remainder
    else:
        body.pop()
    return "\n".join(body).strip()