#!/usr/bin/env python3
"""
Test procedure for DocumentManager document extraction functionality.

Usage:
    python test_document_extraction.py <file_path>

The script reads the given file, hands its bytes to
``DocumentManager.extractContentFromFileData`` together with a fixed
extraction prompt, and writes a detailed debug trace to stdout and to
``test_document_extraction.log``.
"""

import asyncio
import sys
import os
import json
import argparse
from datetime import datetime, UTC
from pathlib import Path
import logging

print("Starting test_document_extraction.py...")

# Configure logging FIRST, before the project modules are imported, so that
# anything they log at import time already goes through these handlers.

# Clear any existing handlers to avoid duplicate logs
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        # 'w' mode truncates the log file once, here, at start-up.
        logging.FileHandler('test_document_extraction.log', mode='w', encoding='utf-8'),
    ],
    force=True,  # Force reconfiguration even if already configured
)

# Filter out httpcore messages
logging.getLogger('httpcore').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

# Set up test configuration
os.environ['POWERON_CONFIG_FILE'] = 'test_config.ini'
print("Set POWERON_CONFIG_FILE environment variable")

try:
    # Import required modules
    from modules.interfaces.interfaceAppObjects import User, UserConnection
    from modules.interfaces.interfaceChatModel import ChatWorkflow
    from modules.workflow.managerDocument import DocumentManager
    from modules.workflow.serviceContainer import ServiceContainer
    print("All imports successful")
except Exception as e:
    print(f"Import error: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)


def log_extraction_debug(message: str, data: dict | None = None):
    """Log a debug message, optionally followed by a JSON dump of ``data``.

    Args:
        message: Human-readable headline for the log entry.
        data: Optional payload; when non-empty it is pretty-printed as JSON
            on the lines following the message.
    """
    timestamp = datetime.now(UTC).isoformat()
    if data:
        # default=str: this helper is also called from ``except`` handlers
        # with arbitrary values (exception args, metadata fields). A value
        # json cannot encode must not raise TypeError here and hide the
        # failure that is being reported.
        payload = json.dumps(data, indent=2, ensure_ascii=False, default=str)
        logger.debug(f"[{timestamp}] {message}\n{payload}")
    else:
        logger.debug(f"[{timestamp}] {message}")


def create_test_user() -> User:
    """Create a test user for the document extraction"""
    return User(
        id="test-user-doc-001",
        mandateId="test-mandate-doc-001",
        username="testuser_doc",
        email="test_doc@example.com",
        fullName="Test Document User",
        enabled=True,
        language="en",
        privilege="user",
        authenticationAuthority="local",
    )


def create_test_workflow() -> ChatWorkflow:
    """Create a test workflow for document extraction"""
    return ChatWorkflow(
        id="test-workflow-doc-001",
        mandateId="test-mandate-doc-001",
        status="running",
        name="Document Extraction Test Workflow",
        currentRound=1,
        lastActivity=datetime.now(UTC).isoformat(),
        startedAt=datetime.now(UTC).isoformat(),
        logs=[],
        messages=[],
        stats=None,
        tasks=[],
    )


def detect_mime_type(file_path: str) -> str:
    """Detect MIME type based on file extension.

    The lookup is case-insensitive and uses only the final suffix of the
    path. Unknown or missing extensions map to ``application/octet-stream``.
    """
    ext = Path(file_path).suffix.lower()
    mime_types = {
        '.txt': 'text/plain',
        '.md': 'text/markdown',
        '.csv': 'text/csv',
        '.json': 'application/json',
        '.xml': 'application/xml',
        '.js': 'application/javascript',
        '.py': 'application/x-python',
        '.svg': 'image/svg+xml',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.doc': 'application/msword',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        '.xls': 'application/vnd.ms-excel',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        '.ppt': 'application/vnd.ms-powerpoint',
        '.html': 'text/html',
        '.htm': 'text/html',
    }
    return mime_types.get(ext, 'application/octet-stream')


def _summarize_content_item(content_item) -> dict:
    """Build the debug summary (label, length, preview, metadata) for one item."""
    data = content_item.data
    summary = {
        "label": content_item.label,
        "data_length": len(data) if data else 0,
        # Only the first 500 characters are logged per item; the complete
        # text is logged once, in the combined dump.
        "data_preview": data[:500] + "..." if data and len(data) > 500 else data,
    }

    # Add metadata if available
    metadata = content_item.metadata
    if metadata:
        summary["metadata"] = {
            "size": metadata.size,
            "mime_type": metadata.mimeType,
            "base64_encoded": metadata.base64Encoded,
            "pages": metadata.pages,
        }
    return summary


async def test_document_extraction(file_path: str):
    """Test document extraction from a file path.

    Args:
        file_path: Path of the file whose content should be extracted.

    Returns:
        The object returned by ``DocumentManager.extractContentFromFileData``;
        this script reads its ``id`` and ``contents`` attributes.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        Exception: Any error raised while reading the file or extracting its
            content is logged and re-raised unchanged.
    """
    try:
        # The log file is NOT truncated again here. The FileHandler opened it
        # with mode='w' at import time, which already cleared it. Truncating
        # it through a second handle while that handler keeps its own write
        # offset makes the next log record land at the old offset, padding
        # the file with NUL bytes.
        logger.info("=== STARTING DOCUMENT EXTRACTION TEST ===")

        # Validate file path
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get file info
        file_path_obj = Path(file_path)
        filename = file_path_obj.name
        mime_type = detect_mime_type(file_path)
        file_size = file_path_obj.stat().st_size

        log_extraction_debug("File information", {
            "file_path": file_path,
            "filename": filename,
            "mime_type": mime_type,
            "file_size_bytes": file_size,
            "file_size_mb": round(file_size / (1024 * 1024), 2),
        })

        # Read file data
        try:
            with open(file_path, 'rb') as f:
                file_data = f.read()
            log_extraction_debug("File read successfully", {
                "bytes_read": len(file_data),
                "file_encoding": "binary",
            })
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            raise

        # Create test user and workflow
        test_user = create_test_user()
        test_workflow = create_test_workflow()

        # Create service container
        service_container = ServiceContainer(test_user, test_workflow)
        log_extraction_debug("Service container created", {
            "user_id": test_user.id,
            "workflow_id": test_workflow.id,
        })

        # Create document manager
        document_manager = DocumentManager(service_container)
        log_extraction_debug("Document manager created")

        # Define extraction prompt
        extraction_prompt = "extract the table and convert it to a csv table"

        log_extraction_debug("Starting document extraction", {
            "prompt": extraction_prompt,
            "filename": filename,
            "mime_type": mime_type,
        })

        # Extract content from file data
        try:
            extracted_content = await document_manager.extractContentFromFileData(
                prompt=extraction_prompt,
                fileData=file_data,
                filename=filename,
                mimeType=mime_type,
                base64Encoded=False,
                documentId=f"test-doc-{datetime.now(UTC).timestamp()}",
            )

            # Log extraction results
            extraction_result = {
                "extracted_content_id": extracted_content.id,
                "content_items_count": len(extracted_content.contents),
            }

            # Add objectId and objectType if they exist (set by DocumentManager)
            if hasattr(extracted_content, 'objectId'):
                extraction_result["object_id"] = extracted_content.objectId
            if hasattr(extracted_content, 'objectType'):
                extraction_result["object_type"] = extracted_content.objectType

            log_extraction_debug("Document extraction completed successfully", extraction_result)

            # Log detailed content information
            for index, content_item in enumerate(extracted_content.contents, start=1):
                log_extraction_debug(f"CONTENT ITEM {index}:", _summarize_content_item(content_item))

            # Log summary of all extracted content
            all_content = "\n\n".join(
                item.data for item in extracted_content.contents if item.data
            )
            log_extraction_debug("COMPLETE EXTRACTED CONTENT:", {
                "total_length": len(all_content),
                "content": all_content,
            })
        except Exception as e:
            log_extraction_debug("DOCUMENT EXTRACTION EXCEPTION:", {
                "error_type": type(e).__name__,
                "error_message": str(e),
                "error_args": e.args,
            })
            raise

        # Reached only on success. This message previously sat after a
        # try/except whose every path returned or raised, so it never ran.
        logger.info("=== DOCUMENT EXTRACTION TEST COMPLETED ===")
        return extracted_content

    except Exception as e:
        logger.error(f"❌ Document extraction test failed with error: {e}")
        log_extraction_debug("Full error details", {
            "error_type": type(e).__name__,
            "error_message": str(e),
        })
        raise


async def main():
    """Main function to run the document extraction test.

    Parses the single positional ``file_path`` argument, runs the extraction
    test on it and returns the extracted content. Any failure is logged and
    re-raised so the process exits with a traceback.
    """
    print("Inside main()")
    logger.info("=" * 50)
    logger.info("DOCUMENT EXTRACTION TEST")
    logger.info("=" * 50)

    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Test document extraction functionality')
    parser.add_argument('file_path', help='Path to the file to extract content from')
    args = parser.parse_args()

    try:
        extracted_content = await test_document_extraction(args.file_path)
    except Exception:
        logger.error("=" * 50)
        logger.error("TEST FAILED")
        logger.error("=" * 50)
        raise

    logger.info("=" * 50)
    logger.info("TEST COMPLETED SUCCESSFULLY")
    logger.info("=" * 50)
    return extracted_content


if __name__ == "__main__":
    print("About to run main()")
    asyncio.run(main())
    print("main() finished")