gateway/gwserver/modules/agentservice_workflow_manager.py
2025-04-16 10:49:27 +02:00

689 lines
No EOL
26 KiB
Python

"""
Refactored WorkflowManager class for the Agentservice (continued).
"""
import asyncio
import json
import logging
import os
import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple, Union
# Module-level logger used by WorkflowManager for status, persistence and error messages.
logger = logging.getLogger(__name__)
class WorkflowManager:
    """
    Coordinate the lifecycle of agent-service workflows.

    Workflows are plain dictionaries. They are cached in memory
    (``self.workflows``), persisted through the optional LucyDOM interface and
    mirrored as JSON backup files named ``workflow_<id>.json`` in
    ``self.results_dir``.
    """

    def __init__(self, mandate_id: int = None, user_id: int = None, ai_service = None, lucydom_interface = None):
        """
        Initialize the WorkflowManager.

        Args:
            mandate_id: Mandate stored on newly created workflows
            user_id: User stored on newly created workflows
            ai_service: AI service handed to the document handler and agent registry
            lucydom_interface: Optional database interface; when None only the
                in-memory cache and the JSON backup files are used
        """
        self.mandate_id = mandate_id
        self.user_id = user_id
        self.ai_service = ai_service
        self.lucydom_interface = lucydom_interface

        # In-memory cache: workflow ID -> workflow dict
        self.workflows = {}

        # Directory for the JSON backup files (relative to the working directory)
        self.results_dir = os.path.join("results", "workflows")
        os.makedirs(self.results_dir, exist_ok=True)

        # Initialize document handler
        from modules.agentservice_document_handler import get_document_handler
        self.document_handler = get_document_handler(
            lucydom_interface=lucydom_interface,
            ai_service=ai_service
        )

        # Hand the same dependencies to the registry instance returned by get_instance()
        from modules.agentservice_registry import AgentRegistry
        registry = AgentRegistry.get_instance()
        registry.set_dependencies(
            ai_service=ai_service,
            document_handler=self.document_handler,
            lucydom_interface=lucydom_interface
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_data_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed statistics dict (fields of the DataStats model)."""
        return {
            "total_processing_time": 0.0,
            "total_token_count": 0,
            "total_bytes_sent": 0,
            "total_bytes_received": 0
        }

    def _workflow_file_path(self, workflow_id: str) -> str:
        """
        Return the path of the JSON backup file for a workflow.

        Args:
            workflow_id: ID of the workflow

        Returns:
            ``<results_dir>/workflow_<workflow_id>.json``

        Raises:
            ValueError: If the ID contains path components. Such an ID could
                otherwise read, overwrite or delete files outside results_dir.
        """
        file_name = f"workflow_{workflow_id}.json"
        if os.path.basename(file_name) != file_name:
            raise ValueError(f"Invalid workflow ID: {workflow_id!r}")
        return os.path.join(self.results_dir, file_name)

    def _read_workflow_file(self, workflow_id: str, context: str = "workflow") -> Optional[Dict[str, Any]]:
        """
        Read a workflow from its JSON backup file.

        Args:
            workflow_id: ID of the workflow
            context: Noun used in the error log message (e.g. "workflow logs")

        Returns:
            The parsed workflow, or None if the file does not exist or cannot be
            read (read errors are logged, not raised)
        """
        try:
            workflow_path = self._workflow_file_path(workflow_id)
            if not os.path.exists(workflow_path):
                return None
            with open(workflow_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error loading {context} from file: {str(e)}")
            return None

    @staticmethod
    def _sort_by_last_activity(workflows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Sort workflow summaries newest first by ``last_activity``.

        Missing or None values sort last. str() keeps the key comparable even if a
        backend returns a non-string timestamp.
        """
        return sorted(workflows, key=lambda w: str(w.get("last_activity") or ""), reverse=True)

    def _list_workflows_from_database(self, mandate_id: Optional[int], user_id: Optional[int]) -> List[Dict[str, Any]]:
        """Build workflow summaries from the LucyDOM interface. Exceptions propagate to the caller."""
        if user_id is not None:
            records = self.lucydom_interface.get_workflows_by_user(user_id)
        else:
            records = self.lucydom_interface.get_all_workflows()

        if mandate_id is not None:
            records = [wf for wf in records if wf.get("mandate_id") == mandate_id]

        summaries = []
        for record in records:
            summary = {
                "id": record.get("id"),
                "name": record.get("name", f"Workflow {record.get('id')}"),
                "status": record.get("status"),
                "started_at": record.get("started_at"),
                "last_activity": record.get("last_activity"),
                "completed_at": record.get("completed_at")
            }
            # One extra query per workflow; the count is only added when messages exist
            messages = self.lucydom_interface.get_workflow_messages(record.get("id"))
            if messages:
                summary["message_count"] = len(messages)
            summaries.append(summary)
        return summaries

    def _list_workflows_from_files(self, mandate_id: Optional[int], user_id: Optional[int]) -> List[Dict[str, Any]]:
        """
        Build workflow summaries from the JSON backup files.

        Unreadable files are logged and skipped. An error listing the directory
        propagates to the caller.
        """
        summaries = []
        for filename in os.listdir(self.results_dir):
            if not (filename.startswith("workflow_") and filename.endswith(".json")):
                continue
            workflow_path = os.path.join(self.results_dir, filename)
            try:
                with open(workflow_path, 'r', encoding='utf-8') as f:
                    workflow = json.load(f)

                # Apply the optional mandate/user filters
                if mandate_id is not None and workflow.get("mandate_id") != mandate_id:
                    continue
                if user_id is not None and workflow.get("user_id") != user_id:
                    continue

                summaries.append({
                    "id": workflow.get("id"),
                    "name": workflow.get("name", f"Workflow {workflow.get('id')}"),
                    "status": workflow.get("status"),
                    "started_at": workflow.get("started_at"),
                    "last_activity": workflow.get("last_activity"),
                    "completed_at": workflow.get("completed_at"),
                    "message_count": len(workflow.get("messages") or [])
                })
            except Exception as e:
                logger.error(f"Error loading workflow file {filename}: {str(e)}")
        return summaries

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def list_workflows(self, mandate_id: int = None, user_id: int = None) -> List[Dict[str, Any]]:
        """
        List all available workflows.

        The database is used when an interface is configured. The JSON backup
        files are read only when there is no interface or the database query
        fails. Each source builds its own list, so a failed database attempt
        never leaks partial results into the file-based listing.

        Args:
            mandate_id: Optional mandate ID for filtering
            user_id: Optional user ID for filtering

        Returns:
            List of workflow summaries, newest ``last_activity`` first
            (an empty list if both sources fail)
        """
        if self.lucydom_interface:
            try:
                workflows = self._list_workflows_from_database(mandate_id, user_id)
                logger.info(f"Loaded {len(workflows)} workflows from database")
                return self._sort_by_last_activity(workflows)
            except Exception as e:
                logger.error(f"Error retrieving workflows from database: {str(e)}")

        try:
            workflows = self._list_workflows_from_files(mandate_id, user_id)
            logger.info(f"Loaded {len(workflows)} workflows from files")
            return self._sort_by_last_activity(workflows)
        except Exception as e:
            logger.error(f"Error listing workflows: {str(e)}")
            return []

    async def execute_workflow(self, message: Dict[str, Any], files: Optional[List[Dict[str, Any]]] = None, workflow_id: Optional[str] = None, is_user_input: bool = False) -> Dict[str, Any]:
        """
        Execute a workflow with the given message and files.

        Without an ID, a new workflow with ID ``wf_<uuid4>`` is created. A given
        ID is loaded via load_workflow(); if it cannot be found, a fresh workflow
        is initialized under that ID.

        Args:
            message: Input message (prompt)
            files: Optional list of file metadata
            workflow_id: Optional ID for continuing an existing workflow
            is_user_input: Flag indicating if this is user input to an existing workflow

        Returns:
            The result of ``WorkflowExecution.execute()``. If execution raises, a
            dict with ``workflow_id``, ``status="failed"`` and ``error`` is
            returned instead (the exception is not re-raised).
        """
        if not workflow_id:
            workflow_id = f"wf_{uuid.uuid4()}"
            workflow = self._initialize_workflow(workflow_id)
        else:
            workflow = await self.load_workflow(workflow_id)
            if not workflow:
                # Fallback: start a new workflow under the requested ID
                workflow = self._initialize_workflow(workflow_id)

        start_time = datetime.now()

        try:
            from modules.agentservice_workflow_execution import WorkflowExecution
            execution = WorkflowExecution(
                workflow_manager=self,
                workflow_id=workflow_id,
                mandate_id=self.mandate_id,
                user_id=self.user_id,
                ai_service=self.ai_service,
                lucydom_interface=self.lucydom_interface
            )

            # Point the shared document handler at this workflow before executing
            self.document_handler.set_workflow_id(workflow_id)

            result = await execution.execute(message, workflow, files, is_user_input)

            duration = (datetime.now() - start_time).total_seconds()
            stats = workflow.setdefault("data_stats", self._empty_data_stats())
            # NOTE(review): this stores only this run's duration, although the field is
            # named "total"; confirm whether continued workflows should accumulate it.
            stats["total_processing_time"] = duration
            workflow["completed_at"] = datetime.now().isoformat()

            self._save_workflow(workflow)
            return result

        except Exception as e:
            logger.error(f"Error executing workflow: {str(e)}", exc_info=True)

            workflow["status"] = "failed"
            workflow["last_activity"] = datetime.now().isoformat()
            self._add_log(workflow, f"Workflow execution failed: {str(e)}", "error")

            self._save_workflow(workflow)

            return {
                "workflow_id": workflow_id,
                "status": "failed",
                "error": str(e)
            }

    def _save_workflow(self, workflow: Dict[str, Any]) -> bool:
        """
        Save workflow state to the cache, the database (if configured) and the JSON backup file.

        A database error is logged and does not prevent the file backup. The
        file is written atomically: the JSON is serialized first and then
        written to a temp file that replaces the old backup. A
        non-serializable value therefore can never leave a truncated backup
        behind.

        Args:
            workflow: The workflow object to save (must contain "id")

        Returns:
            True if the backup file was written, False otherwise
        """
        try:
            workflow_id = workflow.get("id")

            self.workflows[workflow_id] = workflow

            if self.lucydom_interface:
                # Pass a shallow copy so the database layer cannot rebind top-level keys of the cached dict
                db_workflow = workflow.copy()
                try:
                    self.lucydom_interface.save_workflow_state(db_workflow)
                    logger.info(f"Workflow {workflow_id} saved to database")
                except Exception as db_error:
                    logger.error(f"Error saving workflow to database: {str(db_error)}")
                    # Continue to file saving even if database fails

            payload = json.dumps(workflow, indent=2, ensure_ascii=False)
            workflow_path = self._workflow_file_path(workflow_id)
            temp_path = f"{workflow_path}.tmp"
            with open(temp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(temp_path, workflow_path)

            logger.info(f"Workflow {workflow_id} saved to file: {workflow_path}")
            return True
        except Exception as e:
            logger.error(f"Error saving workflow state: {str(e)}")
            return False

    async def load_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """
        Load a workflow by ID from the cache, the database or the JSON backup file (in that order).

        On success the workflow is cached and the document handler is pointed at it.

        Args:
            workflow_id: ID of the workflow to load

        Returns:
            The workflow object or None if not found
        """
        if workflow_id in self.workflows:
            self.document_handler.set_workflow_id(workflow_id)
            return self.workflows[workflow_id]

        if self.lucydom_interface:
            try:
                workflow = self.lucydom_interface.load_workflow_state(workflow_id)
                if workflow:
                    self.workflows[workflow_id] = workflow
                    self.document_handler.set_workflow_id(workflow_id)
                    logger.info(f"Workflow {workflow_id} loaded from database")
                    return workflow
            except Exception as e:
                logger.error(f"Error loading workflow from database: {str(e)}")

        workflow = self._read_workflow_file(workflow_id, "workflow")
        if workflow is not None:
            self.workflows[workflow_id] = workflow
            self.document_handler.set_workflow_id(workflow_id)
            logger.info(f"Workflow {workflow_id} loaded from file: {self._workflow_file_path(workflow_id)}")
            return workflow

        logger.warning(f"Workflow {workflow_id} not found")
        return None

    async def delete_workflow(self, workflow_id: str) -> bool:
        """
        Delete a workflow from the cache, the database and the JSON backup file.

        Args:
            workflow_id: ID of the workflow

        Returns:
            True if the workflow was removed from at least one store (cache,
            database as reported by the interface, or file). False if it was
            found nowhere, or if removing its backup file failed (a leftover
            file would let load_workflow resurrect it).
        """
        removed_from_memory = workflow_id in self.workflows
        self.workflows.pop(workflow_id, None)

        removed_from_database = False
        if self.lucydom_interface:
            try:
                db_success = self.lucydom_interface.delete_workflow(workflow_id)
                logger.info(f"Workflow {workflow_id} deleted from database: {db_success}")
                removed_from_database = bool(db_success)
            except Exception as e:
                logger.error(f"Error deleting workflow {workflow_id} from database: {str(e)}")

        try:
            workflow_path = self._workflow_file_path(workflow_id)
            removed_file = os.path.exists(workflow_path)
            if removed_file:
                os.remove(workflow_path)
                logger.info(f"Workflow {workflow_id} deleted from file: {workflow_path}")
        except Exception as e:
            logger.error(f"Error deleting workflow file {workflow_id}: {str(e)}")
            return False

        found = removed_from_memory or removed_from_database or removed_file
        if not found:
            logger.warning(f"Workflow {workflow_id} not found: {workflow_path}")
        return found

    def _initialize_workflow(self, workflow_id: str) -> Dict[str, Any]:
        """
        Create a new workflow with status "running" and cache it.

        The workflow is stored in the database (if configured) but not in a
        JSON backup file; the file is first written by _save_workflow().

        Args:
            workflow_id: ID of the workflow

        Returns:
            The initialized workflow object
        """
        current_time = datetime.now().isoformat()

        workflow = {
            "id": workflow_id,
            "name": f"Workflow {workflow_id}",
            "mandate_id": self.mandate_id,
            "user_id": self.user_id,
            "status": "running",
            "started_at": current_time,
            "last_activity": current_time,
            "current_round": 1,
            "data_stats": self._empty_data_stats(),
            "messages": [],
            "logs": []
        }

        self._add_log(workflow, "Workflow started", "info", "workflow", "Workflow Management")

        if self.lucydom_interface:
            try:
                self.lucydom_interface.save_workflow_state(workflow)
                logger.info(f"Workflow {workflow_id} created in database")
            except Exception as e:
                logger.error(f"Error creating workflow {workflow_id} in database: {str(e)}")

        self.workflows[workflow_id] = workflow
        return workflow

    async def stop_workflow(self, workflow_id: str) -> bool:
        """
        Mark a workflow as stopped and persist it.

        Only workflows whose status is "running" or "completed" are changed.
        This method only updates and saves the status; whether an in-flight
        execution observes it is not visible here.

        Args:
            workflow_id: ID of the workflow to stop

        Returns:
            True if the status was set to "stopped". False if the workflow was
            not found, had another status, or an error occurred.
        """
        try:
            workflow = self.workflows.get(workflow_id)
            if not workflow:
                workflow = await self.load_workflow(workflow_id)
            if not workflow:
                return False

            if workflow.get("status") not in ("running", "completed"):
                return False

            workflow["status"] = "stopped"
            workflow["last_activity"] = datetime.now().isoformat()
            self._add_log(workflow, "Workflow was manually stopped", "info", "workflow", "Workflow Management")

            self._save_workflow(workflow)
            return True
        except Exception as e:
            logger.error(f"Error stopping workflow {workflow_id}: {str(e)}")
            return False

    def _add_log(self, workflow: Union[Dict[str, Any], str], message: str, log_type: str, agent_id: Optional[str] = None, agent_name: Optional[str] = None) -> None:
        """
        Append a log entry to a workflow and mirror it to the database and the module logger.

        Args:
            workflow: Workflow dict, or a workflow ID that is resolved via the
                in-memory cache only
            message: Log message
            log_type: "error" and "warning" map to the matching logger level;
                anything else is logged at info level
            agent_id: Optional ID of the agent that produced the entry
            agent_name: Optional display name of that agent

        Side effects:
            Updates ``workflow["last_activity"]``. When a LucyDOM interface is
            configured, also stores the entry via ``create_workflow_log``. The
            workflow itself is not saved here.
        """
        if isinstance(workflow, str):
            workflow_id = workflow
            workflow = self.workflows.get(workflow_id)
            if not workflow:
                logger.info(f"Log (couldn't add to workflow {workflow_id}): {log_type} - {message}")
                return

        if not isinstance(workflow, dict):
            logger.error(f"Invalid workflow type: {type(workflow)}. Expected dictionary.")
            logger.info(f"Log (couldn't add to workflow): {log_type} - {message}")
            return

        log_entry = {
            "id": f"log_{uuid.uuid4()}",
            "message": message,
            "type": log_type,
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "agent_name": agent_name
        }

        workflow.setdefault("logs", []).append(log_entry)
        workflow["last_activity"] = log_entry["timestamp"]

        if self.lucydom_interface:
            try:
                log_data = log_entry.copy()
                log_data["workflow_id"] = workflow["id"]
                self.lucydom_interface.create_workflow_log(log_data)
                logger.debug(f"Log entry for workflow {workflow['id']} saved to database")
            except Exception as e:
                # Use .get(): a workflow without "id" must not raise again from inside the handler
                logger.error(f"Error saving log entry for workflow {workflow.get('id')} to database: {str(e)}")

        # Prefix with the agent label when one is given; no stray leading space otherwise
        label = agent_name or agent_id
        log_message = f"[{label}] {message}" if label else message
        if log_type == "error":
            logger.error(log_message)
        elif log_type == "warning":
            logger.warning(log_message)
        else:
            logger.info(log_message)

    def get_workflow_status(self, workflow_id: str) -> Optional[Dict[str, Any]]:
        """
        Get the status of a workflow from the cache, the database or the JSON backup file.

        Args:
            workflow_id: ID of the workflow

        Returns:
            Dictionary with status information or None if workflow not found.
            ``progress`` is a coarse value: 1.0 for completed/failed/stopped,
            otherwise 0.5.
        """
        workflow = self.workflows.get(workflow_id)

        if not workflow and self.lucydom_interface:
            try:
                workflow_data = self.lucydom_interface.get_workflow(workflow_id)
                if workflow_data:
                    workflow = workflow_data
            except Exception as e:
                logger.error(f"Error loading workflow status from database: {str(e)}")

        if not workflow:
            workflow = self._read_workflow_file(workflow_id, "workflow status")
        if not workflow:
            return None

        status = workflow.get("status")
        return {
            "id": workflow.get("id"),
            "name": workflow.get("name", f"Workflow {workflow_id}"),
            "status": status,
            "progress": 1.0 if status in ["completed", "failed", "stopped"] else 0.5,
            "started_at": workflow.get("started_at"),
            "last_activity": workflow.get("last_activity"),
            "workflow_complete": status == "completed",
            "current_round": workflow.get("current_round", 1),
            "data_stats": workflow.get("data_stats", self._empty_data_stats())
        }

    def get_workflow_logs(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
        """
        Get logs for a workflow.

        Looks in the in-memory cache, then the database, then the JSON backup
        file. A None or empty database result falls through to the file, so
        workflows that only exist as files still return their logs.

        Args:
            workflow_id: ID of the workflow

        Returns:
            List of logs, or None if the workflow was not found anywhere
        """
        workflow = self.workflows.get(workflow_id)

        db_logs = None
        if not workflow and self.lucydom_interface:
            try:
                db_logs = self.lucydom_interface.get_workflow_logs(workflow_id)
            except Exception as e:
                logger.error(f"Error loading workflow logs from database: {str(e)}")
            if db_logs:
                return db_logs

        if not workflow:
            workflow = self._read_workflow_file(workflow_id, "workflow logs")
        if workflow:
            return workflow.get("logs", [])
        # Nothing cached or on disk: keep the database answer (None or an empty list)
        return db_logs

    def get_workflow_messages(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
        """
        Get messages for a workflow.

        Looks in the in-memory cache, then the database, then the JSON backup
        file. A None or empty database result falls through to the file, so
        workflows that only exist as files still return their messages.

        Args:
            workflow_id: ID of the workflow

        Returns:
            List of messages, or None if the workflow was not found anywhere
        """
        workflow = self.workflows.get(workflow_id)

        db_messages = None
        if not workflow and self.lucydom_interface:
            try:
                db_messages = self.lucydom_interface.get_workflow_messages(workflow_id)
            except Exception as e:
                logger.error(f"Error loading workflow messages from database: {str(e)}")
            if db_messages:
                return db_messages

        if not workflow:
            workflow = self._read_workflow_file(workflow_id, "workflow messages")
        if workflow:
            return workflow.get("messages", [])
        # Nothing cached or on disk: keep the database answer (None or an empty list)
        return db_messages
# Factory function for WorkflowManager
def get_workflow_manager(mandate_id: int = None, user_id: int = None, ai_service = None, lucydom_interface = None):
    """
    Return the cached WorkflowManager for a (mandate, user) context, creating it on first use.

    Instances live in the module-level ``_workflow_managers`` dict under the key
    ``"{mandate_id}_{user_id}"``. A non-None ``ai_service`` is pushed on every call
    into the manager, its document handler and the agent registry.

    Args:
        mandate_id: Mandate ID
        user_id: User ID
        ai_service: AI service (ignored when None)
        lucydom_interface: LucyDOM interface; resolved via get_lucydom_interface()
            when not provided (falsy)

    Returns:
        WorkflowManager instance
    """
    from modules.lucydom_interface import get_lucydom_interface

    key = f"{mandate_id}_{user_id}"

    # The resolved interface is only consumed when a new manager has to be built
    db_interface = lucydom_interface if lucydom_interface else get_lucydom_interface(mandate_id, user_id)

    manager = _workflow_managers.get(key)
    if manager is None:
        manager = WorkflowManager(
            mandate_id,
            user_id,
            ai_service,
            db_interface
        )
        _workflow_managers[key] = manager

    if ai_service is None:
        return manager

    # Refresh the AI service on the manager and everything that holds a reference to it
    manager.ai_service = ai_service
    if hasattr(manager, 'document_handler'):
        manager.document_handler.set_ai_service(ai_service)

    from modules.agentservice_registry import AgentRegistry
    AgentRegistry.get_instance().set_dependencies(ai_service=ai_service)

    return manager
# Module-level cache of WorkflowManager instances, keyed by "{mandate_id}_{user_id}";
# entries are created lazily by get_workflow_manager().
_workflow_managers: Dict[str, "WorkflowManager"] = {}