gateway/modules/workflows/methods/_EXCLUDED_methodDocument.py
2025-10-14 14:47:50 +02:00

252 lines
12 KiB
Python

"""
Document processing method module.
Extracts content from documents and generates reports via the workflow and AI services.
"""
import logging
import os
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
logger = logging.getLogger(__name__)
class MethodDocument(MethodBase):
    """Document method implementation for document operations.

    Provides two workflow actions:

    - ``extract``: sends the referenced documents plus a prompt to the AI
      service and returns the response as one output document per source
      document.
    - ``generate``: asks the AI service to produce formatted documents/reports
      from the referenced documents.
    """

    # MIME type used when ``outputMimeType`` is missing or unsupported.
    _DEFAULT_MIME_TYPE = "text/plain"

    # Supported output MIME types for ``extract``, mapped to
    # (file extension, format hint appended to the AI prompt).
    _MIME_TYPE_FORMATS = {
        "text/plain": (".txt", "Plain text format"),
        "application/json": (".json", "Structured JSON format"),
        "text/csv": (".csv", "Table format"),
        "text/html": (".html", "HTML format"),
    }

    def __init__(self, services):
        """Initialize the document method"""
        super().__init__(services)
        self.name = "document"
        self.description = "Handle document operations like extraction and analysis"

    def _format_timestamp_for_filename(self) -> str:
        """Format current timestamp as YYYYMMDD-hhmmss for filenames."""
        return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")

    @staticmethod
    def _normalizeDocumentList(documentList):
        """Wrap a single string reference in a list; return any other value unchanged."""
        return [documentList] if isinstance(documentList, str) else documentList

    @action
    async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Extract and analyze content from existing documents using AI.
        - Input requirements: documentList (required); prompt (required).
        - Output format: Plain text per source document (.txt by default).
        Parameters:
        - documentList (list, required): Document reference(s) to extract from.
        - prompt (str, required): Instruction describing what to extract.
        - operationType (str, optional): extract_content | analyze_document | summarize_content. Default: extract_content.
        - processDocumentsIndividually (bool, optional): Process each document separately. Default: True.
        - chunkAllowed (bool, optional): Allow chunking for large inputs. Default: True.
        - outputMimeType (str, optional): MIME type for output file. Options: "text/plain" (default), "application/json", "text/csv", "text/html". Default: "text/plain". Unsupported values fall back to "text/plain".
        """
        # NOTE(review): the GENERAL/Parameters docstring layout is kept as-is;
        # presumably it is consumed by the action framework - confirm before restructuring.
        try:
            documentList = self._normalizeDocumentList(parameters.get("documentList"))
            prompt = parameters.get("prompt")
            operationType = parameters.get("operationType", "extract_content")
            processDocumentsIndividually = parameters.get("processDocumentsIndividually", True)
            chunkAllowed = parameters.get("chunkAllowed", True)
            outputMimeType = parameters.get("outputMimeType", self._DEFAULT_MIME_TYPE)

            if not documentList:
                return ActionResult.isFailure(
                    error="Document list reference is required"
                )
            if not prompt:
                return ActionResult.isFailure(
                    error="Prompt is required"
                )

            chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return ActionResult.isFailure(
                    error="No documents found for the provided reference"
                )

            # Keep extension, prompt hint and reported MIME type in agreement:
            # an unsupported type falls back to the default for all three.
            if outputMimeType not in self._MIME_TYPE_FORMATS:
                logger.warning(
                    "Unsupported outputMimeType %r, falling back to %s",
                    outputMimeType,
                    self._DEFAULT_MIME_TYPE,
                )
                outputMimeType = self._DEFAULT_MIME_TYPE
            extension, description = self._MIME_TYPE_FORMATS[outputMimeType]

            ai_options = AiCallOptions(
                operationType=operationType,
                processDocumentsIndividually=processDocumentsIndividually,
                # Chunking disabled => ask the AI service to compress the context instead.
                compressContext=not chunkAllowed
            )

            # Add format instructions to the prompt based on the MIME type.
            enhanced_prompt = (
                f"{prompt}\n\nPlease format the output as {extension} ({outputMimeType}): {description}"
            )

            try:
                ai_response = await self.services.ai.callAi(
                    prompt=enhanced_prompt,
                    documents=chatDocuments,
                    options=ai_options
                )
            except Exception as e:
                # Surface the real cause to the caller (same as generate())
                # instead of masking it as "no content".
                logger.error("AI extraction failed: %s", e)
                return ActionResult.isFailure(error=str(e))

            if not ai_response or not ai_response.strip():
                return ActionResult.isFailure(
                    error="No content could be extracted from any documents"
                )
            logger.info("AI extraction completed: %d characters", len(ai_response))

            # Loop-invariant: extension without its leading dot for file naming.
            bare_extension = extension.lstrip('.')

            # One output document per source document; each carries the full AI response.
            action_documents = []
            for chatDocument in chatDocuments:
                # Guard against a missing file name so naming cannot raise.
                original_fileName = chatDocument.fileName or "document"
                # rsplit returns the whole name when it contains no dot.
                base_name = original_fileName.rsplit('.', 1)[0]
                output_fileName = self._generateMeaningfulFileName(
                    base_name=f"{base_name}_extracted",
                    extension=bare_extension,
                    action_name="extract"
                )
                logger.info(
                    "Created output document: %s with %d characters",
                    output_fileName,
                    len(ai_response),
                )
                action_documents.append(ActionDocument(
                    documentName=output_fileName,
                    documentData=ai_response,
                    mimeType=outputMimeType
                ))

            return ActionResult.isSuccess(
                documents=action_documents
            )
        except Exception as e:
            logger.error("Error extracting content: %s", e)
            return ActionResult.isFailure(
                error=str(e)
            )

    @action
    async def generate(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Generate formatted documents and reports from source documents.
        - Input requirements: documentList (required); prompt (required); optional title and outputFormat.
        - Any output format, e.g.: html | pdf | docx | txt | md | json | csv | xlsx
        Parameters:
        - documentList (list, required): Document reference(s) to include as context.
        - prompt (str, required): Instruction describing the desired document/report.
        - title (str, optional): Title for the generated document. Default: "Summary Report".
        - outputFormat (str, optional): html | pdf | docx | txt | md | json | csv | xlsx. Default: html.
        - operationType (str, optional): generate_report | analyze_documents. Default: generate_report.
        - processDocumentsIndividually (bool, optional): Process per document. Default: True.
        - chunkAllowed (bool, optional): Allow chunking for large inputs. Default: True.
        """
        # NOTE(review): the GENERAL/Parameters docstring layout is kept as-is;
        # presumably it is consumed by the action framework - confirm before restructuring.
        try:
            documentList = self._normalizeDocumentList(parameters.get("documentList"))
            prompt = parameters.get("prompt")
            title = parameters.get("title", "Summary Report")
            # "or" also covers an explicit None/empty value, which would otherwise
            # break the success log line below and fail a successful generation.
            outputFormat = parameters.get("outputFormat") or "html"
            operationType = parameters.get("operationType", "generate_report")
            processDocumentsIndividually = parameters.get("processDocumentsIndividually", True)
            chunkAllowed = parameters.get("chunkAllowed", True)

            if not documentList:
                return ActionResult.isFailure(
                    error="Document list reference is required"
                )
            if not prompt:
                return ActionResult.isFailure(
                    error="Prompt is required to specify what kind of report to generate"
                )

            chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
            # Check before len(): a None result must yield this error, not a TypeError.
            if not chatDocuments:
                return ActionResult.isFailure(
                    error="No documents found for the provided reference"
                )
            logger.info("Retrieved %d chat documents for report generation", len(chatDocuments))

            try:
                ai_options = AiCallOptions(
                    operationType=operationType,
                    processDocumentsIndividually=processDocumentsIndividually,
                    # Chunking disabled => ask the AI service to compress the context instead.
                    compressContext=not chunkAllowed
                )
                result = await self.services.ai.callAi(
                    prompt=prompt,
                    documents=chatDocuments,
                    options=ai_options,
                    outputFormat=outputFormat,
                    title=title
                )

                # Anything other than a dict flagged as successful is a failure.
                if not isinstance(result, dict):
                    return ActionResult.isFailure(error="AI generation failed")
                if not result.get("success"):
                    return ActionResult.isFailure(error=result.get("error", "Unknown error"))

                documents = result.get("documents", [])
                if not documents:
                    return ActionResult.isFailure(error="No documents generated")

                # Convert the service's document dicts to ActionDocument objects.
                action_documents = [
                    ActionDocument(
                        documentName=doc["documentName"],
                        documentData=doc["documentData"],
                        mimeType=doc["mimeType"]
                    )
                    for doc in documents
                ]
                logger.info(
                    "Generated %s report: %d documents",
                    str(outputFormat).upper(),
                    len(action_documents),
                )
                return ActionResult.isSuccess(documents=action_documents)
            except Exception as e:
                logger.error("AI generation failed: %s", e)
                return ActionResult.isFailure(error=str(e))
        except Exception as e:
            logger.error("Error generating report: %s", e)
            return ActionResult.isFailure(
                error=str(e)
            )