gateway/test4_method_ai_operations.py
2025-10-29 23:40:39 +01:00

372 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Test script for methodAi operations.
Tests all OperationType's with various prompts through the workflow action interface.
"""
import asyncio
import sys
import os
from datetime import datetime
from typing import Dict, Any, List
# Add the gateway to path
sys.path.append(os.path.dirname(__file__))
from modules.datamodels.datamodelAi import OperationTypeEnum
from modules.datamodels.datamodelChat import ChatWorkflow, ChatDocument
from modules.datamodels.datamodelUam import User
class MethodAiOperationsTester:
"""Test all operation types through methodAi.process() action."""
def __init__(self):
# Use root user for testing (has full access to everything)
from modules.interfaces.interfaceDbAppObjects import getRootInterface
rootInterface = getRootInterface()
self.testUser = rootInterface.currentUser
self.services = None
self.methodAi = None
self.testResults = []
# Create logs directory if it doesn't exist
self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
os.makedirs(self.logsDir, exist_ok=True)
# Create modeltest subdirectory
self.modelTestDir = os.path.join(self.logsDir, "modeltest")
os.makedirs(self.modelTestDir, exist_ok=True)
# Test prompts for each operation type
self.testPrompts = {
OperationTypeEnum.PLAN: {
"aiPrompt": "Create a 5-step plan to organize a project meeting and include the manual for the project management office.",
"resultType": "json"
},
OperationTypeEnum.DATA_ANALYSE: {
"aiPrompt": "Analyze the following text and extract the main topics and key points: 'Machine learning is transforming healthcare by enabling early disease detection through pattern recognition in medical images.'",
"resultType": "json"
},
OperationTypeEnum.DATA_GENERATE: {
# "aiPrompt": "Generate the first 1000 prime numbers.",
"aiPrompt": "Schreibe einen Bericht über die Verhaltensweisen in Finnalnd's Saunas in 50 Kapiteln",
"resultType": "docx"
},
OperationTypeEnum.DATA_EXTRACT: {
"aiPrompt": "Extract all email addresses and phone numbers from the following text: 'Contact us at support@example.com or call 123-456-7890. For sales, email sales@example.com or call 987-654-3210.'",
"resultType": "json"
},
OperationTypeEnum.IMAGE_ANALYSE: {
"aiPrompt": "Analyze this image and describe what you see, including any text or numbers visible.",
"resultType": "json",
# documentList should contain document references resolvable by workflow service
# For testing, leave empty if no test image is available
"documentList": []
},
OperationTypeEnum.IMAGE_GENERATE: {
"aiPrompt": "A beautiful sunset over the ocean with purple and orange hues",
"resultType": "png"
},
OperationTypeEnum.WEB_SEARCH: {
"aiPrompt": "Find recent articles about ValueOn AG in Switzeerland in 2025",
"resultType": "json"
},
OperationTypeEnum.WEB_CRAWL: {
"aiPrompt": "Extract who works in this company",
"resultType": "json",
"documentList": ["https://www.valueon.com"]
}
}
async def initialize(self):
"""Initialize services and methodAi."""
print("🔧 Initializing services...")
# Set logging level to DEBUG to see debug messages
import logging
logging.getLogger().setLevel(logging.DEBUG)
# Import and initialize services - use the same approach as routeChatPlayground
import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects
interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)
# Import and initialize services
from modules.features.chatPlayground.mainChatPlayground import getServices
# Get services first
self.services = getServices(self.testUser, None)
# Now create AND SAVE workflow in database using the interface
import uuid
import time
currentTimestamp = time.time()
testWorkflow = ChatWorkflow(
id=str(uuid.uuid4()),
name="Test Workflow",
status="running",
startedAt=currentTimestamp,
lastActivity=currentTimestamp,
currentRound=1,
currentTask=0,
currentAction=0,
totalTasks=0,
totalActions=0,
mandateId=self.testUser.mandateId,
messageIds=[],
workflowMode="React",
maxSteps=5
)
# SAVE workflow to database so it exists for access control
# Convert ChatWorkflow to dict for createWorkflow
workflowDict = testWorkflow.model_dump()
interfaceDbChat.createWorkflow(workflowDict)
# Set the workflow in services
self.services.currentWorkflow = testWorkflow
# Debug: Print workflow status
print(f"Debug: services.currentWorkflow is set: {hasattr(self.services, 'currentWorkflow') and self.services.currentWorkflow is not None}")
if self.services.currentWorkflow:
print(f"Debug: Workflow ID: {self.services.currentWorkflow.id}")
# Import and initialize methodAi AFTER setting workflow
from modules.workflows.methods.methodAi import MethodAi
self.methodAi = MethodAi(self.services)
# Verify methodAi has access to the workflow
if hasattr(self.methodAi, 'services'):
print(f"Debug: methodAi.services.currentWorkflow is set: {hasattr(self.methodAi.services, 'currentWorkflow') and self.methodAi.services.currentWorkflow is not None}")
print("✅ Services initialized")
print(f"📁 Results will be saved to: {self.modelTestDir}")
async def testOperation(self, operationType: OperationTypeEnum) -> Dict[str, Any]:
"""Test a specific operation type."""
print(f"\n{'='*80}")
print(f"TESTING OPERATION: {operationType.value}")
print(f"{'='*80}")
startTime = asyncio.get_event_loop().time()
# Get test prompt for this operation
testConfig = self.testPrompts.get(operationType, {})
if not testConfig:
result = {
"operationType": operationType.value,
"status": "ERROR",
"error": "No test configuration found for this operation type",
"processingTime": 0.0
}
self.testResults.append(result)
return result
print(f"Prompt: {testConfig.get('aiPrompt', 'N/A')}")
print(f"Result Type: {testConfig.get('resultType', 'txt')}")
try:
# Prepare parameters
parameters = {
"aiPrompt": testConfig.get("aiPrompt"),
"resultType": testConfig.get("resultType", "txt")
}
# Add document list if provided
if "documentList" in testConfig and testConfig["documentList"]:
parameters["documentList"] = testConfig["documentList"]
# Ensure workflow is still set in both self.services AND methodAi.services
if not self.services.currentWorkflow or (hasattr(self, 'methodAi') and hasattr(self.methodAi, 'services') and not self.methodAi.services.currentWorkflow):
print(f"⚠️ Warning: Workflow is None, trying to re-set it...")
import time
import uuid
currentTimestamp = time.time()
testWorkflow = ChatWorkflow(
id=str(uuid.uuid4()),
name="Test Workflow",
status="running",
startedAt=currentTimestamp,
lastActivity=currentTimestamp,
currentRound=1,
currentTask=0,
currentAction=0,
totalTasks=0,
totalActions=0,
mandateId="test_mandate",
messageIds=[],
workflowMode="React",
maxSteps=5
)
self.services.currentWorkflow = testWorkflow
# Also set in methodAi.services if it exists
if hasattr(self, 'methodAi') and hasattr(self.methodAi, 'services'):
self.methodAi.services.currentWorkflow = testWorkflow
# Call methodAi.process()
print(f"Calling methodAi.process()...")
print(f"Debug: Current workflow ID before call: {self.services.currentWorkflow.id if self.services.currentWorkflow else 'None'}")
print(f"Debug: methodAi.services.currentWorkflow: {self.methodAi.services.currentWorkflow.id if hasattr(self.methodAi, 'services') and self.methodAi.services.currentWorkflow else 'None/NotSet'}")
print(f"Debug: Is same services object? {self.services is self.methodAi.services}")
print(f"Debug: services id: {id(self.services)}")
print(f"Debug: methodAi.services id: {id(self.methodAi.services)}")
# Final safety check: ensure methodAi.services has the workflow
if hasattr(self.methodAi, 'services') and not self.methodAi.services.currentWorkflow:
print(f"⚠️ Fixing: Setting workflow in methodAi.services...")
self.methodAi.services.currentWorkflow = self.services.currentWorkflow
actionResult = await self.methodAi.process(parameters)
endTime = asyncio.get_event_loop().time()
processingTime = endTime - startTime
# Analyze result
result = {
"operationType": operationType.value,
"status": "SUCCESS" if actionResult.success else "ERROR",
"processingTime": round(processingTime, 2),
"hasDocuments": len(actionResult.documents) > 0 if actionResult.documents else False,
"documentCount": len(actionResult.documents) if actionResult.documents else 0,
"error": actionResult.error if not actionResult.success else None
}
# Extract document information
if actionResult.documents:
doc = actionResult.documents[0]
result["documentName"] = doc.documentName
result["mimeType"] = doc.mimeType
result["dataSize"] = len(doc.documentData) if doc.documentData else 0
result["dataPreview"] = str(doc.documentData)[:200] + "..." if len(str(doc.documentData)) > 200 else str(doc.documentData)
print(f"✅ Status: {result['status']}")
print(f"⏱️ Processing time: {result['processingTime']}s")
print(f"📄 Documents: {result.get('documentCount', 0)}")
if actionResult.success:
if result.get('documentName'):
print(f"📄 Saved: {result['documentName']}")
print(f"📄 MIME type: {result.get('mimeType')}")
print(f"📄 Size: {result.get('dataSize')} bytes")
# Try to decode if it's JSON
if result.get('mimeType') == 'application/json':
try:
import json
jsonData = json.loads(actionResult.documents[0].documentData)
result["isValidJson"] = True
result["jsonKeys"] = list(jsonData.keys()) if isinstance(jsonData, dict) else "Not a dict"
print(f"✅ Valid JSON with keys: {result['jsonKeys']}")
except:
result["isValidJson"] = False
print(f"⚠️ Not valid JSON")
else:
print(f"❌ Error: {result.get('error')}")
self.testResults.append(result)
return result
except Exception as e:
endTime = asyncio.get_event_loop().time()
processingTime = endTime - startTime
result = {
"operationType": operationType.value,
"status": "EXCEPTION",
"processingTime": round(processingTime, 2),
"error": str(e),
"hasDocuments": False
}
print(f"💥 EXCEPTION: {str(e)}")
self.testResults.append(result)
return result
async def testAllOperations(self):
"""Test all operation types."""
print(f"\n{'='*80}")
print("STARTING METHODAI OPERATIONS TESTS - DATA_GENERATE ONLY")
print(f"{'='*80}")
print("Testing DATA_GENERATE operation type...")
# Test only ONE operation type TODO
await self.testOperation(OperationTypeEnum.IMAGE_ANALYSE)
print(f"\n{''*80}")
# Print summary
self.printSummary()
def printSummary(self):
"""Print test summary."""
print(f"\n{'='*80}")
print("TEST SUMMARY")
print(f"{'='*80}")
successfulTests = [r for r in self.testResults if r["status"] == "SUCCESS"]
failedTests = [r for r in self.testResults if r["status"] == "ERROR"]
exceptionTests = [r for r in self.testResults if r["status"] == "EXCEPTION"]
print(f"\nTotal tests: {len(self.testResults)}")
print(f"✅ Successful: {len(successfulTests)}")
print(f"❌ Failed: {len(failedTests)}")
print(f"💥 Exceptions: {len(exceptionTests)}")
if successfulTests:
print(f"\n{''*80}")
print("SUCCESSFUL TESTS")
print(f"{''*80}")
for result in successfulTests:
print(f"{result['operationType']}: {result['processingTime']}s")
if failedTests:
print(f"\n{''*80}")
print("FAILED TESTS")
print(f"{''*80}")
for result in failedTests:
print(f"{result['operationType']}: {result.get('error', 'Unknown error')}")
if exceptionTests:
print(f"\n{''*80}")
print("EXCEPTIONS")
print(f"{''*80}")
for result in exceptionTests:
print(f"💥 {result['operationType']}: {result.get('error', 'Unknown error')}")
# Save results
import json
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
resultsFile = os.path.join(self.modelTestDir, f"method_ai_operations_test_{timestamp}.json")
with open(resultsFile, 'w', encoding='utf-8') as f:
json.dump({
"timestamp": timestamp,
"summary": {
"total": len(self.testResults),
"successful": len(successfulTests),
"failed": len(failedTests),
"exceptions": len(exceptionTests)
},
"results": self.testResults
}, f, indent=2, ensure_ascii=False)
print(f"\n📄 Results saved to: {resultsFile}")
async def main():
"""Run methodAI operations tests."""
tester = MethodAiOperationsTester()
await tester.initialize()
await tester.testAllOperations()
print(f"\n{'='*80}")
print("TESTING COMPLETED")
print(f"{'='*80}")
if __name__ == "__main__":
asyncio.run(main())