# gateway/modules/workflows/methods/methodDocument.py
# (listing metadata: 2025-10-03 11:23:48 +02:00, 538 lines, 25 KiB, Python)
"""
Document processing method module.
Handles document operations using the document service.
"""
import logging
import os
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
logger = logging.getLogger(__name__)
class MethodDocument(MethodBase):
    """Workflow method handling document operations (AI extraction, report generation)."""

    def __init__(self, services):
        """Register this method's identity on top of the shared base.

        Args:
            services: Service container exposing workflow, extraction, and AI services.
        """
        super().__init__(services)
        # Identity used by the workflow planner to select this method.
        self.description = "Handle document operations like extraction and analysis"
        self.name = "document"
def _format_timestamp_for_filename(self) -> str:
"""Format current timestamp as YYYYMMDD-hhmmss for filenames."""
return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
@action
async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Extract content from any document using AI prompt.
Parameters:
documentList (list): Document list reference(s) - List of document references to extract content from
prompt (str): AI prompt for extraction - Specific prompt describing what content to extract and how to process it
operationType (str, optional): Type of operation - Use 'extract_content', 'analyze_document', 'summarize_content', etc. (default: 'extract_content')
processDocumentsIndividually (bool, optional): Process each document separately - Set to True for individual processing, False for batch processing (default: True)
chunkAllowed (bool, optional): Allow content chunking - Set to True to allow AI service to chunk large content, False to process as-is (default: True)
mergeStrategy (dict, optional): Strategy for merging results - Specify how to merge chunked content: groupBy, orderBy, mergeType (default: concatenate)
expectedDocumentFormats (list, optional): Expected output formats - List of format specifications with extension, mimeType, description
includeMetadata (bool, optional): Include document metadata - Set to True to include file metadata in results (default: True)
"""
try:
documentList = parameters.get("documentList")
if isinstance(documentList, str):
documentList = [documentList]
prompt = parameters.get("prompt")
operationType = parameters.get("operationType", "extract_content")
processDocumentsIndividually = parameters.get("processDocumentsIndividually", True)
chunkAllowed = parameters.get("chunkAllowed", True)
mergeStrategy = parameters.get("mergeStrategy", {
"groupBy": "typeGroup",
"orderBy": "id",
"mergeType": "concatenate"
})
expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
includeMetadata = parameters.get("includeMetadata", True)
if not documentList:
return ActionResult.isFailure(
error="Document list reference is required"
)
if not prompt:
return ActionResult.isFailure(
error="Prompt is required"
)
chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
if not chatDocuments:
return ActionResult.isFailure(
error="No documents found for the provided reference"
)
# Use new extraction service with ChatDocument objects
try:
# Build extraction options directly from AI planner parameters
extraction_options = {
"prompt": prompt,
"operationType": operationType,
"processDocumentsIndividually": processDocumentsIndividually,
"chunkAllowed": chunkAllowed,
"mergeStrategy": mergeStrategy
}
# Add format instructions to prompt if expected formats are provided
enhanced_prompt = prompt
if expectedDocumentFormats:
format_instructions = []
for fmt in expectedDocumentFormats:
extension = fmt.get("extension", ".txt")
mime_type = fmt.get("mimeType", "text/plain")
description = fmt.get("description", "")
format_instructions.append(f"- {extension} ({mime_type}): {description}")
if format_instructions:
enhanced_prompt += f"\n\nPlease format the output as: {', '.join([fmt.get('extension', '.txt') for fmt in expectedDocumentFormats])}"
enhanced_prompt += f"\nExpected formats:\n" + "\n".join(format_instructions)
extraction_options["expectedDocumentFormats"] = expectedDocumentFormats
extraction_options["prompt"] = enhanced_prompt
if not includeMetadata:
extraction_options["includeMetadata"] = False
# Use new extraction service API
all_extracted_content = self.services.extraction.extractContent(
documents=chatDocuments,
options=extraction_options
)
logger.info(f"Extraction completed: {len(all_extracted_content)} documents processed")
except Exception as e:
logger.error(f"Extraction failed: {str(e)}")
all_extracted_content = []
if not all_extracted_content:
return ActionResult.isFailure(
error="No content could be extracted from any documents"
)
# Process each document individually with its own format conversion
action_documents = []
for i, chatDocument in enumerate(chatDocuments):
# Extract text content from this document using new ExtractedContent structure
text_content = ""
try:
ec = all_extracted_content[i] if i < len(all_extracted_content) else None
if ec and hasattr(ec, 'parts'):
text_parts = []
for part in ec.parts:
try:
if part.typeGroup in ("text", "table", "structure") and part.data:
text_parts.append(part.data)
except Exception:
continue
text_content = "\n".join(text_parts)
else:
text_content = ""
except Exception:
text_content = ""
# Use the extracted content directly - format conversion is handled by extraction service
final_content = text_content
final_mime_type = "text/plain"
final_extension = ".txt"
# Create meaningful output fileName with workflow context
original_fileName = chatDocument.fileName
base_name = original_fileName.rsplit('.', 1)[0] if '.' in original_fileName else original_fileName
extension = final_extension.lstrip('.') # Remove leading dot for meaningful naming
output_fileName = self._generateMeaningfulFileName(
base_name=f"{base_name}_extracted",
extension=extension,
action_name="extract"
)
logger.info(f"Created output document: {output_fileName} with {len(final_content)} characters")
# Create proper ActionDocument object
action_documents.append(ActionDocument(
documentName=output_fileName,
documentData=final_content,
mimeType=final_mime_type
))
return ActionResult.isSuccess(
documents=action_documents
)
except Exception as e:
logger.error(f"Error extracting content: {str(e)}")
return ActionResult.isFailure(
error=str(e)
)
@action
async def generateReport(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Generate report from multiple documents using AI.
Parameters:
documentList (list): Document list reference(s) - List of document references to include in report
prompt (str): AI prompt for report generation - Specific prompt describing what kind of report to generate
title (str, optional): Report title - Title for the generated report (default: "Summary Report")
outputFormat (str, optional): Output format extension - Specify the desired output format: 'html', 'pdf', 'docx', 'txt', 'md', 'json', 'csv', 'xlsx' (default: 'html')
operationType (str, optional): Type of operation - Use 'generate_report', 'analyze_documents', etc. (default: 'generate_report')
processDocumentsIndividually (bool, optional): Process each document separately - Set to True for individual processing (default: True)
chunkAllowed (bool, optional): Allow content chunking - Set to True to allow AI service to chunk large content (default: True)
mergeStrategy (dict, optional): Strategy for merging results - Specify how to merge content for report generation (default: concatenate)
includeMetadata (bool, optional): Include document metadata - Set to True to include file metadata in results (default: True)
"""
try:
documentList = parameters.get("documentList")
if isinstance(documentList, str):
documentList = [documentList]
prompt = parameters.get("prompt")
title = parameters.get("title", "Summary Report")
outputFormat = parameters.get("outputFormat", "html")
operationType = parameters.get("operationType", "generate_report")
processDocumentsIndividually = parameters.get("processDocumentsIndividually", True)
chunkAllowed = parameters.get("chunkAllowed", True)
mergeStrategy = parameters.get("mergeStrategy", {
"groupBy": "typeGroup",
"orderBy": "id",
"mergeType": "concatenate"
})
includeMetadata = parameters.get("includeMetadata", True)
if not documentList:
return ActionResult.isFailure(
error="Document list reference is required"
)
if not prompt:
return ActionResult.isFailure(
error="Prompt is required to specify what kind of report to generate"
)
chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
logger.info(f"Retrieved {len(chatDocuments)} chat documents for report generation")
if not chatDocuments:
return ActionResult.isFailure(
error="No documents found for the provided reference"
)
# Generate report using the new format handling system
report_content, mime_type = await self._generateReport(
chatDocuments, title, outputFormat, includeMetadata, prompt
)
# Create meaningful output fileName with workflow context
output_fileName = self._generateMeaningfulFileName(
base_name="report",
extension=outputFormat,
action_name="generate"
)
logger.info(f"Generated {outputFormat.upper()} report: {output_fileName} with {len(report_content)} characters")
return ActionResult.isSuccess(
documents=[ActionDocument(
documentName=output_fileName,
documentData=report_content,
mimeType=mime_type
)]
)
except Exception as e:
logger.error(f"Error generating report: {str(e)}")
return ActionResult.isFailure(
error=str(e)
)
async def _generateReport(self, chatDocuments: List[Any], title: str, outputFormat: str, includeMetadata: bool, prompt: str) -> tuple[str, str]:
"""
Generate a report in the specified format using format-specific extraction:
1. Get format-specific extraction prompt from renderer
2. Extract content using AI with format-specific prompt
3. Clean and return the formatted content
"""
try:
# Get format-specific extraction prompt
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generation_service = GenerationService(self.services)
extraction_prompt = generation_service.getExtractionPrompt(
output_format=outputFormat,
user_prompt=prompt,
title=title
)
# Extract content using format-specific prompt
extracted_content = await self._extractContentWithPrompt(
chatDocuments, extraction_prompt, includeMetadata
)
# Render the extracted content (mostly just cleaning)
rendered_content, mime_type = await generation_service.renderReport(
extracted_content=extracted_content,
output_format=outputFormat,
title=title
)
return rendered_content, mime_type
except Exception as e:
logger.error(f"Error generating report: {str(e)}")
# Fallback to simple text format
fallback_content = f"# {title}\n\nError generating report: {str(e)}"
return fallback_content, "text/plain"
    async def _extractContentWithPrompt(self, chatDocuments: List[Any], extraction_prompt: str, includeMetadata: bool) -> str:
        """
        Extract content from documents using a specific extraction prompt.

        Two-phase flow:
        1. Run the extraction service over all documents and join the textual
           parts. NOTE(review): this combined text is only used to validate
           that readable content exists (and for logging) — it is NOT what is
           sent to the AI; confirm this is intentional.
        2. Call the AI service with the *original* ChatDocument objects and the
           format-specific prompt; the stripped AI response (minus surrounding
           markdown code fences) is returned.

        Returns the generated content string. Any failure — including the
        deliberate raises below, which are caught by this method's own broad
        except — is converted into an "Error extracting content: ..." string
        rather than propagated.
        """
        try:
            # Phase 1: verify the documents contain readable content.
            logger.info(f"Extracting content with format-specific prompt for {len(chatDocuments)} documents")
            # Build extraction options for report generation (fixed defaults;
            # callers cannot override these here).
            extraction_options = {
                "prompt": extraction_prompt,
                "operationType": "generate_report",
                "processDocumentsIndividually": True,
                "chunkAllowed": True,
                "mergeStrategy": {
                    "groupBy": "typeGroup",
                    "orderBy": "id",
                    "mergeType": "concatenate"
                }
            }
            if not includeMetadata:
                extraction_options["includeMetadata"] = False
            # Extract content using extraction service with format-specific prompt
            extracted_list = self.services.extraction.extractContent(
                documents=chatDocuments,
                options=extraction_options
            )
            if not extracted_list:
                logger.warning("No content extracted from documents")
                return "No readable content found in documents"
            # Collect only the textual parts (text/table/structure) from each
            # extraction result; malformed parts are skipped silently.
            all_extracted_content = []
            for ec in extracted_list:
                if ec and hasattr(ec, 'parts'):
                    for part in ec.parts:
                        try:
                            if part.typeGroup in ("text", "table", "structure") and part.data:
                                all_extracted_content.append(part.data)
                        except Exception:
                            continue
            if not all_extracted_content:
                logger.warning("No readable content found in extracted results")
                return "No readable content found in documents"
            # Join all extracted content (validation/logging only — see docstring).
            combined_content = "\n\n".join(all_extracted_content)
            if not combined_content or combined_content.strip() == "":
                logger.error("No content extracted from documents")
                raise Exception("No content extracted from documents")
            # Phase 2: AI call with the original documents and the
            # format-specific prompt.
            logger.info(f"Calling AI service to process {len(combined_content)} characters with prompt")
            aiResponse = await self.services.ai.callAi(
                prompt=extraction_prompt,
                documents=chatDocuments,  # Pass the original ChatDocument objects
                options=AiCallOptions(operationType=OperationType.GENERATE_CONTENT)
            )
            if not aiResponse or aiResponse.strip() == "":
                logger.error("AI content generation failed")
                raise Exception("AI content generation failed")
            # Clean up the AI response
            content = aiResponse.strip()
            # Remove a surrounding markdown code fence if present (first and
            # last line dropped).
            if content.startswith("```") and content.endswith("```"):
                lines = content.split('\n')
                if len(lines) >= 2:
                    content = '\n'.join(lines[1:-1]).strip()
            logger.info(f"Successfully generated format-specific content: {len(content)} characters")
            return content
        except Exception as e:
            logger.error(f"Error extracting content with prompt: {str(e)}")
            # Return minimal fallback content instead of raising — callers
            # receive the error text as the report body.
            return f"Error extracting content: {str(e)}"
async def _generateHtmlReport(self, chatDocuments: List[Any], title: str, includeMetadata: bool, prompt: str) -> str:
"""
Generate a comprehensive HTML report using AI from all input documents.
"""
try:
# Filter out empty documents and collect content
validDocuments = []
allContent = []
for doc in chatDocuments:
content = ""
logger.info(f"Processing document: type={type(doc)}")
# Use new extraction service for each document
try:
# Build extraction options for report generation from AI planner parameters
extraction_options = {
"prompt": prompt,
"operationType": operationType,
"processDocumentsIndividually": processDocumentsIndividually,
"chunkAllowed": chunkAllowed,
"mergeStrategy": mergeStrategy
}
# Add optional parameters if provided by AI planner
if not includeMetadata:
extraction_options["includeMetadata"] = False
# Extract content using new service
extracted_list = self.services.extraction.extractContent(
documents=[doc],
options=extraction_options
)
ec = extracted_list[0] if extracted_list else None
if ec and hasattr(ec, 'parts'):
for part in ec.parts:
try:
if part.typeGroup in ("text", "table", "structure") and part.data:
content += part.data + " "
except Exception:
continue
if content.strip():
logger.info(f" Retrieved content from file: {len(content)} characters")
else:
logger.info(f" No readable text content found (binary file)")
else:
logger.info(f" No content extracted (binary file)")
except Exception as e:
logger.info(f" Could not extract content (binary file): {str(e)}")
# Skip empty documents
if content and content.strip():
validDocuments.append(doc)
allContent.append(f"Document: {doc.fileName}\n{content}\n")
logger.info(f" Added document to valid documents list")
else:
logger.info(f" Skipping document with no readable text content")
if not validDocuments:
# No readable content; return a minimal valid HTML document
timestamp = int(self.services.utils.getUtcTimestamp())
return f"<!DOCTYPE html><html><head><meta charset=\"UTF-8\"><title>{title}</title></head><body><h1>{title}</h1><p>Keine auswertbaren Inhalte gefunden.</p><p>Generated: {timestamp}</p></body></html>"
# Create AI prompt for comprehensive report generation using user's prompt
combinedContent = "\n\n".join(allContent)
aiPrompt = f"""
{prompt}
Report Title: {title}
OUTPUT POLICY:
- Return ONLY a complete, raw HTML document.
- Start with: <!DOCTYPE html>
- Must include: <html>, <head> (with <meta charset="UTF-8"> and <title>), and <body>.
- The response must be valid, self-contained HTML suitable for saving as .html.
Structure:
- Title and short subtitle
- Executive summary
- Sections with clear headings
- Use tables for structured data when helpful
- Key findings and recommendations
- Generation date and number of documents
Quality and design requirements:
- Use clear, professional, and accessible styling in a <style> block
- Apply clean layout, spacing, and visual hierarchy for headings
- Keep HTML and CSS standards-compliant and lightweight
SOURCE DOCUMENT CONTENT:
---START---
{combinedContent}
---END---
"""
# Call AI to generate the report
logger.info(f"Generating AI report for {len(validDocuments)} documents")
# Build ChatDocument list from chatDocuments
documents = []
try:
for d in validDocuments:
try:
data = self.services.workflow.getFileData(d.fileId) if hasattr(d, 'fileId') else None
if data:
documents.append(ChatDocument(fileData=data, fileName=d.fileName, mimeType=d.mimeType))
except Exception:
continue
except Exception:
documents = None
aiReport = await self.services.ai.callAi(
prompt=aiPrompt,
documents=documents or None,
options=AiCallOptions(
operationType=OperationType.GENERATE_CONTENT, # Using GENERATE_CONTENT for report generation
priority=Priority.QUALITY,
compressPrompt=False,
compressContext=True,
processDocumentsIndividually=True,
resultFormat="html",
processingMode="detailed",
maxCost=0.08,
maxProcessingTime=90
)
)
# If AI call fails, return error - AI is crucial for report generation
if not aiReport or aiReport.strip() == "":
logger.error("AI report generation failed - AI is crucial for this action")
raise Exception("AI report generation failed - AI is required for report generation")
# Clean up the AI response and ensure it's valid HTML
aiReport = aiReport.strip()
# Normalize: strip code fences if present
if aiReport.startswith("```") and aiReport.endswith("```"):
lines = aiReport.split('\n')
if len(lines) >= 2:
aiReport = '\n'.join(lines[1:-1]).strip()
cleaned = aiReport.strip()
# Return exactly what we have (no wrapping)
return cleaned
except Exception as e:
logger.error(f"Error generating AI report: {str(e)}")
# Re-raise the error - AI is crucial for report generation
raise