"""
|
|
Test script for document processing and DOCX generation.
|
|
Calls the main AI service directly to process PDF documents and generate DOCX summaries.
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
import logging
|
|
import base64
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Add the gateway module to the path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'modules'))
|
|
|
|
from modules.datamodels.datamodelChat import ChatDocument
|
|
from modules.datamodels.datamodelAi import EnhancedAiCallOptions
|
|
from modules.services.serviceAi.mainServiceAi import AiService
|
|
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
|
|
|
|
# Set up logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def process_documents_and_generate_summary():
    """Process documents using the main AI service with intelligent chunk integration."""
    logger.info("🚀 Starting intelligent chunk integration test...")

    # Find the testdata directory: first relative to the parent directory, then the current one
    testdata_path = Path("../wiki/poweron/testdata")
    if not testdata_path.exists():
        testdata_path = Path("wiki/poweron/testdata")
    if not testdata_path.exists():
        logger.error("❌ Testdata path not found. Tried:")
        logger.error("  - ../wiki/poweron/testdata")
        logger.error("  - wiki/poweron/testdata")
        logger.info("Please ensure the testdata folder exists and contains PDF documents")
        return False

    # Find all supported document files
    supported_extensions = ["*.pdf", "*.jpg", "*.jpeg", "*.png", "*.gif", "*.docx", "*.xlsx", "*.pptx", "*.ppt", "*.txt", "*.md", "*.html", "*.csv"]
    document_files = []
    for ext in supported_extensions:
        document_files.extend(testdata_path.glob(ext))

    logger.info(f"Found {len(document_files)} document files in testdata:")
    for doc_file in document_files:
        logger.info(f"  - {doc_file.name}")

    if not document_files:
        logger.error("❌ No supported document files found in testdata folder")
        return False

    try:
        # Mock the database interface to provide our file data BEFORE creating the AI service
        class TestDbInterface:
            def __init__(self, file_data_map):
                self.file_data_map = file_data_map

            def getFileData(self, file_id):
                logger.info(f"TestDbInterface.getFileData called with file_id: {file_id}")
                data = self.file_data_map.get(file_id)
                if data:
                    logger.info(f"✅ Found file data for {file_id}: {len(data)} bytes")
                else:
                    logger.warning(f"❌ No file data found for {file_id}")
                return data

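        # NOTE: the patching below assumes the AI service resolves document bytes
        # through the module-level getInterface() factory, so the stub must be in
        # place before AiService.create() runs; otherwise the real database is hit.
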
        # Create file data mapping
        file_data_map = {}
        for i, doc_file in enumerate(document_files):
            with open(doc_file, 'rb') as f:
                file_data_map[f"test_doc_{i+1}"] = f.read()
            logger.info(f"📁 Loaded {doc_file.name} as test_doc_{i+1}: {len(file_data_map[f'test_doc_{i+1}'])} bytes")

        # Swap in the mock database interface BEFORE creating the AI service
        import modules.interfaces.interfaceDbComponentObjects as db_interface_module
        original_get_interface = db_interface_module.getInterface
        db_interface_module.getInterface = lambda: TestDbInterface(file_data_map)
        logger.info("🔧 Database interface mocked successfully")

        # Initialize the main AI service - let it handle everything
        logger.info("🔧 Initializing main AI service...")
        ai_service = await AiService.create()

        # Create test documents - the AI service will handle file access internally
        documents = []
        logger.info(f"📁 Found {len(document_files)} document files")
        for i, doc_file in enumerate(document_files):
            logger.info(f"📄 Processing file {i+1}/{len(document_files)}: {doc_file.name}")
            # Determine the MIME type from the file extension (default: binary)
            extension_mime_types = {
                '.pdf': "application/pdf",
                '.jpg': "image/jpeg",
                '.jpeg': "image/jpeg",
                '.png': "image/png",
                '.gif': "image/gif",
                '.docx': "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                '.xlsx': "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                '.pptx': "application/vnd.openxmlformats-officedocument.presentationml.presentation",
                '.ppt': "application/vnd.ms-powerpoint",
                '.html': "text/html",
                '.csv': "text/csv",
                '.txt': "text/plain",
                '.md': "text/plain",
            }
            mime_type = extension_mime_types.get(doc_file.suffix.lower(), "application/octet-stream")

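            # The fileId below must match a key in file_data_map so the mocked
            # getFileData() can resolve the bytes; the round/task/action numbers
            # appear to be placeholder values for this test.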
            chat_doc = ChatDocument(
                fileId=f"test_doc_{i+1}",
                messageId=f"test_message_{i+1}",
                fileName=doc_file.name,
                mimeType=mime_type,
                fileSize=doc_file.stat().st_size,
                roundNumber=1,
                taskNumber=1,
                actionNumber=1,
                actionId=f"test_action_{i+1}"
            )
            documents.append(chat_doc)
            logger.info(f"✅ Created ChatDocument: {chat_doc.fileName} ({chat_doc.mimeType}) - {chat_doc.fileSize} bytes")

        logger.info(f"📄 Created {len(documents)} document objects")

        # Create enhanced AI call options for intelligent chunked processing
        ai_options = EnhancedAiCallOptions(
            operationType="general",
            enableParallelProcessing=True,
            maxConcurrentChunks=5,  # Increased for better testing
            preserveChunkMetadata=True,
            chunkSeparator="\n\n---\n\n"
        )

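        # The option names suggest that token budgeting and chunk merging happen
        # inside the AI service itself; chunkSeparator is presumably the delimiter
        # placed between merged chunk outputs. Neither is configured further here.
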
        # Call the main AI service directly - let it handle everything including document generation
        logger.info("🤖 Calling main AI service with intelligent merging...")

        # Run a single end-to-end call instead of looping over multiple prompts,
        # which previously caused issues
        logger.info("🧪 Running single end-to-end test...")

        # Alternative prompts, kept for manual testing:
        # userPrompt = "Analyze these documents and create a comprehensive DOCX summary document including: 1) Document types and purposes, 2) Key information and main points, 3) Important details and numbers, 4) Notable sections, 5) Overall assessment and recommendations."
        # userPrompt = "Extract the table from file and produce 2 lists in excel. one list with all entries, one list only with entries that are yellow highlighted."
        # userPrompt = "Create a docx file containing a summary and the COMPLETE list from the pdf file, having one additional column with a 'x' marker for all items, which are yellow highlighted."
        # userPrompt = "Create a docx file containing the combined documents in french language."
        userPrompt = "Analyze these documents and create a comprehensive form for a user to fill out"

        try:
            # Single AI call with document generation (note: this run requests PDF output)
            ai_response = await ai_service.callAi(
                prompt=userPrompt,
                documents=documents,
                options=ai_options,
                outputFormat="pdf",
                title="Formulaire"
            )

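            # The response is either a dict (optionally carrying base64-encoded
            # 'documents' plus a 'content' field) or a plain string; the saving
            # logic below handles both shapes.
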
logger.info(f"✅ End-to-end test completed successfully")
|
|
logger.info(f"📊 Response type: {type(ai_response)}")
|
|
logger.info(f"📊 Response length: {len(str(ai_response))} characters")
|
|
|
|
# Single test result
|
|
test_results = [{
|
|
"test_name": "End-to-End DOCX Generation",
|
|
"success": True,
|
|
"response_type": type(ai_response).__name__,
|
|
"response_length": len(str(ai_response)),
|
|
"response": ai_response
|
|
}]
|
|
|
|
        except Exception as e:
            logger.error(f"❌ End-to-end test failed: {str(e)}")
            test_results = [{
                "test_name": "End-to-End DOCX Generation",
                "success": False,
                "error": str(e),
                "response": None
            }]

logger.info(f"🎯 Completed 1 end-to-end test")
|
|
|
|
# Process all test results and save outputs
|
|
logger.info("📊 Processing test results...")
|
|
|
|
successful_tests = [r for r in test_results if r['success']]
|
|
failed_tests = [r for r in test_results if not r['success']]
|
|
|
|
logger.info(f"✅ Successful tests: {len(successful_tests)}")
|
|
logger.info(f"❌ Failed tests: {len(failed_tests)}")
|
|
|
|
# Display test results summary
|
|
logger.info("=" * 80)
|
|
logger.info("END-TO-END TEST RESULTS SUMMARY")
|
|
logger.info("=" * 80)
|
|
        for i, result in enumerate(test_results, 1):
            status = "✅ PASS" if result['success'] else "❌ FAIL"
            logger.info(f"Test {i}: {result['test_name']} - {status}")
            if result['success']:
                logger.info(f"  Response Type: {result['response_type']}")
                logger.info(f"  Response Length: {result['response_length']} characters")
            else:
                logger.info(f"  Error: {result['error']}")
        logger.info("=" * 80)

        # Create output directory if it doesn't exist
        output_dir = Path("test-chat/unittestoutput")
        output_dir.mkdir(parents=True, exist_ok=True)

        # Save all test results and generated files
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        logger.info("💾 Saving test results and generated files...")

        try:
            for i, result in enumerate(successful_tests, 1):
                test_name = result['test_name'].replace(' ', '_').lower()
                response = result['response']

                logger.info(f"💾 Saving Test {i}: {result['test_name']}")

                # Handle different response types
                if isinstance(response, dict):
                    # Document generation response
                    if 'documents' in response and response['documents']:
                        logger.info(f"📄 Found {len(response['documents'])} documents in response")

                        for j, doc in enumerate(response['documents']):
                            doc_name = doc.get('documentName', f'{test_name}_document_{j+1}')
                            doc_data = doc.get('documentData', '')
                            doc_mime = doc.get('mimeType', 'application/octet-stream')

                            logger.info(f"📄 Document {j+1}: {doc_name}")
                            logger.info(f"📄 MIME Type: {doc_mime}")
                            logger.info(f"📄 Data length: {len(doc_data)} characters")

                            # Determine the file extension from the MIME type, checking
                            # the most specific substrings first
                            mime_extension_hints = [
                                (('docx', 'wordprocessingml'), '.docx'),
                                (('pdf',), '.pdf'),
                                (('txt', 'plain'), '.txt'),
                                (('html',), '.html'),
                                (('json',), '.json'),
                                (('csv',), '.csv'),
                                (('xlsx', 'spreadsheetml'), '.xlsx'),
                                (('pptx', 'presentationml'), '.pptx'),
                            ]
                            file_ext = '.bin'  # Default fallback
                            if doc_mime:
                                mime_lower = doc_mime.lower()
                                for hints, ext in mime_extension_hints:
                                    if any(hint in mime_lower for hint in hints):
                                        file_ext = ext
                                        break
                                else:
                                    logger.warning(f"⚠️ Unknown MIME type: {doc_mime}, using .bin")

                            # A recognized filename extension overrides the MIME hint
                            if doc_name and '.' in doc_name:
                                name_ext = '.' + doc_name.split('.')[-1].lower()
                                if name_ext in ['.docx', '.pdf', '.txt', '.html', '.json', '.csv', '.xlsx', '.pptx']:
                                    file_ext = name_ext
                                    logger.info(f"📄 Using extension from filename: {file_ext}")

                            logger.info(f"📄 Final file extension: {file_ext}")

                            # Decode the base64 payload and save the document
                            output_path = output_dir / f"{test_name}_{timestamp}{file_ext}"
                            doc_bytes = base64.b64decode(doc_data)

                            with open(output_path, 'wb') as f:
                                f.write(doc_bytes)

                            logger.info(f"✅ Document saved: {output_path} ({len(doc_bytes)} bytes)")

                    # Also save raw content as text
                    content = response.get('content', '')
                    if content:
                        text_path = output_dir / f"{test_name}_content_{timestamp}.txt"
                        with open(text_path, 'w', encoding='utf-8') as f:
                            # Handle both string and dictionary content
                            if isinstance(content, dict):
                                f.write(json.dumps(content, indent=2, ensure_ascii=False))
                            else:
                                f.write(str(content))
                        logger.info(f"✅ Content saved: {text_path}")

                elif isinstance(response, str):
                    # Plain text response
                    text_path = output_dir / f"{test_name}_response_{timestamp}.txt"
                    with open(text_path, 'w', encoding='utf-8') as f:
                        f.write(response)
                    logger.info(f"✅ Text response saved: {text_path}")

                else:
                    logger.warning(f"⚠️ Unknown response type for {result['test_name']}: {type(response)}")

            # Save failed test details
            if failed_tests:
                error_path = output_dir / f"failed_tests_{timestamp}.txt"
                with open(error_path, 'w', encoding='utf-8') as f:
                    f.write("# Failed Test Details\n\n")
                    for i, result in enumerate(failed_tests, 1):
                        f.write(f"## Test {i}: {result['test_name']}\n")
                        f.write(f"**Error:** {result['error']}\n\n")
                logger.info(f"✅ Failed test details saved: {error_path}")

        except Exception as e:
            logger.error(f"❌ Error saving test results: {str(e)}")
            # Restore the original database interface before bailing out
            db_interface_module.getInterface = original_get_interface
            return False

        # Save comprehensive test report
        report_path = output_dir / f"end_to_end_test_report_{timestamp}.txt"
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("# End-to-End AI Service Test Report\n")
            f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            f.write("## Test Configuration\n")
            f.write(f"- Documents processed: {len(documents)}\n")
            f.write("- Processing method: Intelligent Token-Aware Merging\n")
            f.write(f"- Parallel processing: {ai_options.enableParallelProcessing}\n")
            f.write(f"- Max concurrent chunks: {ai_options.maxConcurrentChunks}\n")
            f.write(f"- Chunk metadata preserved: {ai_options.preserveChunkMetadata}\n")
            f.write(f"- Chunk separator: '{ai_options.chunkSeparator}'\n\n")

            f.write("## Document Inventory\n")
            for i, doc in enumerate(documents, 1):
                f.write(f"{i}. **{doc.fileName}**\n")
                f.write(f"   - MIME Type: {doc.mimeType}\n")
                f.write(f"   - File Size: {doc.fileSize:,} bytes\n")
                f.write(f"   - File ID: {doc.fileId}\n\n")

            f.write("## Test Results Summary\n")
            f.write(f"- Total Tests: {len(test_results)}\n")
            f.write(f"- Successful: {len(successful_tests)}\n")
            f.write(f"- Failed: {len(failed_tests)}\n")
            f.write(f"- Success Rate: {len(successful_tests)/len(test_results)*100:.1f}%\n\n")

            f.write("## Detailed Test Results\n")
            for i, result in enumerate(test_results, 1):
                f.write(f"### Test {i}: {result['test_name']}\n")
                f.write(f"**Status:** {'✅ PASS' if result['success'] else '❌ FAIL'}\n")

                if result['success']:
                    f.write(f"**Response Type:** {result['response_type']}\n")
                    f.write(f"**Response Length:** {result['response_length']} characters\n")

                    # Show a short response preview
                    response_preview = str(result['response'])[:500]
                    f.write(f"**Response Preview:**\n```\n{response_preview}...\n```\n\n")
                else:
                    f.write(f"**Error:** {result['error']}\n\n")

f.write(f"## Technical Implementation Details\n")
|
|
f.write(f"This test validates the complete AI service pipeline:\n\n")
|
|
f.write(f"### Tested Components:\n")
|
|
f.write(f"- **Document Extraction**: PDF, DOCX, images, etc.\n")
|
|
f.write(f"- **Intelligent Chunking**: Token-aware merging\n")
|
|
f.write(f"- **Model Selection**: Automatic AI model choice\n")
|
|
f.write(f"- **Parallel Processing**: Concurrent chunk processing\n")
|
|
f.write(f"- **Document Generation**: DOCX, PDF, text output\n")
|
|
f.write(f"- **Error Handling**: Graceful failure management\n\n")
|
|
|
|
f.write(f"### Performance Metrics:\n")
|
|
f.write(f"- **Chunk Optimization**: Intelligent merging reduces AI calls\n")
|
|
f.write(f"- **Processing Speed**: Parallel execution\n")
|
|
f.write(f"- **Memory Efficiency**: Token-aware chunking\n")
|
|
f.write(f"- **Output Quality**: Multiple format support\n\n")
|
|
|
|
f.write(f"## Generated Files\n")
|
|
for i, result in enumerate(successful_tests, 1):
|
|
test_name = result['test_name'].replace(' ', '_').lower()
|
|
f.write(f"- **Test {i}**: {result['test_name']} → `{test_name}_*_{timestamp}.*`\n")
|
|
|
|
if failed_tests:
|
|
f.write(f"- **Failed Tests**: `failed_tests_{timestamp}.txt`\n")
|
|
|
|
f.write(f"- **This Report**: `end_to_end_test_report_{timestamp}.txt`\n\n")
|
|
|
|
f.write(f"The end-to-end test successfully validates the complete AI service\n")
|
|
f.write(f"pipeline from document input to formatted output generation.\n")
|
|
|
|
logger.info(f"✅ Comprehensive test report saved: {report_path}")
|
|
|
|
        # Restore original database interface
        db_interface_module.getInterface = original_get_interface

        return True

    except Exception as e:
        logger.error(f"❌ Error during document processing: {str(e)}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")

        # Restore the original database interface; the patch may not have happened
        # yet if the failure occurred early, so guard against NameError
        try:
            db_interface_module.getInterface = original_get_interface
        except Exception:
            pass

        return False


async def main():
    """Main function to run the intelligent chunk integration test."""
    logger.info("🎯 Starting Intelligent Chunk Integration Test")
    logger.info("=" * 60)

    success = await process_documents_and_generate_summary()

    if success:
        logger.info("🎉 Intelligent chunk integration test completed successfully!")
        logger.info("✅ Main AI service handled all processing internally")
        logger.info("✅ Intelligent token-aware merging activated")
        logger.info("✅ DOCX document generated directly by AI service")
        logger.info("✅ Detailed chunk integration analysis saved")
        logger.info("✅ Performance optimization achieved")
    else:
        logger.error("❌ Test failed!")
        logger.error("Please check the error messages above for details")

    logger.info("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())