"""
Document processing method module.
Handles document operations using the document service.
"""

import csv
import html
import io
import json
import logging
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional, Tuple
from xml.sax.saxutils import escape as xml_escape

from modules.chat.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult

logger = logging.getLogger(__name__)

# Per-extension prompt data: extension -> (format label, numbered requirements).
# Extensions that are not listed get a generic prompt built from the extension.
_FORMAT_PROMPT_SPECS: Dict[str, Tuple[str, Tuple[str, ...]]] = {
    ".csv": (
        "CSV",
        (
            "Output ONLY the CSV data without any markdown, code blocks, or additional text",
            "Use appropriate headers based on the content",
            "Ensure proper CSV formatting with commas and quotes where needed",
            "Make the data easily readable and importable into spreadsheet applications",
        ),
    ),
    ".json": (
        "JSON",
        (
            "Output ONLY the JSON data without any markdown, code blocks, or additional text",
            "Structure the data logically with appropriate keys and values",
            "Ensure valid JSON syntax",
            "Make the data easily parseable and readable",
        ),
    ),
    ".xml": (
        "XML",
        (
            "Output ONLY the XML data without any markdown, code blocks, or additional text",
            "Use appropriate XML tags and structure",
            "Ensure valid XML syntax",
            "Make the data easily parseable and readable",
        ),
    ),
    ".html": (
        "HTML",
        (
            "Output ONLY the HTML data without any markdown, code blocks, or additional text",
            "Use appropriate HTML tags and structure",
            "Ensure valid HTML syntax",
            "Make the data easily readable in web browsers",
        ),
    ),
    ".md": (
        "Markdown",
        (
            "Output ONLY the Markdown data without any code blocks or additional text",
            "Use appropriate Markdown syntax for headers, lists, emphasis, etc.",
            "Structure the content logically",
            "Make the data easily readable and convertible to other formats",
        ),
    ),
}


def _buildConversionPrompt(extension: str, content: str) -> str:
    """Build the AI prompt that asks for `content` to be rewritten in the given format.

    Only the prompt for the requested extension is assembled, so the content is
    embedded exactly once.
    """
    spec = _FORMAT_PROMPT_SPECS.get(extension)
    if spec is not None:
        label, requirements = spec
        intro = f"Convert the following content into a proper {label} format."
    else:
        label = extension.upper()
        intro = f"Convert the following content into {label} format."
        requirements = (
            f"Output ONLY the {label} data without any markdown, code blocks, or additional text",
            f"Use appropriate formatting for {label} files",
            "Ensure the output is valid and usable",
            "Make the data easily readable and importable",
        )

    numbered = "\n".join(f"{number}. {text}" for number, text in enumerate(requirements, 1))
    # NOTE(review): the wording matches the previous prompts; the exact line
    # breaks were not recoverable from the reviewed copy and are reconstructed.
    return (
        f"\n{intro}\n\n"
        f"Requirements:\n{numbered}\n\n"
        f"Content to convert:\n{content}\n\n"
        f"Generate ONLY the {label} data:\n"
    )


def _stripCodeFence(text: str) -> str:
    """Drop a surrounding Markdown code fence (first and last line) if present."""
    if text.startswith("```") and text.endswith("```"):
        lines = text.split("\n")
        # A fence on one or two lines has no body to keep; leave it untouched.
        if len(lines) > 2:
            return "\n".join(lines[1:-1])
    return text


def _stripExtension(filename: str) -> str:
    """Return `filename` without its last dot-suffix (unchanged if it has none)."""
    return filename.rsplit(".", 1)[0]


def _pickFormat(formats: Any, index: int) -> Any:
    """Return the format for position `index`, reusing the last one when the list is short."""
    if not formats:
        return None
    return formats[index] if index < len(formats) else formats[-1]


class MethodDocument(MethodBase):
    """Document method implementation for document operations.

    All actions go through `self.service`.
    NOTE(review): `self.service` is not assigned in this class; presumably
    `MethodBase.__init__` sets it from `serviceCenter` - confirm.
    """

    def __init__(self, serviceCenter: Any):
        """Initialize the document method"""
        super().__init__(serviceCenter)
        self.name = "document"
        self.description = "Handle document operations like extraction and analysis"

    @action
    async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Extract content from any document using AI prompt.

        Parameters:
            documentList (str): Document list reference
            aiPrompt (str): AI prompt for extraction
            expectedDocumentFormats (list, optional): Output formats, one dict per
                input document with "extension" and "mimeType" keys; the last
                entry is reused when there are fewer formats than documents
            includeMetadata (bool, optional): Include metadata (default: True)

        Returns:
            ActionResult.success with one output document per input document that
            yielded content, or ActionResult.failure when a required parameter is
            missing, no documents are found, nothing could be extracted, or an
            unexpected error occurs.
        """
        try:
            documentList = parameters.get("documentList")
            aiPrompt = parameters.get("aiPrompt")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
            includeMetadata = parameters.get("includeMetadata", True)

            if not documentList:
                return ActionResult.failure(error="Document list reference is required")
            if not aiPrompt:
                return ActionResult.failure(error="AI prompt is required")

            chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                return ActionResult.failure(error="No documents found for the provided reference")

            # Keep every successful extraction together with the document it came
            # from. Pairing by position (zip of all documents with the successes)
            # attributes content to the wrong file as soon as one document fails.
            extracted = []  # (input index, chat document, extracted content, file info)
            for index, chatDocument in enumerate(chatDocuments):
                file_info = self.service.getFileInfo(chatDocument.fileId)
                try:
                    # The extraction service handles all document types with the caller's prompt.
                    extracted_content = await self.service.extractContentFromDocument(
                        prompt=aiPrompt,
                        document=chatDocument
                    )
                    if extracted_content and extracted_content.contents:
                        extracted.append((index, chatDocument, extracted_content, file_info))
                        logger.info(f"Successfully extracted content from {chatDocument.filename}")
                    else:
                        logger.warning(f"No content extracted from {chatDocument.filename}")
                except Exception as e:
                    # Best effort: one failing document must not abort the others.
                    logger.error(f"Error extracting content from {chatDocument.filename}: {str(e)}")
                    continue

            if not extracted:
                return ActionResult.failure(error="No content could be extracted from any documents")

            output_documents = []
            for index, chatDocument, extracted_content, file_info in extracted:
                text_content = "\n".join(
                    item.data
                    for item in extracted_content.contents
                    if getattr(item, "data", None)
                )

                # Formats are matched by the document's position in the input list
                # (same rule as generate()), so a skipped document does not shift them.
                target_format = _pickFormat(expectedDocumentFormats, index)

                final_content = text_content
                final_mime_type = "text/plain"
                final_extension = ".txt"
                if not target_format:
                    logger.info(f"Document {index+1}: No expected format specified, using plain text")
                else:
                    target_extension = target_format.get("extension", ".txt")
                    target_mime_type = target_format.get("mimeType", "text/plain")
                    if target_extension not in (".txt", ".text") or target_mime_type != "text/plain":
                        logger.info(f"Converting document {index+1} to format: {target_extension} ({target_mime_type})")
                        final_content = await self._convertContentToFormat(text_content, target_format)
                        final_mime_type = target_mime_type
                        final_extension = target_extension
                    else:
                        logger.info(f"Document {index+1}: No format conversion needed, using plain text")

                now = datetime.now(UTC)
                original_filename = chatDocument.filename
                output_filename = (
                    f"{_stripExtension(original_filename)}_extracted_"
                    f"{now.strftime('%Y%m%d_%H%M%S')}{final_extension}"
                )

                result_data = {
                    "documentCount": 1,
                    "content": final_content,
                    "originalFilename": original_filename,
                    "fileInfos": [file_info] if includeMetadata else None,
                    "timestamp": now.isoformat()
                }

                logger.info(f"Created output document: {output_filename} with {len(final_content)} characters")
                output_documents.append({
                    "documentName": output_filename,
                    "documentData": result_data,
                    "mimeType": final_mime_type
                })

            return ActionResult.success(documents=output_documents)

        except Exception as e:
            logger.error(f"Error extracting content: {str(e)}")
            return ActionResult.failure(error=str(e))

    @action
    async def generate(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Convert TEXT-ONLY documents to target formats.

        File data is read as-is (no AI extraction); binary or empty files are
        skipped. The format conversion itself is delegated to
        _convertContentToFormat, which calls the AI service.

        Parameters:
            documentList (list): TEXT-ONLY documents only
            expectedDocumentFormats (list): Target formats, one dict per document
                with "extension" and "mimeType" keys; the last entry is reused
                when there are fewer formats than documents
            originalDocuments (list, optional): Original names used to build the
                output filenames (default: the documents' own filenames)
            includeMetadata (bool, optional): Accepted for interface
                compatibility; not used by this action

        Returns:
            ActionResult.success with the generated documents, or
            ActionResult.failure when a required parameter is missing, no
            documents are found, nothing could be generated, or an unexpected
            error occurs.
        """
        try:
            document_list = parameters.get("documentList", [])
            expected_document_formats = parameters.get("expectedDocumentFormats", [])
            original_documents = parameters.get("originalDocuments", [])

            if not document_list:
                return ActionResult.failure(error="Document list is required for generation")
            if not expected_document_formats:
                return ActionResult.failure(error="Expected document formats specification is required")

            chat_documents = self.service.getChatDocumentsFromDocumentList(document_list)
            # Check before logging the count: len() on a None result would raise.
            if not chat_documents:
                return ActionResult.failure(error="No documents found for the provided documentList reference")
            logger.info(f"Found {len(chat_documents)} chat documents")

            if not original_documents:
                original_documents = [
                    doc.filename if hasattr(doc, 'filename') else str(doc.id)
                    for doc in chat_documents
                ]

            output_documents = []
            for i, chat_document in enumerate(chat_documents):
                content = self._readDocumentText(chat_document, i + 1)
                if content is None:
                    continue
                logger.info(f"Extracted content from document {i+1}: {len(content)} characters")

                target_format = _pickFormat(expected_document_formats, i)
                if not target_format:
                    logger.warning(f"No expected format for document {i+1}, skipping")
                    continue

                # Use AI to convert format
                formatted_content = await self._convertContentToFormat(content, target_format)
                if not formatted_content:
                    logger.warning(f"Failed to format document {i+1}, skipping")
                    continue

                target_extension = target_format.get("extension", ".txt")
                target_mime_type = target_format.get("mimeType", "text/plain")

                now = datetime.now(UTC)
                source_name = original_documents[i] if i < len(original_documents) else f"document_{i+1}"
                output_filename = (
                    f"{_stripExtension(source_name)}_generated_"
                    f"{now.strftime('%Y%m%d_%H%M%S')}{target_extension}"
                )

                result_data = {
                    "documentCount": 1,
                    "content": formatted_content,
                    "outputFormat": target_format,
                    "originalDocument": source_name,
                    "timestamp": now.isoformat()
                }

                logger.info(f"Generated document: {output_filename} with {len(formatted_content)} characters")
                output_documents.append({
                    "documentName": output_filename,
                    "documentData": result_data,
                    "mimeType": target_mime_type
                })

            if not output_documents:
                return ActionResult.failure(error="No documents could be generated")

            return ActionResult.success(documents=output_documents)

        except Exception as e:
            logger.error(f"Error generating document: {str(e)}")
            return ActionResult.failure(error=str(e))

    def _readDocumentText(self, chat_document: Any, number: int) -> Optional[str]:
        """Read a document's file data as text, or return None when it must be skipped.

        A document is skipped (with a log entry) when it has no fileId, has no
        file data, is not valid UTF-8, is blank, or cannot be read at all.
        `number` is the 1-based position used in log messages.
        """
        if not getattr(chat_document, "fileId", None):
            logger.warning(f"Document {number} has no fileId, skipping")
            return None

        try:
            file_data = self.service.getFileData(chat_document.fileId)
            if not file_data:
                logger.warning(f"Document {number} ({chat_document.filename}): No file data found")
                return None

            if isinstance(file_data, (bytes, bytearray)):
                try:
                    # Decoding as UTF-8 doubles as the "is this text?" check.
                    content = file_data.decode('utf-8')
                except UnicodeDecodeError:
                    logger.info(f"Document {number} ({chat_document.filename}): Binary data, not text - skipping")
                    return None
                logger.info(f"Document {number} ({chat_document.filename}): Successfully decoded as UTF-8 text")
            else:
                content = str(file_data)
                logger.info(f"Document {number} ({chat_document.filename}): Already text data")

            if not content.strip():
                logger.info(f"Document {number} ({chat_document.filename}): Empty text content, skipping")
                return None
            return content

        except Exception as e:
            # Best effort: an unreadable document is skipped, not fatal.
            logger.warning(f"Error reading document {number} ({chat_document.filename}): {str(e)}")
            return None

    async def _convertContentToFormat(self, content: str, target_format: Dict[str, Any]) -> str:
        """
        Helper function to convert content to the specified format using AI.

        Parameters:
            content (str): Text to convert
            target_format (dict): Target format with "extension" and "mimeType"
                keys (defaults: ".txt" / "text/plain")

        Returns:
            The AI output with any surrounding Markdown code fence removed. When
            the AI returns nothing or any step raises, the result of
            _generateFallbackFormattedContent is returned instead, so this method
            does not raise.
        """
        # Bound before the try block so the except path can always use them.
        extension = ".txt"
        mime_type = "text/plain"
        try:
            extension = target_format.get("extension", ".txt")
            mime_type = target_format.get("mimeType", "text/plain")
            logger.info(f"Converting content to format: {extension} ({mime_type})")

            ai_prompt = _buildConversionPrompt(extension, content)

            logger.info(f"Calling AI for {extension} format conversion")
            formatted_content = await self.service.callAiTextBasic(ai_prompt, content)

            if not formatted_content or formatted_content.strip() == "":
                logger.warning("AI format conversion failed, using fallback")
                return self._generateFallbackFormattedContent(content, extension, mime_type)

            return _stripCodeFence(formatted_content.strip())

        except Exception as e:
            logger.error(f"Error in AI format conversion: {str(e)}")
            return self._generateFallbackFormattedContent(content, extension, mime_type)

    def _generateFallbackFormattedContent(self, content: str, extension: str, mime_type: str) -> str:
        """
        Generate fallback formatted content when AI conversion fails.

        Parameters:
            content (str): Text to wrap
            extension (str): Target extension; ".csv", ".json", ".xml", ".html"
                and ".md" get a minimal wrapper, anything else returns the
                content unchanged
            mime_type (str): Unused; kept for interface compatibility

        Returns:
            The wrapped content, or the raw content if wrapping itself fails.
        """
        try:
            if extension == ".csv":
                rows = content.strip().splitlines()
                if not rows:
                    return "Line,Content\n1,No content available"
                # csv.writer quotes fields containing commas, quotes or line
                # breaks, which the hand-rolled version only did for commas.
                buffer = io.StringIO()
                writer = csv.writer(buffer, lineterminator="\n")
                writer.writerow(["Line", "Content"])
                writer.writerows(enumerate(rows, 1))
                return buffer.getvalue().rstrip("\n")

            if extension == ".json":
                # json.dumps escapes backslashes, quotes and control characters;
                # replacing only '"' produced invalid JSON for such content.
                return json.dumps(
                    {
                        "content": content,
                        "format": "json",
                        "timestamp": datetime.now(UTC).isoformat(),
                    },
                    ensure_ascii=False,
                )

            if extension == ".xml":
                # NOTE(review): the element names were missing in the reviewed
                # copy (markup stripped); this structure is reconstructed from the
                # remaining text layout - confirm against version control.
                timestamp = datetime.now(UTC).isoformat()
                return (
                    '<?xml version="1.0" encoding="UTF-8"?>\n'
                    "<document>\n"
                    f"<content>{xml_escape(content)}</content>\n"
                    "<format>xml</format>\n"
                    f"<timestamp>{timestamp}</timestamp>\n"
                    "</document>"
                )

            if extension == ".html":
                # NOTE(review): tags reconstructed for the same reason as the XML
                # branch. The content is escaped because it is plain text here.
                timestamp = datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')
                return (
                    "<!DOCTYPE html>\n"
                    "<html>\n"
                    "<head><title>Generated Document</title></head>\n"
                    "<body>\n"
                    f"<pre>{html.escape(content)}</pre>\n"
                    f"<p><em>Generated on {timestamp}</em></p>\n"
                    "</body>\n"
                    "</html>"
                )

            if extension == ".md":
                timestamp = datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')
                return f"# Generated Document\n\n{content}\n\n---\n*Generated on {timestamp}*"

            # Generic fallback - return content as-is
            return content

        except Exception as e:
            logger.error(f"Error in fallback format conversion: {str(e)}")
            return content

    @action
    async def generateReport(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Generate HTML report from multiple documents using AI.

        Parameters:
            documentList (str): Document list reference
            prompt (str): AI prompt for report generation
            title (str, optional): Report title (default: "Summary Report")
            includeMetadata (bool, optional): Include metadata (default: True)

        Returns:
            ActionResult.success with a single "text/html" document, or
            ActionResult.failure when a required parameter is missing, no
            documents are found, or report generation raises.
        """
        try:
            documentList = parameters.get("documentList")
            prompt = parameters.get("prompt")
            title = parameters.get("title", "Summary Report")
            includeMetadata = parameters.get("includeMetadata", True)

            if not documentList:
                return ActionResult.failure(error="Document list reference is required")
            if not prompt:
                return ActionResult.failure(
                    error="Prompt is required to specify what kind of report to generate"
                )

            chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
            # Check before logging the count: len() on a None result would raise.
            if not chatDocuments:
                return ActionResult.failure(error="No documents found for the provided reference")
            logger.info(f"Retrieved {len(chatDocuments)} chat documents for report generation")

            html_content = await self._generateHtmlReport(chatDocuments, title, includeMetadata, prompt)

            now = datetime.now(UTC)
            output_filename = f"report_{now.strftime('%Y%m%d_%H%M%S')}.html"
            result_data = {
                "documentCount": len(chatDocuments),
                "content": html_content,
                "title": title,
                "timestamp": now.isoformat()
            }

            logger.info(f"Generated HTML report: {output_filename} with {len(html_content)} characters")
            return ActionResult.success(
                documents=[{
                    "documentName": output_filename,
                    "documentData": result_data,
                    "mimeType": "text/html"
                }]
            )

        except Exception as e:
            logger.error(f"Error generating report: {str(e)}")
            return ActionResult.failure(error=str(e))

    async def _generateHtmlReport(self, chatDocuments: List[Any], title: str, includeMetadata: bool, prompt: str) -> str:
        """
        Generate a comprehensive HTML report using AI from all input documents.

        Parameters:
            chatDocuments (list): Documents to summarize
            title (str): Report title; HTML-escaped wherever this method writes it
                into markup
            includeMetadata (bool): Currently unused; kept for interface
                compatibility
            prompt (str): The user's instructions for the report

        Returns:
            A complete HTML document. If no document yields readable text, a
            short "no content" page is returned without calling the AI.

        Raises:
            RuntimeError: The AI returned an empty report.
            Exception: Any other error is logged and re-raised.
        """
        try:
            safe_title = html.escape(str(title))
            generated_at = datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')
            # NOTE(review): the wrapper markup below was missing in the reviewed
            # copy (tags stripped); it is reconstructed from the remaining text -
            # confirm against version control.
            page_open = f"<html><head><title>{safe_title}</title></head><body>"
            page_close = "</body></html>"

            validDocuments = []
            allContent = []
            for doc in chatDocuments:
                content = ""
                logger.info(f"Processing document: type={type(doc)}")
                try:
                    extracted_content = await self.service.extractContentFromDocument(
                        prompt="Extract readable text content for HTML report generation",
                        document=doc
                    )
                    if extracted_content and extracted_content.contents:
                        for content_item in extracted_content.contents:
                            if hasattr(content_item, 'data') and content_item.data:
                                content += content_item.data + " "
                        if content.strip():
                            logger.info(f"  Retrieved content from file: {len(content)} characters")
                        else:
                            logger.info(f"  No readable text content found (binary file)")
                    else:
                        logger.info(f"  No content extracted (binary file)")
                except Exception as e:
                    # Best effort: a document that cannot be read is left out of the report.
                    logger.info(f"  Could not extract content (binary file): {str(e)}")

                if content and content.strip():
                    validDocuments.append(doc)
                    allContent.append(f"Document: {doc.filename}\n{content}\n")
                    logger.info(f"  Added document to valid documents list")
                else:
                    logger.info(f"  Skipping document with no readable text content")

            if not validDocuments:
                return "\n".join([
                    page_open,
                    f"<h1>{safe_title}</h1>",
                    f"<p>Generated: {generated_at}</p>",
                    "<p>No content available in the provided documents.</p>",
                    page_close,
                ])

            combinedContent = "\n\n".join(allContent)
            # NOTE(review): the wording matches the previous prompt; its exact line
            # breaks were not recoverable from the reviewed copy.
            aiPrompt = (
                f"\n{prompt}\n\n"
                f"Report Title: {title}\n\n"
                "Additional Requirements:\n"
                "1. Create a professional, well-formatted HTML report\n"
                "2. Include an executive summary at the beginning\n"
                "3. Organize information logically with clear sections\n"
                "4. Highlight key findings and insights\n"
                "5. Include relevant data, statistics, and conclusions\n"
                "6. Use proper HTML formatting with headers, lists, and styling\n"
                "7. Make it readable and professional\n\n"
                f"Document Content:\n{combinedContent}\n\n"
                "Generate a complete HTML report that addresses the user's specific requirements "
                "and integrates all the information into a cohesive, professional document.\n"
            )

            logger.info(f"Generating AI report for {len(validDocuments)} documents")
            aiReport = await self.service.callAiTextBasic(aiPrompt, combinedContent)

            if not aiReport or aiReport.strip() == "":
                logger.error("AI report generation failed - AI is crucial for this action")
                raise RuntimeError("AI report generation failed - AI is required for report generation")

            # Same fence clean-up as _convertContentToFormat, so a fenced reply is
            # not embedded with its literal backticks.
            report = _stripCodeFence(aiReport.strip())
            lowered = report.lstrip().lower()

            # NOTE(review): the original prefix test and the definition of
            # `has_title` were lost in the reviewed copy; the rules below are a
            # reconstruction - confirm.
            if lowered.startswith(("<!doctype", "<html")):
                # AI returned complete HTML, use it directly
                return report

            has_title = "<h1" in lowered or "<title" in lowered
            parts = [page_open]
            # Only add the title if the AI response doesn't already have one
            if not has_title:
                parts.append(f"<h1>{safe_title}</h1>")
            parts.append(f"<p>Generated: {generated_at}</p>")
            parts.append(f"<p>Total Documents Analyzed: {len(validDocuments)}</p>")
            parts.append("<hr>")
            parts.append(report)
            parts.append(page_close)
            return "\n".join(parts)

        except Exception as e:
            logger.error(f"Error generating AI report: {str(e)}")
            # Re-raise the error - AI is crucial for report generation
            raise