# executionState.py
# Contains all execution state management logic

import logging
from typing import List, Optional

from modules.datamodels.datamodelChat import TaskStep, ActionResult, Observation

logger = logging.getLogger(__name__)


class TaskExecutionState:
    """Manages execution state for a task with retry logic.

    Holds the task step being executed, the action results recorded so far
    (successful and failed, in separate lists), a running action index, and
    the retry / step counters together with their limits.
    """

    # (pattern label, substrings that select it). Entries are checked in this
    # order against the lower-cased error text and the first entry with a
    # matching substring wins, so each failed action yields at most one label.
    _FAILURE_PATTERNS = (
        ("timeout_issues", ("timeout",)),
        ("document_reference_issues", ("document_not_found", "file not found")),
        ("content_extraction_issues", ("empty_result", "no content")),
        ("format_issues", ("invalid_format", "wrong format")),
        ("permission_issues", ("permission", "access denied")),
    )

    def __init__(self, task_step: TaskStep):
        """Create a fresh state for ``task_step`` with all counters at zero."""
        self.task_step = task_step
        self.successful_actions: List[ActionResult] = []  # Preserved across retries
        self.failed_actions: List[ActionResult] = []  # For analysis
        self.current_action_index = 0
        self.retry_count = 0
        self.max_retries = 3
        # Iterative loop (dynamic mode); these two are only initialised here,
        # no method of this class modifies them.
        self.current_step = 0
        self.max_steps = 5

    def addSuccessfulAction(self, action_result: ActionResult):
        """Record a successful action and advance the action index."""
        self.successful_actions.append(action_result)
        self.current_action_index += 1

    def addFailedAction(self, action_result: ActionResult):
        """Record a failed action for later analysis and advance the action index."""
        self.failed_actions.append(action_result)
        self.current_action_index += 1

    def canRetry(self) -> bool:
        """Return True while ``retry_count`` is still below ``max_retries``."""
        return self.retry_count < self.max_retries

    def incrementRetryCount(self):
        """Increase the retry counter by one."""
        self.retry_count += 1

    def getFailurePatterns(self) -> List[str]:
        """Classify the recorded failed actions into coarse failure patterns.

        Each failed action's ``error`` text is lower-cased and matched against
        the substrings in ``_FAILURE_PATTERNS``; a missing or empty error
        matches nothing.

        Returns:
            The distinct pattern labels that occurred, each listed once, in
            the order in which they were first encountered. The order is
            therefore stable from run to run.
        """
        found: List[str] = []
        for action in self.failed_actions:
            error = action.error.lower() if action.error else ''
            for label, needles in self._FAILURE_PATTERNS:
                if any(needle in error for needle in needles):
                    # At most five distinct labels exist, so a list membership
                    # test is sufficient for de-duplication.
                    if label not in found:
                        found.append(label)
                    break  # first matching pattern wins
        return found


def shouldContinue(observation: Optional[Observation], review=None, current_step: int = 0, max_steps: int = 5) -> bool:
    """Helper to decide if the iterative loop should continue.

    Args:
        observation: Observation Pydantic model with action execution results
            (may be None).
        review: Optional review decision. Either an object exposing a
            ``status`` attribute (ReviewResult model) or a legacy dict whose
            ``'decision'`` key is consulted first, falling back to
            ``'status'`` when the decision is missing or falsy.
        current_step: Current step number in the iteration.
        max_steps: Maximum allowed steps.

    Returns:
        True if the loop should continue, False if it should stop.

    Logic:
        - Stop if ``current_step >= max_steps``.
        - Stop if the review decision is ``'stop'`` or ``'success'``.
        - Otherwise continue; a failed observation without documents is
          explicitly granted another step (bounded by ``max_steps``).
        - Any exception raised while evaluating the inputs is logged as a
          warning and treated as "stop".
    """
    try:
        # Stop if max steps reached
        if current_step >= max_steps:
            logger.info(f"Stopping workflow: reached max_steps limit ({current_step} >= {max_steps})")
            return False

        # Check review decision (can be ReviewResult model or dict)
        if review:
            if hasattr(review, 'status'):
                # ReviewResult Pydantic model
                if review.status in ('stop', 'success'):
                    return False
            elif isinstance(review, dict):
                # Legacy dict format
                decision = review.get('decision') or review.get('status')
                if decision in ('stop', 'success'):
                    return False

        # Hard failure with no documents: allow the next step; the max_steps
        # check above is what ultimately bounds the number of attempts.
        if observation and observation.success is False and observation.documentsCount == 0:
            return True

        # Every remaining case continues as well.
        return True
    except Exception as e:
        # Deliberate best-effort: a malformed review/observation stops the
        # loop instead of propagating the error to the caller.
        logger.warning(f"Error in shouldContinue: {e}")
        return False