"""
|
|
Document processing method module.
|
|
Handles document operations using the document service.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, UTC
|
|
|
|
from modules.workflows.methods.methodBase import MethodBase, action
|
|
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
|
|
from modules.datamodels.datamodelChat import ChatDocument
|
|
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
|
|
|
|
# Module-level logger named after this module, per the stdlib logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
class MethodDocument(MethodBase):
    """Document method implementation for document operations.

    Exposes workflow actions (``extract``, ``generate``) that delegate the
    heavy lifting to ``self.services.workflow`` (document resolution) and
    ``self.services.ai`` (AI calls).
    """

    def __init__(self, services):
        """Initialize the document method.

        Args:
            services: Service container providing at least the ``workflow``
                and ``ai`` services used by the actions below.
        """
        super().__init__(services)
        self.name = "document"
        self.description = "Handle document operations like extraction and analysis"

    def _format_timestamp_for_filename(self) -> str:
        """Format current timestamp as YYYYMMDD-hhmmss for filenames."""
        # Timezone-aware UTC, so generated filenames are stable across hosts.
        return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")

    @staticmethod
    def _augment_prompt_with_formats(prompt: str, expectedDocumentFormats: List[Dict[str, Any]]) -> str:
        """Append output-format instructions to *prompt* when formats are given.

        Args:
            prompt: The base extraction instruction.
            expectedDocumentFormats: Format specs, each a dict with optional
                ``extension``, ``mimeType`` and ``description`` keys.

        Returns:
            The prompt unchanged when no formats are given, otherwise the
            prompt with a format summary and per-format detail lines appended.
        """
        if not expectedDocumentFormats:
            return prompt
        format_instructions = [
            f"- {fmt.get('extension', '.txt')} ({fmt.get('mimeType', 'text/plain')}): {fmt.get('description', '')}"
            for fmt in expectedDocumentFormats
        ]
        extensions = ", ".join(fmt.get("extension", ".txt") for fmt in expectedDocumentFormats)
        enhanced = prompt
        enhanced += f"\n\nPlease format the output as: {extensions}"
        enhanced += "\nExpected formats:\n" + "\n".join(format_instructions)
        return enhanced

    @action
    async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Extract and analyze content from existing documents using AI.
        - Input requirements: documentList (required); prompt (required).
        - Output format: Plain text per source document (.txt by default).

        Parameters:
        - documentList (list, required): Document reference(s) to extract from.
        - prompt (str, required): Instruction describing what to extract.
        - operationType (str, optional): extract_content | analyze_document | summarize_content. Default: extract_content.
        - processDocumentsIndividually (bool, optional): Process each document separately. Default: True.
        - chunkAllowed (bool, optional): Allow chunking for large inputs. Default: True.
        - mergeStrategy (dict, optional): Merge strategy for chunked content.
        - expectedDocumentFormats (list, optional): Desired output format specs.
        - includeMetadata (bool, optional): Include file metadata. Default: True.
        """
        try:
            documentList = parameters.get("documentList")
            if isinstance(documentList, str):
                # Accept a single reference string as a one-element list.
                documentList = [documentList]
            prompt = parameters.get("prompt")
            operationType = parameters.get("operationType", "extract_content")
            processDocumentsIndividually = parameters.get("processDocumentsIndividually", True)
            chunkAllowed = parameters.get("chunkAllowed", True)
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
            # NOTE(review): mergeStrategy and includeMetadata are documented and
            # accepted, but nothing below consumes them; they are intentionally
            # left unread until they are wired into the AI call.

            if not documentList:
                return ActionResult.isFailure(
                    error="Document list reference is required"
                )

            if not prompt:
                return ActionResult.isFailure(
                    error="Prompt is required"
                )

            chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return ActionResult.isFailure(
                    error="No documents found for the provided reference"
                )

            # Use enhanced AI service with integrated extraction
            try:
                ai_options = AiCallOptions(
                    operationType=operationType,
                    processDocumentsIndividually=processDocumentsIndividually,
                    # When chunking is disallowed, compress the context instead.
                    compressContext=not chunkAllowed
                )

                enhanced_prompt = self._augment_prompt_with_formats(prompt, expectedDocumentFormats)

                ai_response = await self.services.ai.callAi(
                    prompt=enhanced_prompt,
                    documents=chatDocuments,
                    options=ai_options
                )

                logger.info("AI extraction completed: %d characters", len(ai_response))

            except Exception as e:
                # Best-effort: fall through to the empty-response failure below.
                logger.error("AI extraction failed: %s", str(e))
                ai_response = ""

            if not ai_response or ai_response.strip() == "":
                return ActionResult.isFailure(
                    error="No content could be extracted from any documents"
                )

            # Build one output document per source document.
            # NOTE(review): each output currently carries the SAME full AI
            # response; if per-document content is intended, the AI service
            # must return per-document results — confirm the service contract.
            action_documents = []

            for chatDocument in chatDocuments:
                final_content = ai_response
                final_mime_type = "text/plain"
                final_extension = ".txt"

                # Create meaningful output fileName with workflow context
                original_fileName = chatDocument.fileName
                base_name = original_fileName.rsplit('.', 1)[0] if '.' in original_fileName else original_fileName
                extension = final_extension.lstrip('.')  # naming helper expects no leading dot
                output_fileName = self._generateMeaningfulFileName(
                    base_name=f"{base_name}_extracted",
                    extension=extension,
                    action_name="extract"
                )

                logger.info("Created output document: %s with %d characters", output_fileName, len(final_content))

                action_documents.append(ActionDocument(
                    documentName=output_fileName,
                    documentData=final_content,
                    mimeType=final_mime_type
                ))

            return ActionResult.isSuccess(
                documents=action_documents
            )
        except Exception as e:
            logger.error("Error extracting content: %s", str(e))
            return ActionResult.isFailure(
                error=str(e)
            )

    @action
    async def generate(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Generate formatted documents and reports from source documents.
        - Input requirements: documentList (required); prompt (required); optional title and outputFormat.
        - Any output format, e.g.: html | pdf | docx | txt | md | json | csv | xlsx

        Parameters:
        - documentList (list, required): Document reference(s) to include as context.
        - prompt (str, required): Instruction describing the desired document/report.
        - title (str, optional): Title for the generated document. Default: "Summary Report".
        - outputFormat (str, optional): html | pdf | docx | txt | md | json | csv | xlsx. Default: html.
        - operationType (str, optional): generate_report | analyze_documents. Default: generate_report.
        - processDocumentsIndividually (bool, optional): Process per document. Default: True.
        - chunkAllowed (bool, optional): Allow chunking for large inputs. Default: True.
        - mergeStrategy (dict, optional): Merging rules for multi-part generation.
        - includeMetadata (bool, optional): Include file metadata. Default: True.
        """
        try:
            documentList = parameters.get("documentList")
            if isinstance(documentList, str):
                # Accept a single reference string as a one-element list.
                documentList = [documentList]
            prompt = parameters.get("prompt")
            title = parameters.get("title", "Summary Report")
            outputFormat = parameters.get("outputFormat", "html")
            operationType = parameters.get("operationType", "generate_report")
            processDocumentsIndividually = parameters.get("processDocumentsIndividually", True)
            chunkAllowed = parameters.get("chunkAllowed", True)
            # NOTE(review): mergeStrategy and includeMetadata are documented and
            # accepted, but nothing below consumes them; they are intentionally
            # left unread until they are wired into the AI call.

            if not documentList:
                return ActionResult.isFailure(
                    error="Document list reference is required"
                )

            if not prompt:
                return ActionResult.isFailure(
                    error="Prompt is required to specify what kind of report to generate"
                )

            chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
            logger.info("Retrieved %d chat documents for report generation", len(chatDocuments))

            if not chatDocuments:
                return ActionResult.isFailure(
                    error="No documents found for the provided reference"
                )

            # Use enhanced AI service with document generation
            try:
                ai_options = AiCallOptions(
                    operationType=operationType,
                    processDocumentsIndividually=processDocumentsIndividually,
                    # When chunking is disallowed, compress the context instead.
                    compressContext=not chunkAllowed
                )

                result = await self.services.ai.callAi(
                    prompt=prompt,
                    documents=chatDocuments,
                    options=ai_options,
                    outputFormat=outputFormat,
                    title=title
                )

                if isinstance(result, dict) and result.get("success"):
                    # Extract document information from result
                    documents = result.get("documents", [])
                    if documents:
                        # Convert to ActionDocument format
                        action_documents = [
                            ActionDocument(
                                documentName=doc["documentName"],
                                documentData=doc["documentData"],
                                mimeType=doc["mimeType"]
                            )
                            for doc in documents
                        ]

                        logger.info("Generated %s report: %d documents", outputFormat.upper(), len(action_documents))
                        return ActionResult.isSuccess(documents=action_documents)
                    else:
                        return ActionResult.isFailure(error="No documents generated")
                else:
                    error_msg = result.get("error", "Unknown error") if isinstance(result, dict) else "AI generation failed"
                    return ActionResult.isFailure(error=error_msg)

            except Exception as e:
                logger.error("AI generation failed: %s", str(e))
                return ActionResult.isFailure(error=str(e))

        except Exception as e:
            logger.error("Error generating report: %s", str(e))
            return ActionResult.isFailure(
                error=str(e)
            )