#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Document Generation Formats Test 10 - Tests document generation in DOCX, XLSX, PPTX, PDF, and HTML formats

Tests professional document formats with various content types including tables, images, and structured data.

NOTE: prompts exist for all five formats, but ``testAllFormats`` currently
exercises only HTML (see the ``formats`` list there).
"""

import asyncio
import json
import logging
import sys
import os
import time
import traceback
import base64
from typing import Dict, Any, List, Optional

# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)

# Import the service initialization
from modules.services import getInterface as getServices
from modules.datamodels.datamodelChat import UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelUam import User
from modules.features.workflow import chatStart
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects


# Accepted MIME types per format key (compared case-insensitively).
_EXPECTED_MIME_TYPES = {
    "pdf": ("application/pdf",),
    "docx": ("application/vnd.openxmlformats-officedocument.wordprocessingml.document",),
    "xlsx": ("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",),
    "pptx": ("application/vnd.openxmlformats-officedocument.presentationml.presentation",),
    "html": ("text/html", "application/xhtml+xml"),
}

# Accepted file extensions per format key (compared case-insensitively).
# Shared by document filtering and verification so both agree on what
# counts as, e.g., an HTML file.
_EXPECTED_EXTENSIONS = {
    "pdf": (".pdf",),
    "docx": (".docx",),
    "xlsx": (".xlsx",),
    "pptx": (".pptx",),
    "html": (".html", ".htm"),
}

# Workflow statuses after which polling stops.
_TERMINAL_STATUSES = ("completed", "stopped", "failed")


class DocumentGenerationFormatsTester10:
    """Functional test driver for workflow-based document generation.

    Starts a chat workflow per requested format, polls it until it reaches a
    terminal status, then inspects the produced documents' file name, MIME
    type, size, document type and metadata.
    """

    def __init__(self):
        """Bind the tester to the root user and initialise the services."""
        # Use root user for testing (has full access to everything)
        from modules.interfaces.interfaceDbAppObjects import getRootInterface
        rootInterface = getRootInterface()
        self.testUser = rootInterface.currentUser

        # Initialize services using the existing system
        self.services = getServices(self.testUser, None)  # Test user, no workflow
        self.workflow = None
        self.testResults = {}
        self.generatedDocuments = {}
        self.pdfFileId = None  # Store PDF file ID for reuse

    async def initialize(self):
        """Initialize the test environment.

        Enables the chat-workflow debug flag, raises the root logger to INFO
        and uploads the sample PDF used as attachment.
        """
        # Enable debug file logging for tests
        from modules.shared.configuration import APP_CONFIG
        APP_CONFIG.set("APP_DEBUG_CHAT_WORKFLOW_ENABLED", True)

        # Set logging level to INFO to see workflow progress
        logging.getLogger().setLevel(logging.INFO)

        print(f"Initialized test with user: {self.testUser.id}")
        print(f"Mandate ID: {self.testUser.mandateId}")
        print(f"Debug logging enabled: {APP_CONFIG.get('APP_DEBUG_CHAT_WORKFLOW_ENABLED', False)}")

        # Upload PDF file for testing
        await self.uploadPdfFile()

    async def uploadPdfFile(self):
        """Upload the PDF file and store its file ID.

        Best effort: when the file or the component interface is missing, or
        the upload raises, a warning is printed and ``self.pdfFileId`` stays
        ``None`` so the test continues without an attachment.
        """
        pdfPath = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "..", "..", "local", "temp", "B2025-02c.pdf")
        )

        if not os.path.exists(pdfPath):
            print(f"⚠️ Warning: PDF file not found at {pdfPath}")
            print(" Test will continue without PDF attachment")
            return

        try:
            # Read PDF file
            with open(pdfPath, "rb") as f:
                pdfContent = f.read()

            # Create file using services.interfaceDbComponent
            interfaceDbComponent = getattr(self.services, 'interfaceDbComponent', None)
            if not interfaceDbComponent:
                print("⚠️ Warning: interfaceDbComponent not available in services")
                print(" Test will continue without PDF attachment")
                return

            fileItem = interfaceDbComponent.createFile(
                name="B2025-02c.pdf",
                mimeType="application/pdf",
                content=pdfContent
            )

            # Store file data
            interfaceDbComponent.createFileData(fileItem.id, pdfContent)

            self.pdfFileId = fileItem.id
            print(f"✅ Uploaded PDF file: {fileItem.fileName} (ID: {self.pdfFileId}, Size: {len(pdfContent)} bytes)")
        except Exception as e:
            # Deliberately broad: a failed upload must not abort the test run.
            print(f"⚠️ Warning: Failed to upload PDF file: {str(e)}")
            print(f" Traceback: {traceback.format_exc()}")
            print(" Test will continue without PDF attachment")

    def createTestPrompt(self, format: str) -> str:
        """Create a test prompt for document generation in the specified format.

        The prompt requests:
        - Professional document structure with title, sections, tables, and images
        - Extraction of content from attached PDF
        - Structured data presentation appropriate for the format

        Args:
            format: Format key ("docx", "xlsx", "pptx", "pdf" or "html"),
                matched case-insensitively.

        Returns:
            The prompt for that format; unknown formats fall back to the
            DOCX prompt.
        """
        formatPrompts = {
            "docx": (
                "Create a professional Word document about 'Fuel Station Receipt Analysis' with:\n"
                "1) A main title\n"
                "2) An executive summary paragraph\n"
                "3) Extract and include the image from the attached PDF document (B2025-02c.pdf)\n"
                "4) A detailed analysis section with:\n"
                " - Bullet points of key findings\n"
                " - A table summarizing transaction details\n"
                "5) A conclusion section with recommendations\n\n"
                "Format as a professional DOCX document with proper headings and structure."
            ),
            "xlsx": (
                "Create an Excel spreadsheet analyzing the fuel station receipt from the attached PDF (B2025-02c.pdf).\n"
                "Include:\n"
                "1) A summary sheet with key metrics\n"
                "2) A detailed data sheet with:\n"
                " - Transaction details in rows\n"
                " - Columns for: Date, Item, Quantity, Price, Total\n"
                " - Proper formatting and headers\n"
                "3) A calculations sheet with:\n"
                " - VAT calculations\n"
                " - Net and gross totals\n\n"
                "Format as a professional XLSX spreadsheet with formulas and formatting."
            ),
            "pptx": (
                "Create a PowerPoint presentation about 'Fuel Station Receipt Analysis' with:\n"
                "1) Title slide with main title\n"
                "2) Overview slide explaining the receipt analysis\n"
                "3) Extract and include the image from the attached PDF document (B2025-02c.pdf)\n"
                "4) Analysis slides with:\n"
                " - Bullet points of key findings\n"
                " - Visual representation of data\n"
                "5) Conclusion slide with recommendations\n\n"
                "Format as a professional PPTX presentation with consistent styling."
            ),
            "pdf": (
                "Create a professional PDF document about 'Fuel Station Receipt Analysis' with:\n"
                "1) A main title\n"
                "2) An introduction paragraph explaining the receipt analysis\n"
                "3) Extract and include the image from the attached PDF document (B2025-02c.pdf)\n"
                "4) A section analyzing the receipt data with:\n"
                " - Bullet points of key findings\n"
                " - A table summarizing transaction details\n"
                "5) A conclusion paragraph with recommendations\n\n"
                "Format as a professional PDF document suitable for printing."
            ),
            "html": (
                "Create a professional HTML document about 'Fuel Station Receipt Analysis' with:\n"
                "1) A main title\n"
                "2) An introduction paragraph explaining the receipt analysis\n"
                "3) Extract and include the image from the attached PDF document (B2025-02c.pdf)\n"
                "4) A section analyzing the receipt data with:\n"
                " - Bullet points of key findings\n"
                " - A table summarizing transaction details\n"
                "5) A conclusion paragraph with recommendations\n\n"
                "Format as a professional HTML document with proper styling, responsive design, and embedded CSS."
            )
        }

        return formatPrompts.get(format.lower(), formatPrompts["docx"])

    async def generateDocumentInFormat(self, format: str) -> Dict[str, Any]:
        """Generate a document in the specified format using workflow.

        Args:
            format: Format key, e.g. "html".

        Returns:
            A result dict. ``success`` is False when the workflow could not
            be started or did not finish with status "completed"; otherwise
            it carries the workflow id/status and the documents whose file
            name matches the format's accepted extensions.
        """
        print("\n" + "="*80)
        print(f"GENERATING DOCUMENT IN {format.upper()} FORMAT")
        print("="*80)

        prompt = self.createTestPrompt(format)
        print(f"Prompt: {prompt[:200]}...")

        # Create user input request with PDF file attachment
        listFileId = []
        if self.pdfFileId:
            listFileId = [self.pdfFileId]
            print(f"Attaching PDF file (ID: {self.pdfFileId})")
        else:
            print("⚠️ No PDF file attached (file upload may have failed)")

        # Create user input request
        userInput = UserInputRequest(
            prompt=prompt,
            listFileId=listFileId,
            userLanguage="en"
        )

        # Start workflow
        print(f"\nStarting workflow for {format.upper()} generation...")
        workflow = await chatStart(
            currentUser=self.testUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
            workflowId=None
        )

        if not workflow:
            return {
                "success": False,
                "error": "Failed to start workflow"
            }

        self.workflow = workflow
        print(f"Workflow started: {workflow.id}")

        # Wait for workflow completion (no timeout - wait indefinitely)
        print("Waiting for workflow completion...")
        completed = await self.waitForWorkflowCompletion(timeout=None)

        if not completed:
            # self.workflow is refreshed by waitForWorkflowCompletion when a
            # terminal status is seen; the object returned by chatStart would
            # still carry the status from start-up time.
            latestWorkflow = self.workflow
            return {
                "success": False,
                "error": "Workflow did not complete",
                "workflowId": workflow.id,
                "status": latestWorkflow.status if latestWorkflow else "unknown"
            }

        # Analyze results
        results = self.analyzeWorkflowResults()

        # Extract documents for this format, using the same extension table
        # as verifyDocumentFormat (case-insensitive, so ".htm"/".HTML" count).
        formatLower = format.lower()
        extensions = _EXPECTED_EXTENSIONS.get(formatLower, (f".{formatLower}",))
        documents = results.get("documents", [])
        formatDocuments = [
            d for d in documents
            if (d.get("fileName") or "").lower().endswith(extensions)
        ]

        return {
            "success": True,
            "format": format,
            "workflowId": workflow.id,
            "status": results.get("status"),
            "documentCount": len(formatDocuments),
            "documents": formatDocuments,
            "results": results
        }

    async def waitForWorkflowCompletion(self, timeout: Optional[int] = None, checkInterval: int = 2) -> bool:
        """Wait for workflow to complete.

        Polls the workflow every ``checkInterval`` seconds until its status is
        "completed", "stopped" or "failed".

        Args:
            timeout: Maximum seconds to wait; ``None`` waits indefinitely.
            checkInterval: Seconds to sleep between polls.

        Returns:
            True only when the workflow finished with status "completed";
            False when there is no workflow, it cannot be found, it ended in
            another terminal status, or the timeout elapsed.
        """
        if not self.workflow:
            return False

        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        startTime = time.monotonic()
        lastStatus = None

        interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)

        if timeout is None:
            print("Waiting indefinitely (no timeout)")

        while True:
            # Check timeout only if specified
            if timeout is not None and time.monotonic() - startTime > timeout:
                print(f"\n⏱️ Timeout after {timeout} seconds")
                return False

            # Get current workflow status
            try:
                currentWorkflow = interfaceDbChat.getWorkflow(self.workflow.id)
                if not currentWorkflow:
                    print("\n❌ Workflow not found")
                    return False

                currentStatus = currentWorkflow.status
                elapsed = int(time.monotonic() - startTime)

                # Print status if it changed
                if currentStatus != lastStatus:
                    print(f"Workflow status: {currentStatus} (elapsed: {elapsed}s)")
                    lastStatus = currentStatus

                # Check if workflow is complete
                if currentStatus in _TERMINAL_STATUSES:
                    self.workflow = currentWorkflow
                    statusIcon = "✅" if currentStatus == "completed" else "❌"
                    print(f"\n{statusIcon} Workflow finished with status: {currentStatus} (elapsed: {elapsed}s)")
                    return currentStatus == "completed"

                # Wait before next check
                await asyncio.sleep(checkInterval)

            except Exception as e:
                # Best effort: a failed status check is reported and retried
                # on the next poll (bounded only by ``timeout`` when given).
                print(f"\n⚠️ Error checking workflow status: {str(e)}")
                await asyncio.sleep(checkInterval)

    def analyzeWorkflowResults(self) -> Dict[str, Any]:
        """Analyze workflow results and extract information.

        Returns:
            A summary dict with workflow counters plus the raw ``documents``
            and ``logs`` lists, or ``{"error": ...}`` when there is no
            workflow or it cannot be loaded.
        """
        if not self.workflow:
            return {"error": "No workflow to analyze"}

        interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
        workflow = interfaceDbChat.getWorkflow(self.workflow.id)

        if not workflow:
            return {"error": "Workflow not found"}

        # Get unified chat data
        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)

        # Count messages
        messages = chatData.get("messages", [])
        userMessages = [m for m in messages if m.get("role") == "user"]
        assistantMessages = [m for m in messages if m.get("role") == "assistant"]

        # Count documents
        documents = chatData.get("documents", [])

        # Get logs
        logs = chatData.get("logs", [])

        results = {
            "workflowId": workflow.id,
            "status": workflow.status,
            "workflowMode": str(workflow.workflowMode) if hasattr(workflow, 'workflowMode') else None,
            "currentRound": workflow.currentRound,
            "totalTasks": workflow.totalTasks,
            "totalActions": workflow.totalActions,
            "messageCount": len(messages),
            "userMessageCount": len(userMessages),
            "assistantMessageCount": len(assistantMessages),
            "documentCount": len(documents),
            "logCount": len(logs),
            "documents": documents,
            "logs": logs
        }

        print("\nWorkflow Results:")
        print(f" Status: {results['status']}")
        print(f" Tasks: {results['totalTasks']}")
        print(f" Actions: {results['totalActions']}")
        print(f" Messages: {results['messageCount']}")
        print(f" Documents: {results['documentCount']}")

        # Print document details
        if documents:
            print("\nGenerated Documents:")
            for doc in documents:
                fileName = doc.get("fileName", "unknown")
                fileSize = doc.get("fileSize", 0)
                mimeType = doc.get("mimeType", "unknown")
                documentType = doc.get("documentType", "N/A")
                print(f" - {fileName} ({fileSize} bytes, {mimeType}, type: {documentType})")

        return results

    def verifyDocumentFormat(self, document: Dict[str, Any], expectedFormat: str) -> Dict[str, Any]:
        """Verify that a document matches the expected format and contains expected metadata.

        Args:
            document: Document record with optional keys ``fileName``,
                ``mimeType``, ``fileSize``, ``documentType`` and ``metadata``.
                Missing or ``None`` values are treated as empty / zero.
            expectedFormat: Format key ("pdf", "docx", "xlsx", "pptx",
                "html"), matched case-insensitively.

        Returns:
            A dict echoing the inspected fields plus the individual checks.
            ``isValid`` requires correct extension, MIME type and a positive
            size; ``isComplete`` additionally requires a document type and a
            dict ``metadata``.
        """
        # ``or`` guards against keys that are present but None.
        fileName = document.get("fileName") or ""
        mimeType = document.get("mimeType") or ""
        fileSize = document.get("fileSize") or 0
        documentType = document.get("documentType")
        metadata = document.get("metadata")

        formatLower = expectedFormat.lower()
        expectedMimes = _EXPECTED_MIME_TYPES.get(formatLower, ())
        expectedExts = _EXPECTED_EXTENSIONS.get(formatLower, ())

        # Check file extension
        hasCorrectExtension = bool(expectedExts) and fileName.lower().endswith(expectedExts)

        # Check MIME type; ignore parameters such as "; charset=utf-8"
        baseMimeType = mimeType.split(";", 1)[0].strip().lower()
        hasCorrectMimeType = any(baseMimeType == mime.lower() for mime in expectedMimes)

        # Check file size (should be > 0)
        hasValidSize = isinstance(fileSize, (int, float)) and fileSize > 0

        # Check document type (should be present)
        hasDocumentType = documentType is not None

        # Check metadata (should be present)
        hasMetadata = metadata is not None and isinstance(metadata, dict)

        isValid = hasCorrectExtension and hasValidSize and hasCorrectMimeType

        verification = {
            "format": expectedFormat,
            "fileName": fileName,
            "mimeType": mimeType,
            "fileSize": fileSize,
            "documentType": documentType,
            "hasMetadata": hasMetadata,
            "hasCorrectExtension": hasCorrectExtension,
            "hasCorrectMimeType": hasCorrectMimeType,
            "hasValidSize": hasValidSize,
            "hasDocumentType": hasDocumentType,
            "isValid": isValid,
            "isComplete": isValid and hasDocumentType and hasMetadata
        }

        return verification

    async def testAllFormats(self) -> Dict[str, Any]:
        """Run generation and verification for each configured format.

        Currently only HTML is configured; the full list is kept below as a
        comment.

        Returns:
            Mapping of format key to the result dict of
            ``generateDocumentInFormat`` (with a ``verification`` entry when
            documents were produced), or to an error dict when the format
            raised.
        """
        print("\n" + "="*80)
        print("TESTING DOCUMENT GENERATION IN HTML FORMAT")
        print("="*80)

        # Only test HTML format
        formats = ["html"]
        # formats = ["docx", "xlsx", "pptx", "pdf", "html"]  # Commented out other formats
        results = {}

        for formatName in formats:
            try:
                print(f"\n{'='*80}")
                print(f"Testing {formatName.upper()} format...")
                print(f"{'='*80}")

                result = await self.generateDocumentInFormat(formatName)
                results[formatName] = result

                if result.get("success"):
                    documents = result.get("documents", [])
                    if documents:
                        # Verify first document
                        verification = self.verifyDocumentFormat(documents[0], formatName)
                        result["verification"] = verification

                        print(f"\n✅ {formatName.upper()} generation successful!")
                        print(f" Documents: {len(documents)}")
                        print(f" Verification: {'✅ PASS' if verification['isValid'] else '❌ FAIL'}")
                        print(f" Complete (with metadata): {'✅ YES' if verification['isComplete'] else '❌ NO'}")
                        if verification.get("fileName"):
                            print(f" File: {verification['fileName']}")
                            print(f" Size: {verification['fileSize']} bytes")
                            print(f" MIME: {verification['mimeType']}")
                            print(f" Document Type: {verification.get('documentType', 'N/A')}")
                            print(f" Has Metadata: {'✅' if verification.get('hasMetadata') else '❌'}")
                    else:
                        print(f"\n⚠️ {formatName.upper()} generation completed but no documents found")
                else:
                    error = result.get("error", "Unknown error")
                    print(f"\n❌ {formatName.upper()} generation failed: {error}")

                # Small delay between tests
                await asyncio.sleep(2)

            except Exception as e:
                # Record the failure and continue with the next format.
                print(f"\n❌ Error testing {formatName.upper()}: {str(e)}")
                print(traceback.format_exc())
                results[formatName] = {
                    "success": False,
                    "error": str(e),
                    "traceback": traceback.format_exc()
                }

        return results

    async def runTest(self):
        """Run the complete test.

        Returns:
            ``self.testResults``. ``success`` is True only when every format
            both generated and passed verification (``isValid``); a format
            that generated without a valid document counts as failed.
        """
        print("\n" + "="*80)
        print("DOCUMENT GENERATION FORMATS TEST 10 - HTML ONLY")
        print("="*80)

        try:
            # Initialize
            await self.initialize()

            # Test all formats
            formatResults = await self.testAllFormats()

            # Summary
            print("\n" + "="*80)
            print("TEST SUMMARY")
            print("="*80)

            # Format tests summary
            print("\nFormat Tests:")
            successCount = 0
            failCount = 0
            completeCount = 0  # Documents with metadata

            for formatName, result in formatResults.items():
                if not result.get("success"):
                    failCount += 1
                    error = result.get("error", "Unknown error")
                    print(f"❌ {formatName.upper():6s}: FAIL - {error}")
                    continue

                verification = result.get("verification", {})
                isValid = verification.get("isValid", False)
                isComplete = verification.get("isComplete", False)

                # A format passes only when its document verified; a run with
                # no or invalid documents is reported as FAIL and counted so.
                if isValid:
                    successCount += 1
                else:
                    failCount += 1
                if isComplete:
                    completeCount += 1

                statusIcon = "✅" if isValid else "⚠️"
                completeIcon = "✅" if isComplete else "❌"
                docCount = result.get("documentCount", 0)
                print(f"{statusIcon} {formatName.upper():6s}: {'PASS' if isValid else 'FAIL'} - {docCount} document(s) - Metadata: {completeIcon}")

            print(f"\nFormat Tests: {successCount} passed, {failCount} failed out of {len(formatResults)} formats")
            print(f"Complete Documents (with metadata): {completeCount} out of {successCount} successful generations")

            self.testResults = {
                "success": failCount == 0,
                "formatTests": {
                    "successCount": successCount,
                    "failCount": failCount,
                    "completeCount": completeCount,
                    "totalFormats": len(formatResults),
                    "results": formatResults
                },
                "totalSuccess": successCount,
                "totalFail": failCount
            }

            return self.testResults

        except Exception as e:
            # Top-level boundary: report the failure in the result structure.
            print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
            print(f"Traceback:\n{traceback.format_exc()}")
            self.testResults = {
                "success": False,
                "error": str(e),
                "traceback": traceback.format_exc()
            }
            return self.testResults


async def main():
    """Run document generation formats test 10."""
    tester = DocumentGenerationFormatsTester10()
    results = await tester.runTest()

    # Print final results as JSON for easy parsing
    print("\n" + "="*80)
    print("FINAL RESULTS (JSON)")
    print("="*80)
    print(json.dumps(results, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(main())