# modeActionplan.py
|
|
# Actionplan mode implementation for workflows
|
|
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import List, Dict, Any
|
|
from modules.datamodels.datamodelChat import (
|
|
TaskStep, TaskContext, TaskResult, ActionItem, TaskStatus,
|
|
ActionResult, ReviewResult, ReviewContext
|
|
)
|
|
from modules.datamodels.datamodelChat import ChatWorkflow
|
|
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, ProcessingModeEnum, PriorityEnum
|
|
from modules.workflows.processing.modes.modeBase import BaseMode
|
|
from modules.workflows.processing.shared.stateTools import checkWorkflowStopped
|
|
from modules.workflows.processing.shared.executionState import TaskExecutionState
|
|
from modules.workflows.processing.shared.promptGenerationActionsActionplan import (
|
|
generateActionDefinitionPrompt,
|
|
generateResultReviewPrompt
|
|
)
|
|
from modules.workflows.processing.adaptive import IntentAnalyzer, ContentValidator, LearningEngine, ProgressTracker
|
|
from modules.workflows.processing.adaptive.adaptiveLearningEngine import AdaptiveLearningEngine
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ActionplanMode(BaseMode):
    """Actionplan mode: batch-plans all actions for a task step up front, then
    executes them sequentially with per-action content validation, adaptive
    learning, and review-driven retries."""
|
|
|
|
def __init__(self, services):
    """Wire up the adaptive helper components used by this mode."""
    super().__init__(services)

    # Adaptive subsystems: intent analysis, learning, validation, progress.
    self.intentAnalyzer = IntentAnalyzer(services)
    self.learningEngine = LearningEngine()
    self.adaptiveLearningEngine = AdaptiveLearningEngine()
    # The validator feeds its findings back into the adaptive learning engine.
    self.contentValidator = ContentValidator(services, self.adaptiveLearningEngine)
    self.progressTracker = ProgressTracker()

    # Per-run intent state, populated in executeTask.
    self.workflowIntent = None
    self.taskIntent = None
|
|
|
|
async def generateActionItems(self, taskStep: TaskStep, workflow: ChatWorkflow,
                              previousResults: List = None, enhancedContext: TaskContext = None) -> List[ActionItem]:
    """Generate the batch action plan for one task step via the planning AI.

    Builds a TaskContext (reusing ``enhancedContext`` on retries so the AI
    sees previous failures and improvement hints), renders the
    action-definition prompt, calls the planning AI, and converts the parsed
    JSON actions into persisted ActionItem objects.

    Args:
        taskStep: Task whose actions are being planned.
        workflow: Owning chat workflow (supplies the workflow id).
        previousResults: Results from earlier tasks, exposed to the prompt.
        enhancedContext: Optional pre-populated TaskContext carrying retry
            state (improvements, failed actions, criteria progress).

    Returns:
        List of valid ActionItem objects, or [] on any failure (errors are
        logged; the caller treats an empty list as a failed planning attempt).
    """
    try:
        # Bail out early if the workflow was stopped by the user.
        checkWorkflowStopped(self.services)

        retryInfo = f" (Retry #{enhancedContext.retryCount})" if enhancedContext and enhancedContext.retryCount > 0 else ""
        logger.info(f"Generating actions for task: {taskStep.objective}{retryInfo}")

        # Log criteria progress if this is a retry
        if enhancedContext and hasattr(enhancedContext, 'criteriaProgress') and enhancedContext.criteriaProgress is not None:
            progress = enhancedContext.criteriaProgress
            logger.info(f"Retry attempt {enhancedContext.retryCount} - Criteria progress:")
            if progress.get('met_criteria'):
                logger.info(f" Met criteria: {', '.join(progress['met_criteria'])}")
            if progress.get('unmet_criteria'):
                logger.warning(f" Unmet criteria: {', '.join(progress['unmet_criteria'])}")

            # Show improvement trend across the two most recent attempts.
            if progress.get('attempt_history'):
                recentAttempts = progress['attempt_history'][-2:]  # Last 2 attempts
                if len(recentAttempts) >= 2:
                    prevScore = recentAttempts[0].get('quality_score', 0)
                    currScore = recentAttempts[1].get('quality_score', 0)
                    if currScore > prevScore:
                        logger.info(f" Quality improving: {prevScore} -> {currScore}")
                    elif currScore < prevScore:
                        logger.warning(f" Quality declining: {prevScore} -> {currScore}")
                    else:
                        logger.info(f" Quality stable: {currScore}")

        # Enhanced retry context logging
        if enhancedContext and enhancedContext.retryCount > 0:
            logger.info("=== RETRY CONTEXT FOR ACTION GENERATION ===")
            logger.info(f"Retry Count: {enhancedContext.retryCount}")
            logger.debug(f"Previous Improvements: {enhancedContext.improvements}")
            logger.debug(f"Previous Review Result: {enhancedContext.previousReviewResult}")
            logger.debug(f"Failure Patterns: {enhancedContext.failurePatterns}")
            logger.debug(f"Failed Actions: {enhancedContext.failedActions}")
            logger.debug(f"Successful Actions: {enhancedContext.successfulActions}")
            logger.info("=== END RETRY CONTEXT ===")

        logger.info("=== STARTING ACTION GENERATION ===")

        # Build the TaskContext the prompt generator consumes.
        if enhancedContext and isinstance(enhancedContext, TaskContext):
            # Copy the provided context, normalizing None collections to their
            # empty equivalents so the prompt generator sees stable shapes.
            actionContext = TaskContext(
                taskStep=enhancedContext.taskStep,
                workflow=enhancedContext.workflow,
                workflowId=enhancedContext.workflowId,
                availableDocuments=enhancedContext.availableDocuments,
                availableConnections=enhancedContext.availableConnections,
                previousResults=enhancedContext.previousResults or previousResults or [],
                previousHandover=enhancedContext.previousHandover,
                improvements=enhancedContext.improvements or [],
                retryCount=enhancedContext.retryCount or 0,
                previousActionResults=enhancedContext.previousActionResults or [],
                previousReviewResult=enhancedContext.previousReviewResult,
                isRegeneration=enhancedContext.isRegeneration or False,
                failurePatterns=enhancedContext.failurePatterns or [],
                failedActions=enhancedContext.failedActions or [],
                successfulActions=enhancedContext.successfulActions or [],
                criteriaProgress=enhancedContext.criteriaProgress
            )
        else:
            # Fresh context for a first attempt.
            actionContext = TaskContext(
                taskStep=taskStep,
                workflow=workflow,
                workflowId=workflow.id,
                availableDocuments=None,
                availableConnections=None,
                previousResults=previousResults or [],
                previousHandover=None,
                improvements=[],
                retryCount=0,
                previousActionResults=[],
                previousReviewResult=None,
                isRegeneration=False,
                failurePatterns=[],
                failedActions=[],
                successfulActions=[],
                criteriaProgress=None
            )

        # Check workflow status before calling AI service
        checkWorkflowStopped(self.services)

        # Build prompt bundle (template + placeholders)
        bundle = generateActionDefinitionPrompt(self.services, actionContext)
        actionPromptTemplate = bundle.prompt
        placeholders = bundle.placeholders

        # NOTE(review): these AiCallOptions are constructed but never passed to
        # callAiPlanning below — confirm whether the call should forward them
        # (options=options) or whether the planning endpoint applies its own.
        options = AiCallOptions(
            operationType=OperationTypeEnum.PLAN,
            priority=PriorityEnum.QUALITY,
            compressPrompt=False,
            compressContext=False,
            processingMode=ProcessingModeEnum.DETAILED,
            maxCost=0.10,
            maxProcessingTime=30
        )

        # Renamed from 'prompt': this variable holds the AI *response* text.
        response = await self.services.ai.callAiPlanning(
            prompt=actionPromptTemplate,
            placeholders=placeholders,
            debugType="actionplan"
        )

        if not response:
            raise ValueError("AI service returned no response")

        logger.info("=== ACTION PLAN AI RESPONSE RECEIVED ===")
        logger.info(f"Response length: {len(response) if response else 0}")

        # Extract the outermost JSON object from the response text.
        jsonStart = response.find('{')
        jsonEnd = response.rfind('}') + 1
        if jsonStart == -1 or jsonEnd == 0:
            raise ValueError("No JSON found in response")
        jsonStr = response[jsonStart:jsonEnd]

        try:
            actionData = json.loads(jsonStr)
        except Exception as e:
            # BUGFIX: previously swallowed into {} and re-raised as a
            # misleading "missing 'actions'" error; surface the parse failure.
            logger.error(f"Error parsing action response JSON: {str(e)}")
            raise ValueError(f"Invalid JSON in action response: {str(e)}") from e

        if 'actions' not in actionData:
            raise ValueError("Action response missing 'actions' field")

        actions = actionData['actions']
        if not actions:
            raise ValueError("Action response contains empty actions list")

        if not isinstance(actions, list):
            raise ValueError(f"Action response 'actions' field is not a list: {type(actions)}")

        if not self.validator.validateAction(actions, actionContext):
            logger.error("Generated actions failed validation")
            raise Exception("AI-generated actions failed validation - AI is required for action generation")

        # Convert raw action dicts into persisted ActionItem objects.
        taskActions = []
        for i, a in enumerate(actions):
            if not isinstance(a, dict):
                logger.warning(f"Skipping invalid action {i+1}: not a dictionary")
                continue

            # Handle compound action format (new) or separate method/action format (old)
            action_name = a.get('action', 'unknown')
            if '.' in action_name:
                # New compound action format: "method.action"
                method_name, action_name = action_name.split('.', 1)
            else:
                # Old separate format: method + action fields
                method_name = a.get('method', 'unknown')

            taskAction = self._createActionItem({
                "execMethod": method_name,
                "execAction": action_name,
                "execParameters": a.get('parameters', {}),
                "execResultLabel": a.get('resultLabel', ''),
                "expectedDocumentFormats": a.get('expectedDocumentFormats', None),
                "status": TaskStatus.PENDING,
                # Extract user-friendly message if available
                "userMessage": a.get('userMessage', None)
            })

            if taskAction:
                taskActions.append(taskAction)
            else:
                logger.warning(f"Skipping invalid action {i+1}: failed to create ActionItem")

        # Only truthy items were appended above, so no further filtering is
        # needed (the old `validActions` comprehension was redundant).
        if not taskActions:
            raise ValueError("No valid actions could be created from AI response")

        return taskActions
    except Exception as e:
        logger.error(f"Error in generateActionItems: {str(e)}")
        return []
|
|
|
|
|
|
async def executeTask(self, taskStep: TaskStep, workflow: ChatWorkflow, context: TaskContext,
                      taskIndex: int = None, totalTasks: int = None) -> TaskResult:
    """Execute one task step: plan actions, run them sequentially, review, retry.

    The task is attempted up to ``state.max_retries`` times. Each attempt
    re-plans the actions (feeding the previous review back into the planning
    context), executes them one by one with per-action validation/learning,
    then asks the review AI whether the task succeeded.

    Args:
        taskStep: Task to execute.
        workflow: Owning chat workflow.
        context: Execution context; mutated across retries to carry feedback.
        taskIndex: 1-based task number for progress reporting (optional).
        totalTasks: Total number of tasks in the workflow (optional).

    Returns:
        TaskResult with status COMPLETED on success, FAILED otherwise.
    """
    logger.info(f"=== STARTING TASK {taskIndex or '?'}: {taskStep.objective} ===")

    # Use workflow-level intent from planning phase (stored in workflow object)
    # to avoid re-running intent analysis.
    if hasattr(workflow, '_workflowIntent') and workflow._workflowIntent:
        self.workflowIntent = workflow._workflowIntent
        logger.info(f"Using workflow intent from planning phase")
    else:
        # Fallback: analyze fresh (shouldn't happen in the normal flow).
        originalPrompt = self.services.currentUserPrompt if self.services and hasattr(self.services, 'currentUserPrompt') else taskStep.objective
        self.workflowIntent = await self.intentAnalyzer.analyzeUserIntent(originalPrompt, context)
        logger.warning(f"Workflow intent not found in workflow object, analyzed fresh")

    # Task-level intent is intentionally not analyzed; the task's own format
    # fields (dataType, expectedFormats, qualityRequirements) carry that info.
    self.taskIntent = None
    logger.info(f"Workflow intent: {self.workflowIntent}")
    if taskStep.dataType or taskStep.expectedFormats or taskStep.qualityRequirements:
        logger.info(f"Task format info: dataType={taskStep.dataType}, expectedFormats={taskStep.expectedFormats}")

    # Reset progress tracking for new task
    self.progressTracker.reset()

    # Update workflow object and chat context before executing this task.
    if taskIndex is not None:
        self._updateWorkflowBeforeExecutingTask(taskIndex)
    if taskIndex is not None:
        self.services.chat.setWorkflowContext(taskNumber=taskIndex)

    # Create task start message
    await self.messageCreator.createTaskStartMessage(taskStep, workflow, taskIndex, totalTasks)

    state = TaskExecutionState(taskStep)
    retryContext = context
    maxRetries = state.max_retries

    for attempt in range(maxRetries):
        logger.info(f"Task execution attempt {attempt+1}/{maxRetries}")

        # Check workflow status before starting task execution
        checkWorkflowStopped(self.services)

        # NOTE(review): retryCount is set to attempt+1 even on the very first
        # attempt, so planning logs show "Retry #1" for a fresh task — confirm
        # this off-by-one is intended before changing it.
        if retryContext:
            retryContext.retryCount = attempt + 1

        actions = await self.generateActionItems(taskStep, workflow,
                                                 previousResults=retryContext.previousResults,
                                                 enhancedContext=retryContext)

        totalActions = len(actions) if actions else 0
        logger.info(f"Task {taskIndex or '?'} has {totalActions} actions")

        # Record the planned action count on the workflow.
        self._updateWorkflowAfterActionPlanning(totalActions)
        self._setWorkflowTotals(totalActions=totalActions)

        if not actions:
            logger.error("No actions defined for task step, aborting task execution")
            break

        actionResults = []
        for actionIdx, action in enumerate(actions):
            # Check workflow status before each action execution
            checkWorkflowStopped(self.services)

            actionNumber = actionIdx + 1
            self._updateWorkflowBeforeExecutingAction(actionNumber)

            logger.info(f"Task {taskIndex} - Starting action {actionNumber}/{totalActions}")

            # Create action start message
            actionStartMessage = {
                "workflowId": workflow.id,
                "role": "assistant",
                "message": f"⚡ **Action {actionNumber}** (Method {action.execMethod}.{action.execAction})",
                "status": "step",
                "sequenceNr": len(workflow.messages) + 1,
                "publishedAt": self.services.utils.timestampGetUtc(),
                "documentsLabel": f"action_{actionNumber}_start",
                "documents": [],
                "actionProgress": "running",
                "roundNumber": workflow.currentRound,
                "taskNumber": taskIndex,
                "actionNumber": actionNumber
            }

            # Add user-friendly message if available
            if action.userMessage:
                actionStartMessage["message"] += f"\n\n💬 {action.userMessage}"

            self.services.chat.storeMessageWithDocuments(workflow, actionStartMessage, [])
            logger.info(f"Action start message created for action {actionNumber}")

            # Execute single action
            result = await self.actionExecutor.executeSingleAction(action, workflow, taskStep,
                                                                   taskIndex, actionNumber, totalActions)
            actionResults.append(result)

            # Content validation after each action (mirrors Dynamic mode); the
            # validator receives ALL documents plus the task so it can apply
            # task.objective and the format fields generically.
            if getattr(self, 'workflowIntent', None) and result.documents:
                actionName = f"{action.execMethod}.{action.execAction}"
                validationResult = await self.contentValidator.validateContent(result.documents, self.workflowIntent, taskStep, actionName)
                qualityScore = validationResult.get('qualityScore', 0.0)
                if qualityScore is None:
                    qualityScore = 0.0
                logger.info(f"Content validation for action {actionNumber}: {validationResult['overallSuccess']} (quality: {qualityScore:.2f})")

                # Record validation result for adaptive learning
                actionContext = {
                    'actionName': f"{action.execMethod}.{action.execAction}",
                    'workflowId': context.workflowId
                }
                self.adaptiveLearningEngine.recordValidationResult(
                    validationResult,
                    actionContext,
                    context.workflowId,
                    actionNumber
                )

                # Learn from this action's outcome (renamed from 'feedback' to
                # avoid shadowing the review feedback below).
                actionFeedback = self._collectFeedback(result, validationResult, self.workflowIntent)
                self.learningEngine.learnFromFeedback(actionFeedback, context, self.workflowIntent)

                # Update progress
                self.progressTracker.updateOperation(result, validationResult, self.workflowIntent)

            if result.success:
                state.addSuccessfulAction(result)
            else:
                state.addFailedAction(result)

        # Check workflow status before review
        checkWorkflowStopped(self.services)

        reviewResult = await self._reviewTaskCompletion(taskStep, actions, actionResults, workflow)
        success = reviewResult.status == 'success'
        feedback = reviewResult.reason

        if success:
            logger.info(f"=== TASK {taskIndex or '?'} COMPLETED SUCCESSFULLY: {taskStep.objective} ===")

            await self.messageCreator.createTaskCompletionMessage(taskStep, workflow, taskIndex, totalTasks, reviewResult)

            return TaskResult(
                taskId=taskStep.id,
                status=TaskStatus.COMPLETED,
                success=True,
                feedback=feedback,
                error=None
            )

        elif reviewResult.status == 'retry' and state.canRetry():
            logger.warning(f"Task step '{taskStep.objective}' requires retry: {reviewResult.improvements}")

            if reviewResult.metCriteria:
                logger.info(f"Met criteria: {', '.join(reviewResult.metCriteria)}")
            if reviewResult.unmetCriteria:
                logger.warning(f"Unmet criteria: {', '.join(reviewResult.unmetCriteria)}")

            state.incrementRetryCount()

            # Update retry context with retry information and criteria tracking
            if retryContext:
                retryContext.retryCount = state.retry_count
                retryContext.improvements = reviewResult.improvements
                retryContext.previousActionResults = actionResults
                retryContext.previousReviewResult = reviewResult
                retryContext.isRegeneration = True
                retryContext.failurePatterns = state.getFailurePatterns()
                retryContext.failedActions = state.failed_actions
                retryContext.successfulActions = state.successful_actions

                # BUGFIX: the original guard used hasattr(), but TaskContext
                # declares criteriaProgress (set to None when the context is
                # built), so the tracking dict was never initialized and the
                # .update() calls below crashed on None. Initialize whenever
                # the attribute is missing OR still None.
                if getattr(retryContext, 'criteriaProgress', None) is None:
                    retryContext.criteriaProgress = {
                        'met_criteria': set(),
                        'unmet_criteria': set(),
                        'attempt_history': []
                    }

                # Update criteria progress
                if reviewResult.metCriteria:
                    retryContext.criteriaProgress['met_criteria'].update(reviewResult.metCriteria)
                if reviewResult.unmetCriteria:
                    retryContext.criteriaProgress['unmet_criteria'].update(reviewResult.unmetCriteria)

                # Record this attempt's criteria status
                attemptRecord = {
                    'attempt': state.retry_count,
                    'met_criteria': reviewResult.metCriteria or [],
                    'unmet_criteria': reviewResult.unmetCriteria or [],
                    'quality_score': reviewResult.qualityScore,
                    'improvements': reviewResult.improvements or []
                }
                retryContext.criteriaProgress['attempt_history'].append(attemptRecord)

            # Create retry message
            await self.messageCreator.createRetryMessage(taskStep, workflow, taskIndex, reviewResult)

            continue
        else:
            logger.error(f"=== TASK {taskIndex or '?'} FAILED: {taskStep.objective} after {attempt+1} attempts ===")

            await self.messageCreator.createErrorMessage(taskStep, workflow, taskIndex, reviewResult.reason)

            return TaskResult(
                taskId=taskStep.id,
                status=TaskStatus.FAILED,
                success=False,
                feedback=feedback,
                error=reviewResult.reason if reviewResult and hasattr(reviewResult, 'reason') else "Task failed after retry attempts"
            )

    logger.error(f"=== TASK {taskIndex or '?'} FAILED AFTER ALL RETRIES: {taskStep.objective} ===")

    # Create final error message
    await self.messageCreator.createErrorMessage(taskStep, workflow, taskIndex, "Task failed after all retries")

    return TaskResult(
        taskId=taskStep.id,
        status=TaskStatus.FAILED,
        success=False,
        feedback="Task failed after all retries.",
        error="Task failed after all retries."
    )
|
|
|
|
async def _reviewTaskCompletion(self, taskStep: TaskStep, taskActions: List[ActionItem],
                                actionResults: List[ActionResult], workflow: ChatWorkflow) -> ReviewResult:
    """Ask the review AI whether the executed actions completed the task.

    Aggregates the action results into a ReviewContext, renders the
    result-review prompt, calls the planning AI, and parses the JSON verdict
    into a ReviewResult ('success' / 'retry' / anything else = failure).

    Returns:
        ReviewResult; on any internal error a 'failed' result carrying the
        exception message (qualityScore 0.0) is returned instead of raising.
    """
    try:
        # Check workflow status before reviewing task completion
        checkWorkflowStopped(self.services)

        logger.info(f"=== STARTING TASK COMPLETION REVIEW ===")
        logger.info(f"Task: {taskStep.objective}")
        logger.info(f"Actions executed: {len(taskActions) if taskActions else 0}")
        logger.info(f"Action results: {len(actionResults) if actionResults else 0}")

        # Summarize execution outcomes for the review prompt.
        reviewContext = ReviewContext(
            taskStep=taskStep,
            taskActions=taskActions,
            actionResults=actionResults,
            stepResult={
                'successful_actions': sum(1 for result in actionResults if result.success),
                'total_actions': len(actionResults),
                'results': [self._extractResultText(result) for result in actionResults if result.success],
                'errors': [result.error for result in actionResults if not result.success],
                'documents': [
                    {
                        'action_index': i,
                        'documents_count': len(result.documents) if result.documents else 0,
                        'documents': result.documents if result.documents else []
                    }
                    for i, result in enumerate(actionResults)
                ]
            },
            workflowId=workflow.id,
            previousResults=[]
        )

        # Check workflow status before calling AI service
        checkWorkflowStopped(self.services)

        # Build prompt bundle for result review
        bundle = generateResultReviewPrompt(reviewContext)
        promptTemplate = bundle.prompt
        placeholders = bundle.placeholders

        logger.info("=== RESULT REVIEW PROMPT SENT TO AI ===")
        logger.info(f"Task: {taskStep.objective}")
        logger.info(f"Action Results Count: {len(reviewContext.actionResults) if reviewContext.actionResults else 0}")
        logger.info(f"Task Actions Count: {len(reviewContext.taskActions) if reviewContext.taskActions else 0}")

        # NOTE(review): these AiCallOptions are constructed but never passed to
        # callAiPlanning below — confirm whether the call should forward them.
        options = AiCallOptions(
            operationType=OperationTypeEnum.DATA_ANALYSE,
            priority=PriorityEnum.BALANCED,
            compressPrompt=True,
            compressContext=False,
            processingMode=ProcessingModeEnum.ADVANCED,
            maxCost=0.05,
            maxProcessingTime=30
        )

        response = await self.services.ai.callAiPlanning(
            prompt=promptTemplate,
            placeholders=placeholders,
            debugType="resultreview"
        )

        logger.info("=== RESULT REVIEW AI RESPONSE RECEIVED ===")
        logger.info(f"Response length: {len(response) if response else 0}")

        # Extract the outermost JSON object from the response text.
        jsonStart = response.find('{')
        jsonEnd = response.rfind('}') + 1
        if jsonStart == -1 or jsonEnd == 0:
            raise ValueError("No JSON found in review response")
        jsonStr = response[jsonStart:jsonEnd]

        try:
            review = json.loads(jsonStr)
        except Exception as e:
            # BUGFIX: previously fell through with review={} and raised a
            # misleading "missing 'status'" error; surface the parse failure.
            logger.error(f"Error parsing review response JSON: {str(e)}")
            raise ValueError(f"Invalid JSON in review response: {str(e)}") from e

        if 'status' not in review:
            raise ValueError("Review response missing 'status' field")
        # BUGFIX: the old setdefault('quality_score', 5.0) ran before the
        # camelCase fallback below and therefore always masked a 'qualityScore'
        # key in the response. The setdefaults (including the dead one for
        # 'status' after the raise above) are removed so the .get() fallbacks
        # used when building the ReviewResult actually apply.

        # Normalize improvements to a list.
        improvements = review.get('improvements', [])
        if isinstance(improvements, str):
            # Split string into list if it's a single improvement
            improvements = [improvements.strip()] if improvements.strip() else []
        elif not isinstance(improvements, list):
            improvements = []

        # Ensure all list fields are properly typed
        metCriteria = review.get('met_criteria', [])
        if not isinstance(metCriteria, list):
            metCriteria = []

        unmetCriteria = review.get('unmet_criteria', [])
        if not isinstance(unmetCriteria, list):
            unmetCriteria = []

        reviewResult = ReviewResult(
            status=review.get('status', 'unknown'),
            reason=review.get('reason', 'No reason provided'),
            improvements=improvements,
            qualityScore=float(review.get('quality_score', review.get('qualityScore', 5.0))),
            missingOutputs=[],
            metCriteria=metCriteria,
            unmetCriteria=unmetCriteria,
            confidence=review.get('confidence', 0.5),
            # Extract user-friendly message if available
            userMessage=review.get('userMessage', None)
        )

        # Enhanced validation logging
        logger.info(f"VALIDATION RESULT - Task: '{taskStep.objective}' - Status: {reviewResult.status.upper()}, Quality: {reviewResult.qualityScore}/10")
        if reviewResult.status == 'success':
            logger.info(f"VALIDATION SUCCESS - Task completed successfully")
            if reviewResult.metCriteria:
                logger.info(f"Met criteria: {', '.join(reviewResult.metCriteria)}")
        elif reviewResult.status == 'retry':
            logger.warning(f"VALIDATION RETRY - Task requires retry: {reviewResult.improvements}")
            if reviewResult.unmetCriteria:
                logger.warning(f"Unmet criteria: {', '.join(reviewResult.unmetCriteria)}")
        else:
            logger.error(f"VALIDATION FAILED - Task failed: {reviewResult.reason}")

        logger.info(f"=== TASK COMPLETION REVIEW FINISHED ===")
        logger.info(f"Final Status: {reviewResult.status}")
        logger.info(f"Quality Score: {reviewResult.qualityScore}/10")
        logger.info(f"Improvements: {reviewResult.improvements}")
        logger.info("=== END REVIEW ===")

        return reviewResult
    except Exception as e:
        logger.error(f"Error in reviewTaskCompletion: {str(e)}")
        return ReviewResult(
            status='failed',
            reason=str(e),
            qualityScore=0.0
        )
|
|
|
|
def _createActionItem(self, actionData: Dict[str, Any]) -> ActionItem:
    """Persist an action definition and return it as an ActionItem model.

    Fills in defaults (generated id, PENDING status, empty parameters),
    validates the required execMethod/execAction fields, writes the record via
    the chat DB interface, and converts the stored row back into an ActionItem.

    Args:
        actionData: Raw field dict for the action; mutated in place with
            defaults before persistence.

    Returns:
        The created ActionItem, or None when required fields are missing or
        persistence fails (errors are logged, never raised to the caller).
    """
    try:
        # Defaults for optional fields.
        if "id" not in actionData or not actionData["id"]:
            actionData["id"] = f"action_{uuid.uuid4()}"
        if "status" not in actionData:
            actionData["status"] = TaskStatus.PENDING

        # Hard requirements: without an exec target the action is unusable.
        if "execMethod" not in actionData:
            logger.error("execMethod is required for task action")
            return None
        if "execAction" not in actionData:
            logger.error("execAction is required for task action")
            return None
        if "execParameters" not in actionData:
            actionData["execParameters"] = {}

        # Split fields per the ActionItem model; only the simple (scalar)
        # fields are persisted here — the object fields are intentionally
        # unused (renamed to _objectFields to make that explicit).
        simpleFields, _objectFields = self.services.interfaceDbChat._separateObjectFields(ActionItem, actionData)

        # Create action in database
        createdAction = self.services.interfaceDbChat.db.recordCreate(ActionItem, simpleFields)

        # Convert the stored row back into an ActionItem model.
        return ActionItem(
            id=createdAction["id"],
            execMethod=createdAction["execMethod"],
            execAction=createdAction["execAction"],
            execParameters=createdAction.get("execParameters", {}),
            execResultLabel=createdAction.get("execResultLabel"),
            expectedDocumentFormats=createdAction.get("expectedDocumentFormats"),
            status=createdAction.get("status", TaskStatus.PENDING),
            error=createdAction.get("error"),
            retryCount=createdAction.get("retryCount", 0),
            retryMax=createdAction.get("retryMax", 3),
            processingTime=createdAction.get("processingTime"),
            timestamp=float(createdAction.get("timestamp", self.services.utils.timestampGetUtc())),
            result=createdAction.get("result"),
            resultDocuments=createdAction.get("resultDocuments", []),
            userMessage=createdAction.get("userMessage")
        )

    except Exception as e:
        logger.error(f"Error creating task action: {str(e)}")
        return None
|
|
|
|
def _extractResultText(self, result: ActionResult) -> str:
|
|
"""Extract result text from ActionResult documents"""
|
|
if not result.success or not result.documents:
|
|
return ""
|
|
|
|
# Extract text directly from ActionDocument objects
|
|
resultParts = []
|
|
for doc in result.documents:
|
|
if hasattr(doc, 'documentData') and doc.documentData:
|
|
resultParts.append(str(doc.documentData))
|
|
|
|
# Join all document results with separators
|
|
return "\n\n---\n\n".join(resultParts) if resultParts else ""
|
|
|
|
def _collectFeedback(self, result: Any, validation: Dict[str, Any], intent: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Collects comprehensive feedback from action execution"""
|
|
try:
|
|
# Extract content summary
|
|
contentDelivered = ""
|
|
if result.documents:
|
|
firstDoc = result.documents[0]
|
|
if hasattr(firstDoc, 'documentData'):
|
|
data = firstDoc.documentData
|
|
if isinstance(data, dict) and 'content' in data:
|
|
content = str(data['content'])
|
|
contentDelivered = content[:100] + "..." if len(content) > 100 else content
|
|
else:
|
|
contentDelivered = str(data)[:100] + "..." if len(str(data)) > 100 else str(data)
|
|
|
|
return {
|
|
"actionAttempted": result.resultLabel or "unknown",
|
|
"parametersUsed": {}, # Would be extracted from action context
|
|
"contentDelivered": contentDelivered,
|
|
"intentMatchScore": validation.get('qualityScore', 0),
|
|
"qualityScore": validation.get('qualityScore', 0),
|
|
"issuesFound": validation.get('improvementSuggestions', []),
|
|
"learningOpportunities": validation.get('improvementSuggestions', []),
|
|
"userSatisfaction": None, # Would be collected from user feedback
|
|
"timestamp": datetime.now(timezone.utc).timestamp()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error collecting feedback: {str(e)}")
|
|
return {
|
|
"actionAttempted": "unknown",
|
|
"parametersUsed": {},
|
|
"contentDelivered": "",
|
|
"intentMatchScore": 0,
|
|
"qualityScore": 0,
|
|
"issuesFound": [],
|
|
"learningOpportunities": [],
|
|
"userSatisfaction": None,
|
|
"timestamp": datetime.now(timezone.utc).timestamp()
|
|
}
|
|
|
|
def _updateWorkflowBeforeExecutingTask(self, taskNumber: int):
|
|
"""Update workflow object before executing a task"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
updateData = {
|
|
"currentTask": taskNumber,
|
|
"currentAction": 0,
|
|
"totalActions": 0
|
|
}
|
|
|
|
# Update workflow object
|
|
workflow.currentTask = taskNumber
|
|
workflow.currentAction = 0
|
|
workflow.totalActions = 0
|
|
|
|
# Update in database
|
|
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
|
|
logger.info(f"Updated workflow {workflow.id} before executing task {taskNumber}: {updateData}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating workflow before executing task: {str(e)}")
|
|
|
|
def _updateWorkflowAfterActionPlanning(self, totalActions: int):
|
|
"""Update workflow object after action planning for current task"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
updateData = {
|
|
"totalActions": totalActions
|
|
}
|
|
|
|
# Update workflow object
|
|
workflow.totalActions = totalActions
|
|
|
|
# Update in database
|
|
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
|
|
logger.info(f"Updated workflow {workflow.id} after action planning: {updateData}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating workflow after action planning: {str(e)}")
|
|
|
|
def _updateWorkflowBeforeExecutingAction(self, actionNumber: int):
|
|
"""Update workflow object before executing an action"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
updateData = {
|
|
"currentAction": actionNumber
|
|
}
|
|
|
|
# Update workflow object
|
|
workflow.currentAction = actionNumber
|
|
|
|
# Update in database
|
|
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
|
|
logger.info(f"Updated workflow {workflow.id} before executing action {actionNumber}: {updateData}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating workflow before executing action: {str(e)}")
|
|
|
|
def _setWorkflowTotals(self, totalTasks: int = None, totalActions: int = None):
|
|
"""Set total counts for workflow progress tracking and update database"""
|
|
try:
|
|
workflow = self.services.workflow
|
|
updateData = {}
|
|
|
|
if totalTasks is not None:
|
|
workflow.totalTasks = totalTasks
|
|
updateData["totalTasks"] = totalTasks
|
|
|
|
if totalActions is not None:
|
|
workflow.totalActions = totalActions
|
|
updateData["totalActions"] = totalActions
|
|
|
|
# Update workflow object in database if we have changes
|
|
if updateData:
|
|
self.services.interfaceDbChat.updateWorkflow(workflow.id, updateData)
|
|
logger.info(f"Updated workflow {workflow.id} totals in database: {updateData}")
|
|
|
|
logger.debug(f"Updated workflow totals: Tasks {workflow.totalTasks if hasattr(workflow, 'totalTasks') else 'N/A'}, Actions {workflow.totalActions if hasattr(workflow, 'totalActions') else 'N/A'}")
|
|
except Exception as e:
|
|
logger.error(f"Error setting workflow totals: {str(e)}")
|
|
|