gateway/test_workflow.py
2025-06-21 01:30:27 +02:00

175 lines
No EOL
6.1 KiB
Python

#!/usr/bin/env python3
"""
Test routine for WorkflowManager.workflowProcess()
"""
import asyncio
import logging
import sys
import os
from datetime import datetime, UTC
import uuid
# Set up test configuration
os.environ['POWERON_CONFIG_FILE'] = 'test_config.ini'
# Simple imports from modules (same as app.py)
from modules.interfaces.interfaceAppObjects import User, UserConnection
from modules.interfaces.interfaceChatObjects import ChatObjects
from modules.interfaces.interfaceChatModel import UserInputRequest, ChatWorkflow
from modules.workflow.managerWorkflow import WorkflowManager
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('test_workflow.log')
]
)
logger = logging.getLogger(__name__)
def create_test_user() -> User:
"""Create a test user for the workflow"""
# Create test connections for Microsoft services
connections = [
UserConnection(
id="conn-001",
authority="microsoft",
name="Test Microsoft Account",
enabled=True,
accessToken="test-token-123",
refreshToken="test-refresh-456",
expiresAt=datetime.now(UTC).isoformat(),
scopes=["Files.ReadWrite", "Mail.ReadWrite", "Sites.ReadWrite.All"]
)
]
return User(
id="test-user-001",
mandateId="test-mandate-001",
username="testuser",
email="test@example.com",
fullName="Test User",
enabled=True,
language="en",
privilege="user",
authenticationAuthority="local",
connections=connections
)
def create_test_workflow() -> ChatWorkflow:
"""Create a test workflow"""
return ChatWorkflow(
id="test-workflow-001",
mandateId="test-mandate-001",
status="running",
name="Business Intelligence Analysis Workflow",
currentRound=1,
lastActivity=datetime.now(UTC).isoformat(),
startedAt=datetime.now(UTC).isoformat(),
logs=[],
messages=[],
stats=None,
tasks=[]
)
def create_test_user_input() -> UserInputRequest:
"""Create test user input with a meaningful business intelligence task"""
return UserInputRequest(
prompt="""Please analyze the quarterly sales data and create a comprehensive business intelligence report.
The task involves:
1. Extract and analyze sales data from the provided Excel files
2. Identify key trends, patterns, and anomalies in the data
3. Create visualizations (charts and graphs) to illustrate findings
4. Generate a professional PowerPoint presentation summarizing the analysis
5. Create a detailed markdown report with actionable insights
6. Search for industry benchmarks and best practices to compare our performance
7. Store the final reports in SharePoint for team access
Please ensure the analysis includes:
- Sales performance by region and product category
- Month-over-month growth trends
- Customer segmentation analysis
- Revenue forecasting for the next quarter
- Recommendations for improving sales performance
The output should be suitable for executive review and include both high-level summaries and detailed technical analysis.""",
listFileId=["sales_data_q1.xlsx", "sales_data_q2.xlsx", "customer_data.csv"],
userLanguage="en"
)
async def test_workflow_process():
"""Test the workflowProcess function"""
try:
logger.info("Starting workflow process test...")
# Create test data
test_user = create_test_user()
test_workflow = create_test_workflow()
test_user_input = create_test_user_input()
logger.info(f"Test user: {test_user.username}")
logger.info(f"Test workflow: {test_workflow.id}")
logger.info(f"Test input prompt: {test_user_input.prompt[:100]}...")
logger.info(f"Test files: {test_user_input.listFileId}")
# Initialize ChatObjects interface
chat_interface = ChatObjects(test_user)
logger.info("ChatObjects interface initialized")
# Initialize WorkflowManager
workflow_manager = WorkflowManager(chat_interface, test_user)
logger.info("WorkflowManager initialized")
# Test the workflowProcess function
logger.info("Calling workflowProcess...")
task = await workflow_manager.workflowProcess(test_user_input, test_workflow)
# Log results
if task:
logger.info("✅ Task created successfully!")
logger.info(f"Task ID: {task.id}")
logger.info(f"Task Status: {task.status}")
logger.info(f"Task Feedback: {task.feedback}")
logger.info(f"Number of actions: {len(task.actionList) if task.actionList else 0}")
if task.actionList:
for i, action in enumerate(task.actionList):
logger.info(f"Action {i+1}: {action.execMethod}.{action.execAction}")
logger.info(f" Parameters: {action.execParameters}")
else:
logger.warning("⚠️ No task was created")
logger.info("Test completed successfully!")
return task
except Exception as e:
logger.error(f"❌ Test failed with error: {str(e)}")
logger.exception("Full traceback:")
raise
async def main():
"""Main function to run the test"""
logger.info("=" * 50)
logger.info("BUSINESS INTELLIGENCE WORKFLOW TEST")
logger.info("=" * 50)
try:
task = await test_workflow_process()
logger.info("=" * 50)
logger.info("TEST COMPLETED SUCCESSFULLY")
logger.info("=" * 50)
return task
except Exception as e:
logger.error("=" * 50)
logger.error("TEST FAILED")
logger.error("=" * 50)
raise
if __name__ == "__main__":
# Run the test
asyncio.run(main())