""" Test script for document processing and DOCX generation. Calls the main AI service directly to process PDF documents and generate DOCX summaries. """ import asyncio import sys import os import logging import base64 from datetime import datetime from pathlib import Path # Add the gateway module to the path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'modules')) from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelAi import EnhancedAiCallOptions from modules.services.serviceAi.mainServiceAi import AiService from modules.services.serviceGeneration.mainServiceGeneration import GenerationService # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) async def process_documents_and_generate_summary(): """Process documents using the main AI service with intelligent chunk integration.""" logger.info("๐Ÿš€ Starting intelligent chunk integration test...") # Find testdata directory testdata_path = Path("../wiki/poweron/testdata") if not testdata_path.exists(): # Try relative to current directory testdata_path = Path("wiki/poweron/testdata") if not testdata_path.exists(): # Try relative to parent directory testdata_path = Path("../wiki/poweron/testdata") if not testdata_path.exists(): logger.error(f"โŒ Testdata path not found. Tried:") logger.error(f" - ../wiki/poweron/testdata") logger.error(f" - wiki/poweron/testdata") logger.error(f" - ../wiki/poweron/testdata") logger.info("Please ensure the testdata folder exists with PDF documents") return False # Find all supported document files supported_extensions = ["*.pdf", "*.jpg", "*.jpeg", "*.png", "*.gif", "*.docx", "*.xlsx", "*.pptx", "*.ppt", "*.txt", "*.md", "*.html", "*.csv"] document_files = [] for ext in supported_extensions: document_files.extend(list(testdata_path.glob(ext))) logger.info(f"Found {len(document_files)} document files in testdata:") for doc_file in document_files: logger.info(f" - {doc_file.name}") if not document_files: logger.error("โŒ No supported document files found in testdata folder") return False try: # Mock the database interface to provide our file data BEFORE creating AI service class TestDbInterface: def __init__(self, file_data_map): self.file_data_map = file_data_map def getFileData(self, file_id): logger.info(f"TestDbInterface.getFileData called with file_id: {file_id}") data = self.file_data_map.get(file_id) if data: logger.info(f"โœ… Found file data for {file_id}: {len(data)} bytes") else: logger.warning(f"โŒ No file data found for {file_id}") return data # Create file data mapping file_data_map = {} for i, doc_file in enumerate(document_files): with open(doc_file, 'rb') as f: file_data_map[f"test_doc_{i+1}"] = f.read() logger.info(f"๐Ÿ“ Loaded {doc_file.name} as test_doc_{i+1}: {len(file_data_map[f'test_doc_{i+1}'])} bytes") # Mock the database interface BEFORE creating AI service import modules.interfaces.interfaceDbComponentObjects as db_interface_module original_get_interface = db_interface_module.getInterface db_interface_module.getInterface = lambda: TestDbInterface(file_data_map) logger.info("๐Ÿ”ง Database interface mocked successfully") # Initialize the main AI service - let it handle everything logger.info("๐Ÿ”ง Initializing main AI service...") ai_service = await AiService.create() # Create test documents - the AI service will handle file access internally documents = [] logger.info(f"๐Ÿ“ Found {len(document_files)} document files") for i, doc_file in enumerate(document_files): logger.info(f"๐Ÿ“„ Processing file {i+1}/{len(document_files)}: {doc_file.name}") # Determine MIME type based on file extension mime_type = "application/octet-stream" # default if doc_file.suffix.lower() == '.pdf': mime_type = "application/pdf" elif doc_file.suffix.lower() in ['.jpg', '.jpeg']: mime_type = "image/jpeg" elif doc_file.suffix.lower() == '.png': mime_type = "image/png" elif doc_file.suffix.lower() == '.gif': mime_type = "image/gif" elif doc_file.suffix.lower() == '.docx': mime_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" elif doc_file.suffix.lower() == '.xlsx': mime_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" elif doc_file.suffix.lower() == '.pptx': mime_type = "application/vnd.openxmlformats-officedocument.presentationml.presentation" elif doc_file.suffix.lower() == '.ppt': mime_type = "application/vnd.ms-powerpoint" elif doc_file.suffix.lower() == '.html': mime_type = "text/html" elif doc_file.suffix.lower() == '.csv': mime_type = "text/csv" elif doc_file.suffix.lower() in ['.txt', '.md']: mime_type = "text/plain" chat_doc = ChatDocument( fileId=f"test_doc_{i+1}", messageId=f"test_message_{i+1}", fileName=doc_file.name, mimeType=mime_type, fileSize=doc_file.stat().st_size, roundNumber=1, taskNumber=1, actionNumber=1, actionId=f"test_action_{i+1}" ) documents.append(chat_doc) logger.info(f"โœ… Created ChatDocument: {chat_doc.fileName} ({chat_doc.mimeType}) - {chat_doc.fileSize} bytes") logger.info(f"๐Ÿ“„ Created {len(documents)} document objects") # Create enhanced AI call options for intelligent chunked processing ai_options = EnhancedAiCallOptions( operationType="general", enableParallelProcessing=True, maxConcurrentChunks=5, # Increased for better testing preserveChunkMetadata=True, chunkSeparator="\n\n---\n\n" ) # Call the main AI service directly - let it handle everything including DOCX generation logger.info("๐Ÿค– Calling main AI service with intelligent merging...") # Run a single end-to-end test to avoid the loop issue logger.info("๐Ÿงช Running single end-to-end test...") # userPrompt = "Analyze these documents and create a comprehensive DOCX summary document including: 1) Document types and purposes, 2) Key information and main points, 3) Important details and numbers, 4) Notable sections, 5) Overall assessment and recommendations." userPrompt = "Extract the table from file and produce 2 lists in excel. one list with all entries, one list only with entries that are yellow highlighted." # userPrompt = "Create a docx file containing a summary and the COMPLETE list from the pdf file, having one additional column with a 'x' marker for all items, which are yellow highlighted." # userPrompt = "Create a docx file containing the combined documents in french language." try: # Single AI call with DOCX generation ai_response = await ai_service.callAi( prompt=userPrompt, documents=documents, options=ai_options, outputFormat="xlsx", title="Document Analysis Summary" ) logger.info(f"โœ… End-to-end test completed successfully") logger.info(f"๐Ÿ“Š Response type: {type(ai_response)}") logger.info(f"๐Ÿ“Š Response length: {len(str(ai_response))} characters") # Single test result test_results = [{ "test_name": "End-to-End DOCX Generation", "success": True, "response_type": type(ai_response).__name__, "response_length": len(str(ai_response)), "response": ai_response }] except Exception as e: logger.error(f"โŒ End-to-end test failed: {str(e)}") test_results = [{ "test_name": "End-to-End DOCX Generation", "success": False, "error": str(e), "response": None }] logger.info(f"๐ŸŽฏ Completed 1 end-to-end test") # Process all test results and save outputs logger.info("๐Ÿ“Š Processing test results...") successful_tests = [r for r in test_results if r['success']] failed_tests = [r for r in test_results if not r['success']] logger.info(f"โœ… Successful tests: {len(successful_tests)}") logger.info(f"โŒ Failed tests: {len(failed_tests)}") # Display test results summary logger.info("=" * 80) logger.info("END-TO-END TEST RESULTS SUMMARY") logger.info("=" * 80) for i, result in enumerate(test_results, 1): status = "โœ… PASS" if result['success'] else "โŒ FAIL" logger.info(f"Test {i}: {result['test_name']} - {status}") if result['success']: logger.info(f" Response Type: {result['response_type']}") logger.info(f" Response Length: {result['response_length']} characters") else: logger.info(f" Error: {result['error']}") logger.info("=" * 80) # Create output directory if it doesn't exist output_dir = Path("test-chat/unittestoutput") output_dir.mkdir(parents=True, exist_ok=True) # Save all test results and generated files timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") logger.info("๐Ÿ’พ Saving test results and generated files...") try: for i, result in enumerate(successful_tests, 1): test_name = result['test_name'].replace(' ', '_').lower() response = result['response'] logger.info(f"๐Ÿ’พ Saving Test {i}: {result['test_name']}") # Handle different response types if isinstance(response, dict): # Document generation response if 'documents' in response and response['documents']: logger.info(f"๐Ÿ“„ Found {len(response['documents'])} documents in response") for j, doc in enumerate(response['documents']): doc_name = doc.get('documentName', f'{test_name}_document_{j+1}') doc_data = doc.get('documentData', '') doc_mime = doc.get('mimeType', 'application/octet-stream') logger.info(f"๐Ÿ“„ Document {j+1}: {doc_name}") logger.info(f"๐Ÿ“„ MIME Type: {doc_mime}") logger.info(f"๐Ÿ“„ Data length: {len(doc_data)} characters") # Determine file extension with better MIME type detection file_ext = '.bin' # Default fallback if doc_mime: if 'docx' in doc_mime.lower() or 'wordprocessingml' in doc_mime.lower(): file_ext = '.docx' elif 'pdf' in doc_mime.lower(): file_ext = '.pdf' elif 'txt' in doc_mime.lower() or 'plain' in doc_mime.lower(): file_ext = '.txt' elif 'html' in doc_mime.lower(): file_ext = '.html' elif 'json' in doc_mime.lower(): file_ext = '.json' elif 'csv' in doc_mime.lower(): file_ext = '.csv' elif 'xlsx' in doc_mime.lower() or 'spreadsheetml' in doc_mime.lower(): file_ext = '.xlsx' elif 'pptx' in doc_mime.lower() or 'presentationml' in doc_mime.lower(): file_ext = '.pptx' else: logger.warning(f"โš ๏ธ Unknown MIME type: {doc_mime}, using .bin") # Also check filename for hints if doc_name and '.' in doc_name: name_ext = '.' + doc_name.split('.')[-1].lower() if name_ext in ['.docx', '.pdf', '.txt', '.html', '.json', '.csv', '.xlsx', '.pptx']: file_ext = name_ext logger.info(f"๐Ÿ“„ Using extension from filename: {file_ext}") logger.info(f"๐Ÿ“„ Final file extension: {file_ext}") # Save document output_path = output_dir / f"{test_name}_{timestamp}{file_ext}" doc_bytes = base64.b64decode(doc_data) with open(output_path, 'wb') as f: f.write(doc_bytes) logger.info(f"โœ… Document saved: {output_path} ({len(doc_bytes)} bytes)") # Also save raw content as text content = response.get('content', '') if content: text_path = output_dir / f"{test_name}_content_{timestamp}.txt" with open(text_path, 'w', encoding='utf-8') as f: # Handle both string and dictionary content if isinstance(content, dict): import json f.write(json.dumps(content, indent=2, ensure_ascii=False)) else: f.write(str(content)) logger.info(f"โœ… Content saved: {text_path}") elif isinstance(response, str): # Text response text_path = output_dir / f"{test_name}_response_{timestamp}.txt" with open(text_path, 'w', encoding='utf-8') as f: f.write(response) logger.info(f"โœ… Text response saved: {text_path}") else: logger.warning(f"โš ๏ธ Unknown response type for {result['test_name']}: {type(response)}") # Save failed test details if failed_tests: error_path = output_dir / f"failed_tests_{timestamp}.txt" with open(error_path, 'w', encoding='utf-8') as f: f.write("# Failed Test Details\n\n") for i, result in enumerate(failed_tests, 1): f.write(f"## Test {i}: {result['test_name']}\n") f.write(f"**Error:** {result['error']}\n\n") logger.info(f"โœ… Failed test details saved: {error_path}") except Exception as e: logger.error(f"โŒ Error saving test results: {str(e)}") return False # Save comprehensive test report report_path = output_dir / f"end_to_end_test_report_{timestamp}.txt" with open(report_path, 'w', encoding='utf-8') as f: f.write(f"# End-to-End AI Service Test Report\n") f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"## Test Configuration\n") f.write(f"- Documents processed: {len(documents)}\n") f.write(f"- Processing method: Intelligent Token-Aware Merging\n") f.write(f"- Parallel processing: {ai_options.enableParallelProcessing}\n") f.write(f"- Max concurrent chunks: {ai_options.maxConcurrentChunks}\n") f.write(f"- Chunk metadata preserved: {ai_options.preserveChunkMetadata}\n") f.write(f"- Chunk separator: '{ai_options.chunkSeparator}'\n\n") f.write(f"## Document Inventory\n") for i, doc in enumerate(documents, 1): f.write(f"{i}. **{doc.fileName}**\n") f.write(f" - MIME Type: {doc.mimeType}\n") f.write(f" - File Size: {doc.fileSize:,} bytes\n") f.write(f" - File ID: {doc.fileId}\n\n") f.write(f"## Test Results Summary\n") f.write(f"- Total Tests: {len(test_results)}\n") f.write(f"- Successful: {len(successful_tests)}\n") f.write(f"- Failed: {len(failed_tests)}\n") f.write(f"- Success Rate: {len(successful_tests)/len(test_results)*100:.1f}%\n\n") f.write(f"## Detailed Test Results\n") for i, result in enumerate(test_results, 1): f.write(f"### Test {i}: {result['test_name']}\n") f.write(f"**Status:** {'โœ… PASS' if result['success'] else 'โŒ FAIL'}\n") if result['success']: f.write(f"**Response Type:** {result['response_type']}\n") f.write(f"**Response Length:** {result['response_length']} characters\n") # Show response preview response_preview = str(result['response'])[:500] f.write(f"**Response Preview:**\n```\n{response_preview}...\n```\n\n") else: f.write(f"**Error:** {result['error']}\n\n") f.write(f"## Technical Implementation Details\n") f.write(f"This test validates the complete AI service pipeline:\n\n") f.write(f"### Tested Components:\n") f.write(f"- **Document Extraction**: PDF, DOCX, images, etc.\n") f.write(f"- **Intelligent Chunking**: Token-aware merging\n") f.write(f"- **Model Selection**: Automatic AI model choice\n") f.write(f"- **Parallel Processing**: Concurrent chunk processing\n") f.write(f"- **Document Generation**: DOCX, PDF, text output\n") f.write(f"- **Error Handling**: Graceful failure management\n\n") f.write(f"### Performance Metrics:\n") f.write(f"- **Chunk Optimization**: Intelligent merging reduces AI calls\n") f.write(f"- **Processing Speed**: Parallel execution\n") f.write(f"- **Memory Efficiency**: Token-aware chunking\n") f.write(f"- **Output Quality**: Multiple format support\n\n") f.write(f"## Generated Files\n") for i, result in enumerate(successful_tests, 1): test_name = result['test_name'].replace(' ', '_').lower() f.write(f"- **Test {i}**: {result['test_name']} โ†’ `{test_name}_*_{timestamp}.*`\n") if failed_tests: f.write(f"- **Failed Tests**: `failed_tests_{timestamp}.txt`\n") f.write(f"- **This Report**: `end_to_end_test_report_{timestamp}.txt`\n\n") f.write(f"The end-to-end test successfully validates the complete AI service\n") f.write(f"pipeline from document input to formatted output generation.\n") logger.info(f"โœ… Comprehensive test report saved: {report_path}") # Restore original database interface db_interface_module.getInterface = original_get_interface return True except Exception as e: logger.error(f"โŒ Error during document processing: {str(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") # Restore original database interface in case of error try: db_interface_module.getInterface = original_get_interface except: pass return False async def main(): """Main function to run the intelligent chunk integration test.""" logger.info("๐ŸŽฏ Starting Intelligent Chunk Integration Test") logger.info("=" * 60) success = await process_documents_and_generate_summary() if success: logger.info("๐ŸŽ‰ Intelligent chunk integration test completed successfully!") logger.info("โœ… Main AI service handled all processing internally") logger.info("โœ… Intelligent token-aware merging activated") logger.info("โœ… DOCX document generated directly by AI service") logger.info("โœ… Detailed chunk integration analysis saved") logger.info("โœ… Performance optimization achieved") else: logger.error("โŒ Test failed!") logger.error("Please check the error messages above for details") logger.info("=" * 60) if __name__ == "__main__": asyncio.run(main())