# gateway/tests/functional/test09_document_generation_formats.py
# Snapshot metadata: 2025-12-25 23:51:47 +01:00 — 732 lines, 31 KiB, Python
#!/usr/bin/env python3
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Document Generation Formats Test - Tests document generation in all supported formats
Tests HTML, PDF, DOCX, XLSX, and PPTX generation with images and various content types.
"""
import asyncio
import json
import sys
import os
import time
import base64
from typing import Dict, Any, List, Optional
# Add the gateway to path (go up 2 levels from tests/functional/)
_gateway_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if _gateway_path not in sys.path:
sys.path.insert(0, _gateway_path)
# Import the service initialization
from modules.services import getInterface as getServices
from modules.datamodels.datamodelChat import UserInputRequest, WorkflowModeEnum
from modules.datamodels.datamodelUam import User
from modules.features.workflow import chatStart
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
class DocumentGenerationFormatsTester:
    """Drives document-generation workflows and verifies the produced formats."""

    def __init__(self):
        """Bind services to the root user and reset all result containers."""
        # The root user has full access, so every generation path is reachable.
        from modules.interfaces.interfaceDbAppObjects import getRootInterface
        self.testUser = getRootInterface().currentUser
        # Services for the test user; no workflow is associated yet.
        self.services = getServices(self.testUser, None)
        self.workflow = None
        self.testResults = {}
        self.generatedDocuments = {}
        # Set by uploadPdfFile(); reused as the attachment for every request.
        self.pdfFileId = None
async def initialize(self):
"""Initialize the test environment."""
# Enable debug file logging for tests
from modules.shared.configuration import APP_CONFIG
APP_CONFIG.set("APP_DEBUG_CHAT_WORKFLOW_ENABLED", True)
# Set logging level to INFO to see workflow progress
import logging
logging.getLogger().setLevel(logging.INFO)
print(f"Initialized test with user: {self.testUser.id}")
print(f"Mandate ID: {self.testUser.mandateId}")
print(f"Debug logging enabled: {APP_CONFIG.get('APP_DEBUG_CHAT_WORKFLOW_ENABLED', False)}")
# Upload PDF file for testing
await self.uploadPdfFile()
async def uploadPdfFile(self):
"""Upload the PDF file and store its file ID."""
pdfPath = os.path.join(os.path.dirname(__file__), "..", "..", "..", "local", "temp", "B2025-02c.pdf")
pdfPath = os.path.abspath(pdfPath)
if not os.path.exists(pdfPath):
print(f"⚠️ Warning: PDF file not found at {pdfPath}")
print(" Test will continue without PDF attachment")
return
try:
# Read PDF file
with open(pdfPath, "rb") as f:
pdfContent = f.read()
# Create file using services.interfaceDbComponent
if not hasattr(self.services, 'interfaceDbComponent') or not self.services.interfaceDbComponent:
print("⚠️ Warning: interfaceDbComponent not available in services")
print(" Test will continue without PDF attachment")
return
interfaceDbComponent = self.services.interfaceDbComponent
fileItem = interfaceDbComponent.createFile(
name="B2025-02c.pdf",
mimeType="application/pdf",
content=pdfContent
)
# Store file data
interfaceDbComponent.createFileData(fileItem.id, pdfContent)
self.pdfFileId = fileItem.id
print(f"✅ Uploaded PDF file: {fileItem.fileName} (ID: {self.pdfFileId}, Size: {len(pdfContent)} bytes)")
except Exception as e:
import traceback
print(f"⚠️ Warning: Failed to upload PDF file: {str(e)}")
print(f" Traceback: {traceback.format_exc()}")
print(" Test will continue without PDF attachment")
def createTestPrompt(self, format: str) -> str:
"""Create a unified test prompt for document generation in the specified format.
The prompt requests:
- Extraction of images from the attached PDF
- Generation of a new image
- Document creation with both images
"""
basePrompt = (
"Create a professional document about 'Fuel Station Receipt Analysis' with the following content:\n"
"1) A main title\n"
"2) An introduction paragraph explaining the receipt analysis\n"
"3) Extract and include the image from the attached PDF document (B2025-02c.pdf)\n"
"4) A section analyzing the receipt data with bullet points\n"
"5) Generate a new image showing a visual representation of fuel consumption trends\n"
"6) A conclusion paragraph with recommendations\n\n"
"Make sure to include both: the image extracted from the PDF and the newly generated image.\n"
f"Format the output as {format.upper()}."
)
return basePrompt
def createRefactoringTestPrompt(self, testType: str, format: str = "html") -> str:
"""Create test prompts for specific refactoring features.
Args:
testType: Type of refactoring test:
- "intent_analysis": Test DocumentIntent analysis
- "conditional_extraction": Test conditional extraction (extract vs render)
- "image_render": Test image rendering as asset
- "multi_document": Test multi-document rendering
- "metadata_preservation": Test metadata preservation
format: Output format (default: html)
"""
prompts = {
"intent_analysis": (
"Create a document with the following requirements:\n"
"1) Extract text content from the attached PDF\n"
"2) Include images from the PDF as visual elements (render them, don't extract text from them)\n"
"3) Generate a summary document\n\n"
"This tests that the system correctly identifies which documents need extraction vs rendering."
),
"conditional_extraction": (
"Create a document that:\n"
"1) Extracts and uses text from the attached PDF\n"
"2) Renders images from the PDF as visual assets (not as extracted text)\n"
"3) Generates new content based on the extracted text\n\n"
"This tests conditional extraction - only extract what needs extraction, render what needs rendering."
),
"image_render": (
"Create a document that includes images from the attached PDF.\n"
"The images should be rendered as visual elements in the document, not extracted as text.\n"
"Include a title and description for each image.\n\n"
"This tests the image asset pipeline with render intent."
),
"multi_document": (
"Create multiple separate documents:\n"
"1) Document 1: Summary of the PDF content\n"
"2) Document 2: Analysis of the PDF content\n"
"3) Document 3: Recommendations based on the PDF content\n\n"
"Each document should be separate and complete.\n"
"This tests multi-document generation and rendering."
),
"metadata_preservation": (
"Create a document that extracts content from the attached PDF.\n"
"The document should clearly show which content came from which source document.\n"
"Include source references in the generated content.\n\n"
"This tests that metadata (documentId, mimeType) is preserved in the generation prompt."
)
}
prompt = prompts.get(testType, self.createTestPrompt(format))
return f"{prompt}\n\nFormat the output as {format.upper()}."
async def generateDocumentInFormat(self, format: str) -> Dict[str, Any]:
"""Generate a document in the specified format using workflow."""
print("\n" + "="*80)
print(f"GENERATING DOCUMENT IN {format.upper()} FORMAT")
print("="*80)
prompt = self.createTestPrompt(format)
print(f"Prompt: {prompt[:200]}...")
# Create user input request with PDF file attachment
listFileId = []
if self.pdfFileId:
listFileId = [self.pdfFileId]
print(f"Attaching PDF file (ID: {self.pdfFileId})")
else:
print("⚠️ No PDF file attached (file upload may have failed)")
# Create user input request
userInput = UserInputRequest(
prompt=prompt,
listFileId=listFileId,
userLanguage="en"
)
# Start workflow
print(f"\nStarting workflow for {format.upper()} generation...")
workflow = await chatStart(
currentUser=self.testUser,
userInput=userInput,
workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
workflowId=None
)
if not workflow:
return {
"success": False,
"error": "Failed to start workflow"
}
self.workflow = workflow
print(f"Workflow started: {workflow.id}")
# Wait for workflow completion (no timeout - wait indefinitely)
print(f"Waiting for workflow completion...")
completed = await self.waitForWorkflowCompletion(timeout=None)
if not completed:
return {
"success": False,
"error": "Workflow did not complete",
"workflowId": workflow.id,
"status": workflow.status if workflow else "unknown"
}
# Analyze results
results = self.analyzeWorkflowResults()
# Extract documents for this format
documents = results.get("documents", [])
formatDocuments = [d for d in documents if d.get("fileName", "").endswith(f".{format.lower()}")]
return {
"success": True,
"format": format,
"workflowId": workflow.id,
"status": results.get("status"),
"documentCount": len(formatDocuments),
"documents": formatDocuments,
"results": results
}
async def waitForWorkflowCompletion(self, timeout: Optional[int] = None, checkInterval: int = 2) -> bool:
"""Wait for workflow to complete."""
if not self.workflow:
return False
startTime = time.time()
lastStatus = None
interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
if timeout is None:
print("Waiting indefinitely (no timeout)")
while True:
# Check timeout only if specified
if timeout is not None and time.time() - startTime > timeout:
print(f"\n⏱️ Timeout after {timeout} seconds")
return False
# Get current workflow status
try:
currentWorkflow = interfaceDbChat.getWorkflow(self.workflow.id)
if not currentWorkflow:
print("\n❌ Workflow not found")
return False
currentStatus = currentWorkflow.status
elapsed = int(time.time() - startTime)
# Print status if it changed
if currentStatus != lastStatus:
print(f"Workflow status: {currentStatus} (elapsed: {elapsed}s)")
lastStatus = currentStatus
# Check if workflow is complete
if currentStatus in ["completed", "stopped", "failed"]:
self.workflow = currentWorkflow
statusIcon = "" if currentStatus == "completed" else ""
print(f"\n{statusIcon} Workflow finished with status: {currentStatus} (elapsed: {elapsed}s)")
return currentStatus == "completed"
# Wait before next check
await asyncio.sleep(checkInterval)
except Exception as e:
print(f"\n⚠️ Error checking workflow status: {str(e)}")
await asyncio.sleep(checkInterval)
def analyzeWorkflowResults(self) -> Dict[str, Any]:
"""Analyze workflow results and extract information."""
if not self.workflow:
return {"error": "No workflow to analyze"}
interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
workflow = interfaceDbChat.getWorkflow(self.workflow.id)
if not workflow:
return {"error": "Workflow not found"}
# Get unified chat data
chatData = interfaceDbChat.getUnifiedChatData(workflow.id, None)
# Count messages
messages = chatData.get("messages", [])
userMessages = [m for m in messages if m.get("role") == "user"]
assistantMessages = [m for m in messages if m.get("role") == "assistant"]
# Count documents
documents = chatData.get("documents", [])
# Get logs
logs = chatData.get("logs", [])
results = {
"workflowId": workflow.id,
"status": workflow.status,
"workflowMode": str(workflow.workflowMode) if hasattr(workflow, 'workflowMode') else None,
"currentRound": workflow.currentRound,
"totalTasks": workflow.totalTasks,
"totalActions": workflow.totalActions,
"messageCount": len(messages),
"userMessageCount": len(userMessages),
"assistantMessageCount": len(assistantMessages),
"documentCount": len(documents),
"logCount": len(logs),
"documents": documents,
"logs": logs
}
print(f"\nWorkflow Results:")
print(f" Status: {results['status']}")
print(f" Tasks: {results['totalTasks']}")
print(f" Actions: {results['totalActions']}")
print(f" Messages: {results['messageCount']}")
print(f" Documents: {results['documentCount']}")
# Print document details
if documents:
print(f"\nGenerated Documents:")
for doc in documents:
fileName = doc.get("fileName", "unknown")
fileSize = doc.get("fileSize", 0)
mimeType = doc.get("mimeType", "unknown")
print(f" - {fileName} ({fileSize} bytes, {mimeType})")
return results
def verifyDocumentFormat(self, document: Dict[str, Any], expectedFormat: str) -> Dict[str, Any]:
"""Verify that a document matches the expected format."""
fileName = document.get("fileName", "")
mimeType = document.get("mimeType", "")
fileSize = document.get("fileSize", 0)
# Expected MIME types
expectedMimeTypes = {
"html": ["text/html", "application/xhtml+xml"],
"pdf": ["application/pdf"],
"docx": ["application/vnd.openxmlformats-officedocument.wordprocessingml.document"],
"xlsx": ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"],
"pptx": ["application/vnd.openxmlformats-officedocument.presentationml.presentation"]
}
# Expected file extensions
expectedExtensions = {
"html": [".html", ".htm"],
"pdf": [".pdf"],
"docx": [".docx"],
"xlsx": [".xlsx"],
"pptx": [".pptx"]
}
formatLower = expectedFormat.lower()
expectedMimes = expectedMimeTypes.get(formatLower, [])
expectedExts = expectedExtensions.get(formatLower, [])
# Check file extension
hasCorrectExtension = any(fileName.lower().endswith(ext) for ext in expectedExts)
# Check MIME type
hasCorrectMimeType = any(mimeType.lower() == mime.lower() for mime in expectedMimes)
# Check file size (should be > 0)
hasValidSize = fileSize > 0
verification = {
"format": expectedFormat,
"fileName": fileName,
"mimeType": mimeType,
"fileSize": fileSize,
"hasCorrectExtension": hasCorrectExtension,
"hasCorrectMimeType": hasCorrectMimeType,
"hasValidSize": hasValidSize,
"isValid": hasCorrectExtension and hasValidSize
}
return verification
async def testRefactoringFeatures(self) -> Dict[str, Any]:
"""Test specific refactoring features."""
print("\n" + "="*80)
print("TESTING REFACTORING FEATURES")
print("="*80)
refactoringTests = [
("intent_analysis", "html"),
("conditional_extraction", "html"),
("image_render", "html"),
("multi_document", "html"),
("metadata_preservation", "html")
]
results = {}
for testType, format in refactoringTests:
try:
print(f"\n{'='*80}")
print(f"Testing Refactoring Feature: {testType}")
print(f"{'='*80}")
prompt = self.createRefactoringTestPrompt(testType, format)
print(f"Prompt: {prompt[:200]}...")
# Create user input request with PDF file attachment
listFileId = []
if self.pdfFileId:
listFileId = [self.pdfFileId]
print(f"Attaching PDF file (ID: {self.pdfFileId})")
else:
print("⚠️ No PDF file attached (file upload may have failed)")
userInput = UserInputRequest(
prompt=prompt,
listFileId=listFileId,
userLanguage="en"
)
# Start workflow
print(f"\nStarting workflow for {testType} test...")
workflow = await chatStart(
currentUser=self.testUser,
userInput=userInput,
workflowMode=WorkflowModeEnum.WORKFLOW_DYNAMIC,
workflowId=None
)
if not workflow:
results[testType] = {
"success": False,
"error": "Failed to start workflow"
}
continue
self.workflow = workflow
print(f"Workflow started: {workflow.id}")
# Wait for workflow completion (no timeout - wait indefinitely)
completed = await self.waitForWorkflowCompletion(timeout=None)
if not completed:
results[testType] = {
"success": False,
"error": "Workflow did not complete",
"workflowId": workflow.id
}
continue
# Analyze results
workflowResults = self.analyzeWorkflowResults()
# Check for specific refactoring features
verification = self.verifyRefactoringFeature(testType, workflowResults)
results[testType] = {
"success": True,
"workflowId": workflow.id,
"verification": verification,
"workflowResults": workflowResults
}
print(f"\n{testType} test completed!")
print(f" Verification: {'✅ PASS' if verification.get('passed', False) else '❌ FAIL'}")
if verification.get("details"):
for detail in verification["details"]:
print(f" - {detail}")
await asyncio.sleep(2)
except Exception as e:
import traceback
print(f"\n❌ Error testing {testType}: {str(e)}")
print(traceback.format_exc())
results[testType] = {
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}
return results
def verifyRefactoringFeature(self, testType: str, workflowResults: Dict[str, Any]) -> Dict[str, Any]:
"""Verify that a refactoring feature works correctly."""
documents = workflowResults.get("documents", [])
logs = workflowResults.get("logs", [])
verification = {
"testType": testType,
"passed": False,
"details": []
}
if testType == "intent_analysis":
# Check that intent analysis was performed
intentLogs = [log for log in logs if "intent" in str(log).lower() or "analyzing document intent" in str(log).lower()]
if intentLogs:
verification["details"].append("Intent analysis logs found")
verification["passed"] = True
else:
verification["details"].append("No intent analysis logs found")
elif testType == "conditional_extraction":
# Check that extraction and rendering both occurred
extractionLogs = [log for log in logs if "extract" in str(log).lower()]
renderLogs = [log for log in logs if "render" in str(log).lower() or "image" in str(log).lower()]
if extractionLogs and renderLogs:
verification["details"].append("Both extraction and rendering occurred")
verification["passed"] = True
else:
verification["details"].append(f"Missing logs: extraction={len(extractionLogs)}, render={len(renderLogs)}")
elif testType == "image_render":
# Check that images were rendered (not extracted as text)
imageLogs = [log for log in logs if "image" in str(log).lower()]
if imageLogs:
verification["details"].append("Image rendering logs found")
verification["passed"] = True
else:
verification["details"].append("No image rendering logs found")
elif testType == "multi_document":
# Check that multiple documents were generated
if len(documents) >= 2:
verification["details"].append(f"Multiple documents generated: {len(documents)}")
verification["passed"] = True
else:
verification["details"].append(f"Expected multiple documents, got {len(documents)}")
elif testType == "metadata_preservation":
# Check that metadata was preserved (check logs for documentId references)
metadataLogs = [log for log in logs if "documentId" in str(log) or "SOURCE:" in str(log)]
if metadataLogs:
verification["details"].append("Metadata preservation logs found")
verification["passed"] = True
else:
verification["details"].append("No metadata preservation logs found")
return verification
async def testAllFormats(self) -> Dict[str, Any]:
"""Test document generation in all formats."""
print("\n" + "="*80)
print("TESTING DOCUMENT GENERATION IN ALL FORMATS")
print("="*80)
formats = ["html", "pdf", "docx", "xlsx", "pptx"]
results = {}
for format in formats:
try:
print(f"\n{'='*80}")
print(f"Testing {format.upper()} format...")
print(f"{'='*80}")
result = await self.generateDocumentInFormat(format)
results[format] = result
if result.get("success"):
documents = result.get("documents", [])
if documents:
# Verify first document
verification = self.verifyDocumentFormat(documents[0], format)
result["verification"] = verification
print(f"\n{format.upper()} generation successful!")
print(f" Documents: {len(documents)}")
print(f" Verification: {'✅ PASS' if verification['isValid'] else '❌ FAIL'}")
if verification.get("fileName"):
print(f" File: {verification['fileName']}")
print(f" Size: {verification['fileSize']} bytes")
print(f" MIME: {verification['mimeType']}")
else:
print(f"\n⚠️ {format.upper()} generation completed but no documents found")
else:
error = result.get("error", "Unknown error")
print(f"\n{format.upper()} generation failed: {error}")
# Small delay between tests
await asyncio.sleep(2)
except Exception as e:
import traceback
print(f"\n❌ Error testing {format.upper()}: {str(e)}")
print(traceback.format_exc())
results[format] = {
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}
return results
async def runTest(self, includeRefactoringTests: bool = True):
"""Run the complete test.
Args:
includeRefactoringTests: If True, also run refactoring feature tests
"""
print("\n" + "="*80)
print("DOCUMENT GENERATION FORMATS TEST")
print("="*80)
try:
# Initialize
await self.initialize()
# Test refactoring features first (if enabled)
refactoringResults = {}
if includeRefactoringTests:
refactoringResults = await self.testRefactoringFeatures()
# Test all formats
formatResults = await self.testAllFormats()
# Summary
print("\n" + "="*80)
print("TEST SUMMARY")
print("="*80)
# Refactoring tests summary
refactoringSuccessCount = 0
refactoringFailCount = 0
if includeRefactoringTests and refactoringResults:
print("\nRefactoring Features:")
for testType, result in refactoringResults.items():
if result.get("success"):
refactoringSuccessCount += 1
verification = result.get("verification", {})
passed = verification.get("passed", False)
statusIcon = "" if passed else "⚠️"
print(f"{statusIcon} {testType:25s}: {'PASS' if passed else 'FAIL'}")
else:
refactoringFailCount += 1
error = result.get("error", "Unknown error")
print(f"{testType:25s}: FAIL - {error}")
print(f"Refactoring Tests: {refactoringSuccessCount} passed, {refactoringFailCount} failed out of {len(refactoringResults)} tests")
# Format tests summary
print("\nFormat Tests:")
successCount = 0
failCount = 0
for format, result in formatResults.items():
if result.get("success"):
successCount += 1
status = "✅ PASS"
docCount = result.get("documentCount", 0)
verification = result.get("verification", {})
isValid = verification.get("isValid", False)
statusIcon = "" if isValid else "⚠️"
print(f"{statusIcon} {format.upper():6s}: {status} - {docCount} document(s)")
else:
failCount += 1
error = result.get("error", "Unknown error")
print(f"{format.upper():6s}: FAIL - {error}")
print(f"\nFormat Tests: {successCount} passed, {failCount} failed out of {len(formatResults)} formats")
# Calculate totals
totalSuccess = successCount + refactoringSuccessCount if includeRefactoringTests else successCount
totalFail = failCount + refactoringFailCount if includeRefactoringTests else failCount
self.testResults = {
"success": failCount == 0 and (not includeRefactoringTests or refactoringFailCount == 0),
"formatTests": {
"successCount": successCount,
"failCount": failCount,
"totalFormats": len(formatResults),
"results": formatResults
},
"refactoringTests": {
"successCount": refactoringSuccessCount if includeRefactoringTests else 0,
"failCount": refactoringFailCount if includeRefactoringTests else 0,
"totalTests": len(refactoringResults) if includeRefactoringTests else 0,
"results": refactoringResults if includeRefactoringTests else {}
},
"totalSuccess": totalSuccess,
"totalFail": totalFail
}
return self.testResults
except Exception as e:
import traceback
print(f"\n❌ Test failed with error: {type(e).__name__}: {str(e)}")
print(f"Traceback:\n{traceback.format_exc()}")
self.testResults = {
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}
return self.testResults
async def main():
    """Entry point: run the full test suite and dump the results as JSON."""
    tester = DocumentGenerationFormatsTester()
    results = await tester.runTest()
    separator = "=" * 80
    print("\n" + separator)
    print("FINAL RESULTS (JSON)")
    print(separator)
    # default=str keeps non-JSON-native values (ids, datetimes) serializable.
    print(json.dumps(results, indent=2, default=str))


if __name__ == "__main__":
    asyncio.run(main())