#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Document Generation Formats Test - Tests document generation in all supported formats
Tests HTML, PDF, DOCX, XLSX, and PPTX generation with images and various content types.
"""

import asyncio
import json
import logging
import sys
import os
import time
import traceback
import base64
from typing import Dict, Any, List, Optional

# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
    sys.path.insert(0, _gateway_path)

# Import the service initialization
from modules.services import getInterface as getServices
from modules.datamodels.datamodelChat import UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelUam import User
from modules.features.workflow import chatStart
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects

# Workflow statuses after which polling stops.
_TERMINAL_STATUSES = ("completed", "stopped", "failed")

# Accepted (lower-case) file extensions per format. Shared by the document
# filter and the verifier so that both agree on what belongs to a format.
_EXPECTED_EXTENSIONS = {
    "html": (".html", ".htm"),
    "pdf": (".pdf",),
    "docx": (".docx",),
    "xlsx": (".xlsx",),
    "pptx": (".pptx",),
}

# Accepted MIME types per format.
_EXPECTED_MIME_TYPES = {
    "html": ("text/html", "application/xhtml+xml"),
    "pdf": ("application/pdf",),
    "docx": ("application/vnd.openxmlformats-officedocument.wordprocessingml.document",),
    "xlsx": ("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",),
    "pptx": ("application/vnd.openxmlformats-officedocument.presentationml.presentation",),
}


class DocumentGenerationFormatsTester:
    """Drive one chat workflow per output format and check the produced documents."""

    def __init__(self):
        """Set up the tester with the root user and the service layer."""
        # Use root user for testing (has full access to everything)
        from modules.interfaces.interfaceDbAppObjects import getRootInterface
        rootInterface = getRootInterface()
        self.testUser = rootInterface.currentUser

        # Initialize services using the existing system
        self.services = getServices(self.testUser, None)  # Test user, no workflow
        self.workflow = None
        self.testResults = {}
        self.generatedDocuments = {}

    async def initialize(self):
        """Initialize the test environment."""
        # Enable debug file logging for tests
        from modules.shared.configuration import APP_CONFIG
        APP_CONFIG.set("APP_DEBUG_CHAT_WORKFLOW_ENABLED", True)

        # Set logging level to INFO to see workflow progress
        logging.getLogger().setLevel(logging.INFO)

        print(f"Initialized test with user: {self.testUser.id}")
        print(f"Mandate ID: {self.testUser.mandateId}")
        print(f"Debug logging enabled: {APP_CONFIG.get('APP_DEBUG_CHAT_WORKFLOW_ENABLED', False)}")

    def createTestPrompt(self, format: str) -> str:
        """Create a test prompt for document generation in the specified format.

        Args:
            format: Format name, matched case-insensitively
                ("html", "pdf", "docx", "xlsx" or "pptx").

        Returns:
            The prompt for that format; unknown formats fall back to the
            DOCX prompt.
        """
        prompts = {
            "html": "Create a professional HTML document about 'The Future of Artificial Intelligence' with: 1) A main title, 2) An introduction paragraph, 3) Three key sections with headings, 4) A bullet list of benefits, 5) An image showing AI technology (generate it), 6) A conclusion paragraph. Format as HTML.",
            "pdf": "Create a professional PDF report about 'Climate Change Impact Analysis' with: 1) A title page, 2) An executive summary, 3) Three main sections with data tables, 4) Charts/graphs described, 5) An image showing environmental impact (generate it), 6) Conclusions and recommendations. Format as PDF.",
            "docx": "Create a comprehensive Word document about 'Project Management Best Practices' with: 1) A cover page with title, 2) Table of contents, 3) Five chapters with headings and paragraphs, 4) A table comparing methodologies, 5) An image illustrating project workflow (generate it), 6) Appendices. Format as DOCX.",
            "xlsx": "Create an Excel workbook about 'Sales Performance Analysis' with: 1) A summary sheet with key metrics, 2) A detailed data sheet with sales data in a table format (columns: Month, Product, Sales, Units, Revenue), 3) A chart sheet with visualizations described, 4) An analysis sheet with calculations. Format as XLSX.",
            "pptx": "Create a PowerPoint presentation about 'Digital Transformation Strategy' with: 1) A title slide, 2) An agenda slide, 3) Five content slides with bullet points, 4) A slide with an image showing transformation roadmap (generate it), 5) A conclusion slide. Format as PPTX.",
        }
        return prompts.get(format.lower(), prompts["docx"])

    async def generateDocumentInFormat(self, format: str) -> Dict[str, Any]:
        """Generate a document in the specified format using workflow.

        Starts a dynamic workflow with the format's test prompt, polls it
        until it finishes (5 minute timeout) and collects the documents whose
        file name carries one of the format's extensions.

        Args:
            format: Format name ("html", "pdf", "docx", "xlsx" or "pptx").

        Returns:
            A dict with "success" plus, on success, "format", "workflowId",
            "status", "documentCount", "documents" and the full "results";
            on failure an "error" message (and workflow id/status when a
            workflow was started).
        """
        print("\n" + "="*80)
        print(f"GENERATING DOCUMENT IN {format.upper()} FORMAT")
        print("="*80)

        prompt = self.createTestPrompt(format)
        print(f"Prompt: {prompt[:200]}...")

        # Create user input request
        userInput = UserInputRequest(
            prompt=prompt,
            userLanguage="en"
        )

        # Start workflow
        print(f"\nStarting workflow for {format.upper()} generation...")
        workflow = await chatStart(
            currentUser=self.testUser,
            userInput=userInput,
            workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
            workflowId=None
        )

        if not workflow:
            return {
                "success": False,
                "error": "Failed to start workflow"
            }

        self.workflow = workflow
        print(f"Workflow started: {workflow.id}")

        # Wait for workflow completion
        print("Waiting for workflow completion...")
        completed = await self.waitForWorkflowCompletion(timeout=300)  # 5 minute timeout

        if not completed:
            # self.workflow is refreshed by the poller once a terminal status
            # is seen, so it distinguishes "failed"/"stopped" from a timeout.
            finalStatus = getattr(self.workflow, "status", None) or "unknown"
            if finalStatus in _TERMINAL_STATUSES:
                error = f"Workflow finished with status: {finalStatus}"
            else:
                error = "Workflow did not complete within timeout"
            return {
                "success": False,
                "error": error,
                "workflowId": workflow.id,
                "status": finalStatus
            }

        # Analyze results
        results = self.analyzeWorkflowResults()
        if "error" in results:
            # Do not report success when the results could not be loaded.
            return {
                "success": False,
                "error": results["error"],
                "workflowId": workflow.id
            }

        # Extract documents for this format (case-insensitive, same
        # extension table as verifyDocumentFormat, e.g. ".htm" for HTML)
        formatLower = format.lower()
        extensions = _EXPECTED_EXTENSIONS.get(formatLower, (f".{formatLower}",))
        documents = results.get("documents", [])
        formatDocuments = [
            d for d in documents
            if (d.get("fileName") or "").lower().endswith(extensions)
        ]

        return {
            "success": True,
            "format": format,
            "workflowId": workflow.id,
            "status": results.get("status"),
            "documentCount": len(formatDocuments),
            "documents": formatDocuments,
            "results": results
        }

    async def waitForWorkflowCompletion(self, timeout: int = 300, checkInterval: int = 2) -> bool:
        """Wait for workflow to complete.

        Polls the stored workflow every ``checkInterval`` seconds until its
        status is "completed", "stopped" or "failed", or until ``timeout``
        seconds have elapsed. Errors raised while polling are printed and
        the poll is retried.

        Args:
            timeout: Maximum number of seconds to wait.
            checkInterval: Seconds to sleep between status checks.

        Returns:
            True only if the workflow reached status "completed"; False on
            timeout, when no workflow is set, when the workflow cannot be
            found, or when it ended as "stopped"/"failed".
        """
        if not self.workflow:
            return False

        # Monotonic clock: the timeout must not be affected by wall-clock changes.
        startTime = time.monotonic()
        lastStatus = None
        interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)

        while True:
            # Check timeout
            if time.monotonic() - startTime > timeout:
                print(f"\n⏱️ Timeout after {timeout} seconds")
                return False

            # Get current workflow status
            try:
                currentWorkflow = interfaceDbChat.getWorkflow(self.workflow.id)
                if not currentWorkflow:
                    print("\n❌ Workflow not found")
                    return False

                currentStatus = currentWorkflow.status
                elapsed = int(time.monotonic() - startTime)

                # Print status if it changed
                if currentStatus != lastStatus:
                    print(f"Workflow status: {currentStatus} (elapsed: {elapsed}s)")
                    lastStatus = currentStatus

                # Check if workflow is complete
                if currentStatus in _TERMINAL_STATUSES:
                    self.workflow = currentWorkflow
                    statusIcon = "✅" if currentStatus == "completed" else "❌"
                    print(f"\n{statusIcon} Workflow finished with status: {currentStatus} (elapsed: {elapsed}s)")
                    return currentStatus == "completed"
            except Exception as e:
                # Best effort: keep polling until the timeout is reached.
                print(f"\n⚠️ Error checking workflow status: {str(e)}")

            # Wait before next check
            await asyncio.sleep(checkInterval)

    def analyzeWorkflowResults(self) -> Dict[str, Any]:
        """Analyze workflow results and extract information.

        Reloads the workflow, reads its unified chat data and prints a short
        summary of tasks, actions, messages and documents.

        Returns:
            A dict with workflow metadata, message/document/log counts and
            the raw "documents" and "logs" lists, or ``{"error": ...}`` when
            no workflow is set or it cannot be found.
        """
        if not self.workflow:
            return {"error": "No workflow to analyze"}

        interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
        workflow = interfaceDbChat.getWorkflow(self.workflow.id)

        if not workflow:
            return {"error": "Workflow not found"}

        # Get unified chat data (tolerate a missing result)
        chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None) or {}

        # Count messages
        messages = chatData.get("messages", [])
        userMessages = [m for m in messages if m.get("role") == "user"]
        assistantMessages = [m for m in messages if m.get("role") == "assistant"]

        # Count documents
        documents = chatData.get("documents", [])

        # Get logs
        logs = chatData.get("logs", [])

        results = {
            "workflowId": workflow.id,
            "status": workflow.status,
            "workflowMode": str(workflow.workflowMode) if hasattr(workflow, 'workflowMode') else None,
            "currentRound": workflow.currentRound,
            "totalTasks": workflow.totalTasks,
            "totalActions": workflow.totalActions,
            "messageCount": len(messages),
            "userMessageCount": len(userMessages),
            "assistantMessageCount": len(assistantMessages),
            "documentCount": len(documents),
            "logCount": len(logs),
            "documents": documents,
            "logs": logs
        }

        print("\nWorkflow Results:")
        print(f" Status: {results['status']}")
        print(f" Tasks: {results['totalTasks']}")
        print(f" Actions: {results['totalActions']}")
        print(f" Messages: {results['messageCount']}")
        print(f" Documents: {results['documentCount']}")

        # Print document details
        if documents:
            print("\nGenerated Documents:")
            for doc in documents:
                fileName = doc.get("fileName", "unknown")
                fileSize = doc.get("fileSize", 0)
                mimeType = doc.get("mimeType", "unknown")
                print(f" - {fileName} ({fileSize} bytes, {mimeType})")

        return results

    def verifyDocumentFormat(self, document: Dict[str, Any], expectedFormat: str) -> Dict[str, Any]:
        """Verify that a document matches the expected format.

        Args:
            document: Document metadata; the keys "fileName", "mimeType" and
                "fileSize" are read. Missing or ``None`` values are treated
                as empty / zero.
            expectedFormat: Format name, matched case-insensitively.

        Returns:
            A dict with the inspected values and the flags
            "hasCorrectExtension", "hasCorrectMimeType", "hasValidSize" and
            "isValid". "isValid" requires a correct extension and a
            non-zero size; the MIME check is reported but does not affect it.
        """
        # "or" also covers keys that are present but set to None.
        fileName = document.get("fileName") or ""
        mimeType = document.get("mimeType") or ""
        fileSize = document.get("fileSize") or 0

        formatLower = expectedFormat.lower()
        expectedMimes = _EXPECTED_MIME_TYPES.get(formatLower, ())
        expectedExts = _EXPECTED_EXTENSIONS.get(formatLower, ())

        # Check file extension
        lowerName = fileName.lower()
        hasCorrectExtension = any(lowerName.endswith(ext) for ext in expectedExts)

        # Check MIME type
        lowerMime = mimeType.lower()
        hasCorrectMimeType = any(lowerMime == mime.lower() for mime in expectedMimes)

        # Check file size (should be > 0)
        hasValidSize = fileSize > 0

        verification = {
            "format": expectedFormat,
            "fileName": fileName,
            "mimeType": mimeType,
            "fileSize": fileSize,
            "hasCorrectExtension": hasCorrectExtension,
            "hasCorrectMimeType": hasCorrectMimeType,
            "hasValidSize": hasValidSize,
            "isValid": hasCorrectExtension and hasValidSize
        }

        return verification

    async def testAllFormats(self) -> Dict[str, Any]:
        """Test document generation in all formats.

        Runs ``generateDocumentInFormat`` for each format in turn and
        verifies the first matching document. An exception in one format is
        recorded for that format and does not stop the remaining ones.

        Returns:
            A dict mapping each format name to its result dict.
        """
        print("\n" + "="*80)
        print("TESTING DOCUMENT GENERATION IN ALL FORMATS")
        print("="*80)

        formats = ["html", "pdf", "docx", "xlsx", "pptx"]
        results = {}

        for fmt in formats:
            try:
                print(f"\n{'='*80}")
                print(f"Testing {fmt.upper()} format...")
                print(f"{'='*80}")

                result = await self.generateDocumentInFormat(fmt)
                results[fmt] = result

                if result.get("success"):
                    documents = result.get("documents", [])
                    if documents:
                        # Verify first document
                        verification = self.verifyDocumentFormat(documents[0], fmt)
                        result["verification"] = verification

                        print(f"\n✅ {fmt.upper()} generation successful!")
                        print(f" Documents: {len(documents)}")
                        print(f" Verification: {'✅ PASS' if verification['isValid'] else '❌ FAIL'}")
                        if verification.get("fileName"):
                            print(f" File: {verification['fileName']}")
                            print(f" Size: {verification['fileSize']} bytes")
                            print(f" MIME: {verification['mimeType']}")
                    else:
                        print(f"\n⚠️ {fmt.upper()} generation completed but no documents found")
                else:
                    error = result.get("error", "Unknown error")
                    print(f"\n❌ {fmt.upper()} generation failed: {error}")

                # Small delay between tests
                await asyncio.sleep(2)

            except Exception as e:
                trace = traceback.format_exc()
                print(f"\n❌ Error testing {fmt.upper()}: {str(e)}")
                print(trace)
                results[fmt] = {
                    "success": False,
                    "error": str(e),
                    "traceback": trace
                }

        return results

    async def runTest(self):
        """Run the complete test.

        Initializes the environment, tests every format and prints a
        summary.

        Returns:
            The summary dict, also stored in ``self.testResults``: "success",
            "successCount", "failCount", "totalFormats" and "results", or
            "success"/"error"/"traceback" if the run itself raised.
        """
        print("\n" + "="*80)
        print("DOCUMENT GENERATION FORMATS TEST")
        print("="*80)

        try:
            # Initialize
            await self.initialize()

            # Test all formats
            results = await self.testAllFormats()

            # Summary
            print("\n" + "="*80)
            print("TEST SUMMARY")
            print("="*80)

            successCount = 0
            failCount = 0

            for fmt, result in results.items():
                if result.get("success"):
                    successCount += 1
                    status = "✅ PASS"
                    docCount = result.get("documentCount", 0)
                    verification = result.get("verification", {})
                    isValid = verification.get("isValid", False)
                    # A warning icon marks runs without a valid verified document.
                    statusIcon = "✅" if isValid else "⚠️"
                    print(f"{statusIcon} {fmt.upper():6s}: {status} - {docCount} document(s)")
                else:
                    failCount += 1
                    error = result.get("error", "Unknown error")
                    print(f"❌ {fmt.upper():6s}: FAIL - {error}")

            print(f"\nTotal: {successCount} passed, {failCount} failed out of {len(results)} formats")

            self.testResults = {
                "success": failCount == 0,
                "successCount": successCount,
                "failCount": failCount,
                "totalFormats": len(results),
                "results": results
            }

            return self.testResults

        except Exception as e:
            trace = traceback.format_exc()
            print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
            print(f"Traceback:\n{trace}")
            self.testResults = {
                "success": False,
                "error": str(e),
                "traceback": trace
            }
            return self.testResults


async def main():
    """Run document generation formats test."""
    tester = DocumentGenerationFormatsTester()
    results = await tester.runTest()

    # Print final results as JSON for easy parsing
    print("\n" + "="*80)
    print("FINAL RESULTS (JSON)")
    print("="*80)
    print(json.dumps(results, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(main())