""" Task Manager Module for managing task states and transitions. """ import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC import uuid from modules.interfaces.serviceChatModel import Task, ChatLog, ChatMessage logger = logging.getLogger(__name__) class TaskManager: """Manager for task state management and transitions.""" _instance = None @classmethod def getInstance(cls): """Return a singleton instance of the task manager.""" if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): """Initialize the task manager.""" if TaskManager._instance is not None: raise RuntimeError("Singleton instance already exists - use getInstance()") self.service = None def initialize(self, service=None): """Initialize or update the manager with service references.""" if service: # Validate required interfaces required_interfaces = ['base', 'msft', 'google'] missing_interfaces = [] for interface in required_interfaces: if not hasattr(service, interface): missing_interfaces.append(interface) if missing_interfaces: logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}") return False self.service = service return True def createTask(self, workflowId: str, agentName: str, prompt: str, userLanguage: str, filesInput: List[str] = None, filesOutput: List[str] = None) -> Task: """ Create a new task. Args: workflowId: ID of the workflow this task belongs to agentName: Name of the agent to execute the task prompt: Task prompt userLanguage: User's preferred language filesInput: List of input files filesOutput: List of output files Returns: New Task object """ return Task( id=str(uuid.uuid4()), workflowId=workflowId, agentName=agentName, status="pending", progress=0.0, prompt=prompt, userLanguage=userLanguage, filesInput=filesInput or [], filesOutput=filesOutput or [], startedAt=datetime.now(UTC).isoformat() ) def updateTaskStatus(self, task: Task, newStatus: str, progress: float = None, error: str = None, result: ChatMessage = None) -> Task: """ Update task status and related fields. Args: task: Task to update newStatus: New status value progress: Optional progress value error: Optional error message result: Optional result message Returns: Updated Task object """ # Validate status transition valid_transitions = { "pending": ["running", "failed"], "running": ["completed", "failed"], "completed": [], "failed": [] } if newStatus not in valid_transitions.get(task.status, []): logger.warning(f"Invalid status transition from {task.status} to {newStatus}") return task # Update task fields task.status = newStatus if progress is not None: task.progress = progress if error is not None: task.error = error if result is not None: task.result = result # Update timestamps if newStatus in ["completed", "failed"]: task.finishedAt = datetime.now(UTC).isoformat() return task def createTaskLog(self, task: Task, message: str, logType: str = "info") -> ChatLog: """ Create a log entry for a task. Args: task: Task to create log for message: Log message logType: Type of log entry Returns: New ChatLog object """ return ChatLog( id=str(uuid.uuid4()), workflowId=task.workflowId, message=message, type=logType, timestamp=datetime.now(UTC).isoformat(), agentName=task.agentName, status=task.status, progress=task.progress ) def updateTaskProgress(self, task: Task, progress: float, message: str = None) -> Task: """ Update task progress and optionally create a log entry. Args: task: Task to update progress: New progress value (0-100) message: Optional progress message Returns: Updated Task object """ # Validate progress value if not 0 <= progress <= 100: logger.warning(f"Invalid progress value: {progress}") return task # Update progress task.progress = progress # Create log entry if message provided if message: log = self.createTaskLog(task, message, "progress") if self.service and hasattr(self.service, 'logAdd'): self.service.logAdd(log) return task def handleTaskError(self, task: Task, error: str) -> Task: """ Handle task error and update task state. Args: task: Task to update error: Error message Returns: Updated Task object """ # Update task status task = self.updateTaskStatus(task, "failed", error=error) # Create error log log = self.createTaskLog(task, f"Task failed: {error}", "error") if self.service and hasattr(self.service, 'logAdd'): self.service.logAdd(log) return task def completeTask(self, task: Task, result: ChatMessage) -> Task: """ Mark task as completed and set result. Args: task: Task to complete result: Result message Returns: Updated Task object """ # Update task status task = self.updateTaskStatus(task, "completed", progress=100.0, result=result) # Create completion log log = self.createTaskLog(task, "Task completed successfully", "info") if self.service and hasattr(self.service, 'logAdd'): self.service.logAdd(log) return task # Singleton factory for the task manager def getTaskManager(): return TaskManager.getInstance()