367 lines
15 KiB
Python
367 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for methodAi operations.

Runs OperationTypeEnum operation types with predefined prompts through the workflow action interface.
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List
|
|
|
|
# Add the gateway to path: the directory containing this script is made
# importable. The path is resolved to an absolute one first, so the entry does
# not degrade to "" (i.e. "whatever the current working directory is") when
# __file__ happens to be a bare relative filename.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from modules.datamodels.datamodelAi import OperationTypeEnum
|
|
from modules.datamodels.datamodelChat import ChatWorkflow, ChatDocument
|
|
from modules.datamodels.datamodelUam import User
|
|
|
|
|
|
class MethodAiOperationsTester:
    """Drive methodAi.process() with one predefined prompt per operation type.

    Every executed test appends a result dict to ``self.testResults``;
    ``printSummary()`` prints the totals and writes them as a JSON report
    into ``self.modelTestDir``.
    """

    # Console separators shared by all banners.
    _HEAVY_RULE = "=" * 80
    _LIGHT_RULE = "─" * 80

    def __init__(self):
        """Resolve the test user, create the log directories and the prompt table."""
        # Use root user for testing (has full access to everything)
        from modules.interfaces.interfaceDbAppObjects import getRootInterface

        rootInterface = getRootInterface()
        self.testUser = rootInterface.currentUser

        # Both are populated by initialize().
        self.services = None
        self.methodAi = None
        # One result dict per executed test, in execution order.
        self.testResults = []

        # Create logs directory if it doesn't exist
        self.logsDir = os.path.join(os.path.dirname(__file__), "..", "local", "logs")
        os.makedirs(self.logsDir, exist_ok=True)

        # Create modeltest subdirectory (the JSON reports are written here)
        self.modelTestDir = os.path.join(self.logsDir, "modeltest")
        os.makedirs(self.modelTestDir, exist_ok=True)

        # Test prompts for each operation type. Each entry provides "aiPrompt"
        # and "resultType", plus an optional "documentList".
        self.testPrompts = {
            OperationTypeEnum.PLAN: {
                "aiPrompt": "Create a 5-step plan to organize a project meeting and include the manual for the project management office.",
                "resultType": "json",
            },
            OperationTypeEnum.DATA_ANALYSE: {
                "aiPrompt": "Analyze the following text and extract the main topics and key points: 'Machine learning is transforming healthcare by enabling early disease detection through pattern recognition in medical images.'",
                "resultType": "json",
            },
            OperationTypeEnum.DATA_GENERATE: {
                # Alternative prompt: "Generate the first 1000 prime numbers."
                "aiPrompt": "Schreibe einen Bericht über die Verhaltensweisen in Finnalnd's Saunas in 50 Kapiteln",
                "resultType": "docx",
            },
            OperationTypeEnum.DATA_EXTRACT: {
                "aiPrompt": "Extract all email addresses and phone numbers from the following text: 'Contact us at support@example.com or call 123-456-7890. For sales, email sales@example.com or call 987-654-3210.'",
                "resultType": "json",
            },
            OperationTypeEnum.IMAGE_ANALYSE: {
                "aiPrompt": "Analyze this image and describe what you see, including any text or numbers visible.",
                "resultType": "json",
                # documentList should contain document references resolvable by workflow service
                # For testing, leave empty if no test image is available
                "documentList": [],
            },
            OperationTypeEnum.IMAGE_GENERATE: {
                "aiPrompt": "A beautiful sunset over the ocean with purple and orange hues",
                "resultType": "png",
            },
            OperationTypeEnum.WEB_SEARCH: {
                "aiPrompt": "Find recent articles about ValueOn AG in Switzeerland in 2025",
                "resultType": "json",
            },
            OperationTypeEnum.WEB_CRAWL: {
                "aiPrompt": "Extract who works in this company",
                "resultType": "json",
                "documentList": ["https://www.valueon.com"],
            },
        }

    def _buildTestWorkflow(self, mandateId):
        """Return a fresh, unsaved ChatWorkflow in "running" state for *mandateId*."""
        import time
        import uuid

        currentTimestamp = time.time()
        return ChatWorkflow(
            id=str(uuid.uuid4()),
            name="Test Workflow",
            status="running",
            startedAt=currentTimestamp,
            lastActivity=currentTimestamp,
            currentRound=1,
            currentTask=0,
            currentAction=0,
            totalTasks=0,
            totalActions=0,
            mandateId=mandateId,
            messageIds=[],
            workflowMode="React",
            maxSteps=5,
        )

    async def initialize(self):
        """Initialize services and methodAi.

        Creates a test workflow, stores it through the chat DB interface,
        attaches it to the services object and only then constructs MethodAi.
        """
        print("🔧 Initializing services...")

        # Set logging level to DEBUG to see debug messages
        import logging

        logging.getLogger().setLevel(logging.DEBUG)

        # Import and initialize services - use the same approach as routeChatPlayground
        import modules.interfaces.interfaceDbChatObjects as interfaceDbChatObjects

        interfaceDbChat = interfaceDbChatObjects.getInterface(self.testUser)

        from modules.features.chatPlayground.mainChatPlayground import getServices

        # Get services first
        self.services = getServices(self.testUser, None)

        # Now create AND SAVE workflow in database using the interface
        testWorkflow = self._buildTestWorkflow(self.testUser.mandateId)

        # SAVE workflow to database so it exists for access control.
        # createWorkflow is handed a plain dict, hence model_dump().
        interfaceDbChat.createWorkflow(testWorkflow.model_dump())

        # Set the workflow in services
        self.services.currentWorkflow = testWorkflow

        # Debug: Print workflow status
        workflowIsSet = getattr(self.services, "currentWorkflow", None) is not None
        print(f"Debug: services.currentWorkflow is set: {workflowIsSet}")
        if self.services.currentWorkflow:
            print(f"Debug: Workflow ID: {self.services.currentWorkflow.id}")

        # Import and initialize methodAi AFTER setting workflow
        from modules.workflows.methods.methodAi import MethodAi

        self.methodAi = MethodAi(self.services)

        # Verify methodAi has access to the workflow
        if hasattr(self.methodAi, "services"):
            methodWorkflowIsSet = (
                getattr(self.methodAi.services, "currentWorkflow", None) is not None
            )
            print(f"Debug: methodAi.services.currentWorkflow is set: {methodWorkflowIsSet}")

        print("✅ Services initialized")
        print(f"📁 Results will be saved to: {self.modelTestDir}")

    def _ensureWorkflow(self):
        """Re-create the current workflow if it is missing on either services object.

        Returns the services object held by methodAi (or None if it has none)
        so the caller can reuse it for diagnostics.
        """
        methodServices = getattr(self.methodAi, "services", None)
        workflowMissing = not self.services.currentWorkflow or (
            methodServices is not None and not methodServices.currentWorkflow
        )
        if workflowMissing:
            print("⚠️  Warning: Workflow is None, trying to re-set it...")
            # NOTE(review): unlike initialize(), this fallback uses a literal
            # mandate id and is not saved through the DB interface - confirm
            # that this is intended.
            testWorkflow = self._buildTestWorkflow("test_mandate")
            self.services.currentWorkflow = testWorkflow
            # Also set in methodAi.services if it exists
            if methodServices is not None:
                methodServices.currentWorkflow = testWorkflow
        return methodServices

    @staticmethod
    def _summarizeActionResult(operationType, actionResult, processingTime):
        """Condense an action result into the flat dict stored in testResults."""
        documents = actionResult.documents or []
        result = {
            "operationType": operationType.value,
            "status": "SUCCESS" if actionResult.success else "ERROR",
            "processingTime": round(processingTime, 2),
            "hasDocuments": bool(documents),
            "documentCount": len(documents),
            "error": actionResult.error if not actionResult.success else None,
        }

        # Only the first document is described in the report.
        if documents:
            doc = documents[0]
            documentData = doc.documentData
            dataText = str(documentData)
            result["documentName"] = doc.documentName
            result["mimeType"] = doc.mimeType
            result["dataSize"] = len(documentData) if documentData else 0
            result["dataPreview"] = dataText[:200] + "..." if len(dataText) > 200 else dataText
        return result

    @staticmethod
    def _checkJsonDocument(actionResult, result):
        """Record in *result* whether the first document parses as JSON."""
        import json

        try:
            jsonData = json.loads(actionResult.documents[0].documentData)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError and undecodable bytes;
            # TypeError covers payloads that are neither str nor bytes.
            result["isValidJson"] = False
            print("⚠️  Not valid JSON")
        else:
            result["isValidJson"] = True
            result["jsonKeys"] = list(jsonData.keys()) if isinstance(jsonData, dict) else "Not a dict"
            print(f"✅ Valid JSON with keys: {result['jsonKeys']}")

    async def testOperation(self, operationType: "OperationTypeEnum") -> Dict[str, Any]:
        """Test a specific operation type.

        Looks up the prompt configuration for *operationType*, calls
        ``self.methodAi.process()`` with it and records the outcome.

        Returns:
            The result dict (also appended to ``self.testResults``). Its
            "status" is "SUCCESS" or "ERROR" as reported by the action result,
            "ERROR" when no configuration exists, or "EXCEPTION" when the call
            raised.
        """
        print(f"\n{self._HEAVY_RULE}")
        print(f"TESTING OPERATION: {operationType.value}")
        print(self._HEAVY_RULE)

        loop = asyncio.get_running_loop()
        startTime = loop.time()

        # Get test prompt for this operation
        testConfig = self.testPrompts.get(operationType, {})
        if not testConfig:
            result = {
                "operationType": operationType.value,
                "status": "ERROR",
                "error": "No test configuration found for this operation type",
                "processingTime": 0.0,
            }
            self.testResults.append(result)
            return result

        resultType = testConfig.get("resultType", "txt")
        print(f"Prompt: {testConfig.get('aiPrompt', 'N/A')}")
        print(f"Result Type: {resultType}")

        try:
            # Prepare parameters
            parameters = {
                "aiPrompt": testConfig.get("aiPrompt"),
                "resultType": resultType,
            }

            # Add document list if provided (an empty list is not forwarded)
            documentList = testConfig.get("documentList")
            if documentList:
                parameters["documentList"] = documentList

            # Ensure workflow is still set in both self.services AND methodAi.services
            methodServices = self._ensureWorkflow()

            # Call methodAi.process()
            print("Calling methodAi.process()...")
            currentWorkflow = self.services.currentWorkflow
            methodWorkflow = methodServices.currentWorkflow if methodServices is not None else None
            print(f"Debug: Current workflow ID before call: {currentWorkflow.id if currentWorkflow else 'None'}")
            print(f"Debug: methodAi.services.currentWorkflow: {methodWorkflow.id if methodWorkflow else 'None/NotSet'}")
            print(f"Debug: Is same services object? {self.services is methodServices}")
            print(f"Debug: services id: {id(self.services)}")
            print(f"Debug: methodAi.services id: {id(methodServices)}")

            actionResult = await self.methodAi.process(parameters)
            processingTime = loop.time() - startTime

            # Analyze result
            result = self._summarizeActionResult(operationType, actionResult, processingTime)

            print(f"✅ Status: {result['status']}")
            print(f"⏱️  Processing time: {result['processingTime']}s")
            print(f"📄 Documents: {result.get('documentCount', 0)}")

            if actionResult.success:
                if result.get("documentName"):
                    print(f"📄 Saved: {result['documentName']}")
                    print(f"📄 MIME type: {result.get('mimeType')}")
                    print(f"📄 Size: {result.get('dataSize')} bytes")

                # Try to decode if it's JSON
                if result.get("mimeType") == "application/json":
                    self._checkJsonDocument(actionResult, result)
            else:
                print(f"❌ Error: {result.get('error')}")

        except Exception as e:
            # Deliberately broad: a failure is recorded as a test outcome
            # instead of aborting the whole run.
            processingTime = loop.time() - startTime
            result = {
                "operationType": operationType.value,
                "status": "EXCEPTION",
                "processingTime": round(processingTime, 2),
                "error": str(e),
                "hasDocuments": False,
            }
            print(f"💥 EXCEPTION: {str(e)}")

        self.testResults.append(result)
        return result

    async def testAllOperations(self):
        """Run the selected operation tests, then print and save the summary."""
        # TODO: only ONE operation type is exercised for now; extend this list
        # to cover further OperationTypeEnum members.
        operationsToTest = [OperationTypeEnum.IMAGE_ANALYSE]
        selectedNames = ", ".join(operation.value for operation in operationsToTest)

        # The banner is derived from the list above so that it cannot drift
        # away from the operation that is actually executed.
        print(f"\n{self._HEAVY_RULE}")
        print(f"STARTING METHODAI OPERATIONS TESTS - {selectedNames} ONLY")
        print(self._HEAVY_RULE)

        for operation in operationsToTest:
            print(f"Testing {operation.value} operation type...")
            await self.testOperation(operation)
            print(f"\n{self._LIGHT_RULE}")

        # Print summary
        self.printSummary()

    def _printResultSection(self, title, results, formatLine):
        """Print a titled list of results; prints nothing when *results* is empty."""
        if not results:
            return
        print(f"\n{self._LIGHT_RULE}")
        print(title)
        print(self._LIGHT_RULE)
        for result in results:
            print(formatLine(result))

    def _saveResults(self, summary):
        """Write the summary counts and all results to a timestamped JSON file.

        Returns the path of the written file.
        """
        import json

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        resultsFile = os.path.join(self.modelTestDir, f"method_ai_operations_test_{timestamp}.json")
        report = {
            "timestamp": timestamp,
            "summary": summary,
            "results": self.testResults,
        }
        with open(resultsFile, "w", encoding="utf-8") as f:
            # default=str: a value that is not JSON-serialisable (e.g. a
            # non-string error object) is stored as text instead of aborting
            # the report half-written.
            json.dump(report, f, indent=2, ensure_ascii=False, default=str)
        return resultsFile

    def printSummary(self):
        """Print test summary and save it, with all results, as a JSON report."""
        print(f"\n{self._HEAVY_RULE}")
        print("TEST SUMMARY")
        print(self._HEAVY_RULE)

        successfulTests = [r for r in self.testResults if r["status"] == "SUCCESS"]
        failedTests = [r for r in self.testResults if r["status"] == "ERROR"]
        exceptionTests = [r for r in self.testResults if r["status"] == "EXCEPTION"]

        print(f"\nTotal tests: {len(self.testResults)}")
        print(f"✅ Successful: {len(successfulTests)}")
        print(f"❌ Failed: {len(failedTests)}")
        print(f"💥 Exceptions: {len(exceptionTests)}")

        self._printResultSection(
            "SUCCESSFUL TESTS",
            successfulTests,
            lambda r: f"✅ {r['operationType']}: {r['processingTime']}s",
        )
        self._printResultSection(
            "FAILED TESTS",
            failedTests,
            lambda r: f"❌ {r['operationType']}: {r.get('error', 'Unknown error')}",
        )
        self._printResultSection(
            "EXCEPTIONS",
            exceptionTests,
            lambda r: f"💥 {r['operationType']}: {r.get('error', 'Unknown error')}",
        )

        # Save results
        resultsFile = self._saveResults(
            {
                "total": len(self.testResults),
                "successful": len(successfulTests),
                "failed": len(failedTests),
                "exceptions": len(exceptionTests),
            }
        )
        print(f"\n📄 Results saved to: {resultsFile}")
|
|
|
|
|
|
async def main():
    """Build the tester, initialise it, run the operation tests and print a closing banner."""
    runner = MethodAiOperationsTester()

    await runner.initialize()
    await runner.testAllOperations()

    closingRule = "=" * 80
    print("\n" + closingRule)
    print("TESTING COMPLETED")
    print(closingRule)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the async main() to completion on a fresh event loop.
    asyncio.run(main())
|
|
|