# handlingTasks.py
# Refactored for clarity and consolidation
|
|
|
|
import asyncio
|
|
import logging
|
|
import json
|
|
import time
|
|
from typing import Dict, Any, Optional, List, Union
|
|
from datetime import datetime, UTC
|
|
from modules.interfaces.interfaceChatModel import (
|
|
TaskStatus, TaskStep, TaskContext, TaskAction, ReviewResult, TaskPlan, WorkflowResult, TaskResult, ReviewContext, ActionResult
|
|
)
|
|
from .executionState import TaskExecutionState
|
|
from .promptFactory import createTaskPlanningPrompt, createActionDefinitionPrompt, createResultReviewPrompt
|
|
from modules.chat.documents.documentGeneration import DocumentGenerator
|
|
|
|
# Module-level logger named after this module; used by the HandlingTasks methods below.
logger = logging.getLogger(__name__)
|
|
|
|
class HandlingTasks:
|
|
def __init__(self, chatInterface, service, workflow=None):
|
|
self.chatInterface = chatInterface
|
|
self.service = service
|
|
self.workflow = workflow
|
|
self.documentGenerator = DocumentGenerator(service)
|
|
|
|
async def generateTaskPlan(self, userInput: str, workflow) -> TaskPlan:
|
|
"""Generate a high-level task plan for the workflow."""
|
|
try:
|
|
logger.info(f"Generating task plan for workflow {workflow.id}")
|
|
available_docs = self.service.getAvailableDocuments(workflow)
|
|
logger.debug(f"Available documents: {available_docs}")
|
|
|
|
prompt = await self.service.callAiTextAdvanced(
|
|
createTaskPlanningPrompt(self, {
|
|
'user_request': userInput,
|
|
'available_documents': available_docs,
|
|
'workflow_id': workflow.id
|
|
})
|
|
)
|
|
# Inline _parseTaskPlanResponse logic
|
|
try:
|
|
json_start = prompt.find('{')
|
|
json_end = prompt.rfind('}') + 1
|
|
if json_start == -1 or json_end == 0:
|
|
raise ValueError("No JSON found in response")
|
|
json_str = prompt[json_start:json_end]
|
|
task_plan_dict = json.loads(json_str)
|
|
if 'tasks' not in task_plan_dict:
|
|
raise ValueError("Task plan missing 'tasks' field")
|
|
except Exception as e:
|
|
logger.error(f"Error parsing task plan response: {str(e)}")
|
|
task_plan_dict = {'tasks': []}
|
|
|
|
if not self._validateTaskPlan(task_plan_dict):
|
|
logger.error("Generated task plan failed validation")
|
|
raise Exception("AI-generated task plan failed validation - AI is required for task planning")
|
|
|
|
tasks = [TaskStep(**task_dict) for task_dict in task_plan_dict.get('tasks', [])]
|
|
task_plan = TaskPlan(
|
|
overview=task_plan_dict.get('overview', ''),
|
|
tasks=tasks
|
|
)
|
|
|
|
logger.info(f"Task plan generated successfully with {len(tasks)} tasks")
|
|
logger.debug(f"Task plan: {json.dumps(task_plan_dict, indent=2)}")
|
|
|
|
return task_plan
|
|
except Exception as e:
|
|
logger.error(f"Error in generateTaskPlan: {str(e)}")
|
|
raise
|
|
|
|
async def generateTaskActions(self, task_step, workflow, previous_results=None, enhanced_context=None) -> List[TaskAction]:
|
|
"""Generate actions for a given task step."""
|
|
try:
|
|
logger.info(f"Generating actions for task: {task_step.description}")
|
|
|
|
available_docs = self.service.getAvailableDocuments(workflow)
|
|
available_connections = self.service.getConnectionReferenceList()
|
|
logger.debug(f"Available documents: {available_docs}")
|
|
logger.debug(f"Available connections: {available_connections}")
|
|
|
|
context = enhanced_context or TaskContext(
|
|
task_step=task_step,
|
|
workflow=workflow,
|
|
workflow_id=workflow.id,
|
|
available_documents=available_docs,
|
|
previous_results=previous_results or [],
|
|
improvements=[],
|
|
retry_count=0,
|
|
previous_action_results=[],
|
|
previous_review_result=None,
|
|
is_regeneration=False,
|
|
failure_patterns=[],
|
|
failed_actions=[],
|
|
successful_actions=[]
|
|
)
|
|
prompt = await self.service.callAiTextAdvanced(
|
|
await createActionDefinitionPrompt(self, context)
|
|
)
|
|
# Inline parseActionResponse logic here
|
|
json_start = prompt.find('{')
|
|
json_end = prompt.rfind('}') + 1
|
|
if json_start == -1 or json_end == 0:
|
|
raise ValueError("No JSON found in response")
|
|
json_str = prompt[json_start:json_end]
|
|
try:
|
|
action_data = json.loads(json_str)
|
|
except Exception as e:
|
|
logger.error(f"Error parsing action response JSON: {str(e)}")
|
|
action_data = {}
|
|
if 'actions' not in action_data:
|
|
raise ValueError("Action response missing 'actions' field")
|
|
actions = action_data['actions']
|
|
if not self._validateActions(actions, context):
|
|
logger.error("Generated actions failed validation")
|
|
raise Exception("AI-generated actions failed validation - AI is required for action generation")
|
|
|
|
# Convert to TaskAction objects
|
|
task_actions = [self.chatInterface.createTaskAction({
|
|
"execMethod": a.get('method', 'unknown'),
|
|
"execAction": a.get('action', 'unknown'),
|
|
"execParameters": a.get('parameters', {}),
|
|
"execResultLabel": a.get('resultLabel', ''),
|
|
"expectedDocumentFormats": a.get('expectedDocumentFormats', None),
|
|
"status": TaskStatus.PENDING
|
|
}) for a in actions]
|
|
|
|
valid_actions = [ta for ta in task_actions if ta]
|
|
logger.info(f"Generated {len(valid_actions)} actions for task: {task_step.description}")
|
|
logger.debug(f"Task actions plan: {json.dumps(action_data, indent=2)}")
|
|
|
|
return valid_actions
|
|
except Exception as e:
|
|
logger.error(f"Error in generateTaskActions: {str(e)}")
|
|
return []
|
|
|
|
async def executeTask(self, task_step, workflow, context) -> TaskResult:
|
|
"""Execute all actions for a task step, with state management and retries."""
|
|
logger.info(f"Executing task: {task_step.description}")
|
|
state = TaskExecutionState(task_step)
|
|
retry_context = context
|
|
max_retries = state.max_retries
|
|
for attempt in range(max_retries):
|
|
logger.info(f"Task execution attempt {attempt+1}/{max_retries}")
|
|
actions = await self.generateTaskActions(task_step, workflow, previous_results=retry_context.previous_results, enhanced_context=retry_context)
|
|
if not actions:
|
|
logger.error("No actions defined for task step, aborting task execution")
|
|
break
|
|
action_results = []
|
|
for action in actions:
|
|
result = await self.executeSingleAction(action, workflow)
|
|
action_results.append(result)
|
|
if result.success:
|
|
state.addSuccessfulAction(result)
|
|
else:
|
|
state.addFailedAction(result)
|
|
review_result = await self.reviewTaskCompletion(task_step, actions, action_results, workflow)
|
|
success = review_result.status == 'success'
|
|
feedback = review_result.reason
|
|
error = None if success else review_result.reason
|
|
if success:
|
|
logger.info(f"Task step '{task_step.description}' completed successfully")
|
|
return TaskResult(
|
|
taskId=task_step.id,
|
|
status=TaskStatus.COMPLETED,
|
|
success=True,
|
|
feedback=feedback,
|
|
error=None
|
|
)
|
|
elif review_result.status == 'retry' and state.canRetry():
|
|
logger.warning(f"Task step '{task_step.description}' requires retry: {review_result.improvements}")
|
|
state.incrementRetryCount()
|
|
retry_context.retry_count = state.retry_count
|
|
retry_context.improvements = review_result.improvements
|
|
retry_context.previous_action_results = action_results
|
|
retry_context.previous_review_result = review_result
|
|
retry_context.is_regeneration = True
|
|
retry_context.failure_patterns = state.getFailurePatterns()
|
|
retry_context.failed_actions = state.failed_actions
|
|
retry_context.successful_actions = state.successful_actions
|
|
continue
|
|
else:
|
|
logger.error(f"Task step '{task_step.description}' failed after {attempt+1} attempts")
|
|
return TaskResult(
|
|
taskId=task_step.id,
|
|
status=TaskStatus.FAILED,
|
|
success=False,
|
|
feedback=feedback,
|
|
error=error
|
|
)
|
|
logger.error(f"Task step '{task_step.description}' failed after all retries")
|
|
return TaskResult(
|
|
taskId=task_step.id,
|
|
status=TaskStatus.FAILED,
|
|
success=False,
|
|
feedback="Task failed after all retries.",
|
|
error="Task failed after all retries."
|
|
)
|
|
|
|
async def reviewTaskCompletion(self, task_step, task_actions, action_results, workflow):
|
|
try:
|
|
review_context = ReviewContext(
|
|
task_step=task_step,
|
|
action_results=action_results,
|
|
workflow=workflow,
|
|
step_result={
|
|
'successful_actions': sum(1 for result in action_results if result.success),
|
|
'total_actions': len(action_results),
|
|
'results': [result.data.get('result', '') for result in action_results if result.success],
|
|
'errors': [result.error for result in action_results if not result.success]
|
|
}
|
|
)
|
|
# Use promptFactory for review prompt
|
|
prompt = await createResultReviewPrompt(self, review_context)
|
|
response = await self.service.callAiTextAdvanced(prompt)
|
|
# Inline parseReviewResponse logic here
|
|
json_start = response.find('{')
|
|
json_end = response.rfind('}') + 1
|
|
if json_start == -1 or json_end == 0:
|
|
raise ValueError("No JSON found in review response")
|
|
json_str = response[json_start:json_end]
|
|
try:
|
|
review = json.loads(json_str)
|
|
except Exception as e:
|
|
logger.error(f"Error parsing review response JSON: {str(e)}")
|
|
review = {}
|
|
if 'status' not in review:
|
|
raise ValueError("Review response missing 'status' field")
|
|
review.setdefault('status', 'unknown')
|
|
review.setdefault('reason', 'No reason provided')
|
|
review.setdefault('quality_score', 5)
|
|
|
|
# Ensure improvements is a list
|
|
improvements = review.get('improvements', [])
|
|
if isinstance(improvements, str):
|
|
# Split string into list if it's a single improvement
|
|
improvements = [improvements.strip()] if improvements.strip() else []
|
|
elif not isinstance(improvements, list):
|
|
improvements = []
|
|
|
|
# Ensure all list fields are properly typed
|
|
missing_outputs = review.get('missing_outputs', [])
|
|
if not isinstance(missing_outputs, list):
|
|
missing_outputs = []
|
|
|
|
met_criteria = review.get('met_criteria', [])
|
|
if not isinstance(met_criteria, list):
|
|
met_criteria = []
|
|
|
|
unmet_criteria = review.get('unmet_criteria', [])
|
|
if not isinstance(unmet_criteria, list):
|
|
unmet_criteria = []
|
|
|
|
return ReviewResult(
|
|
status=review.get('status', 'unknown'),
|
|
reason=review.get('reason', 'No reason provided'),
|
|
improvements=improvements,
|
|
quality_score=review.get('quality_score', 5),
|
|
missing_outputs=missing_outputs,
|
|
met_criteria=met_criteria,
|
|
unmet_criteria=unmet_criteria,
|
|
confidence=review.get('confidence', 0.5)
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error in reviewTaskCompletion: {str(e)}")
|
|
return ReviewResult(
|
|
status='failed',
|
|
reason=str(e),
|
|
quality_score=0
|
|
)
|
|
|
|
async def prepareTaskHandover(self, task_step, task_actions, review_result, workflow):
|
|
try:
|
|
# Log handover status summary
|
|
if hasattr(review_result, 'status'):
|
|
status = review_result.status
|
|
if hasattr(review_result, 'missing_outputs'):
|
|
missing = review_result.missing_outputs
|
|
else:
|
|
missing = []
|
|
if hasattr(review_result, 'met_criteria'):
|
|
met = review_result.met_criteria
|
|
else:
|
|
met = []
|
|
|
|
logger.debug(f"Task handover status: {status}")
|
|
logger.debug(f"Promised documents: {task_step.expected_outputs}")
|
|
logger.debug(f"Delivered documents: {met}")
|
|
logger.debug(f"Missing documents: {missing}")
|
|
|
|
handover_data = {
|
|
'task_id': task_step.id,
|
|
'task_description': task_step.description,
|
|
'actions': [action.to_dict() for action in task_actions],
|
|
'review_result': review_result.to_dict() if hasattr(review_result, 'to_dict') else review_result,
|
|
'workflow_id': workflow.id,
|
|
'handover_time': datetime.now(UTC).isoformat()
|
|
}
|
|
logger.info(f"Prepared handover for task {task_step.id} in workflow {workflow.id}")
|
|
return handover_data
|
|
except Exception as e:
|
|
logger.error(f"Error in prepareTaskHandover: {str(e)}")
|
|
return {'error': str(e)}
|
|
|
|
# --- Helper action handling methods ---
|
|
|
|
async def executeSingleAction(self, action, workflow):
|
|
"""Execute a single action and return ActionResult with enhanced document processing"""
|
|
try:
|
|
logger.info(f"Executing action: {action.execMethod}.{action.execAction}")
|
|
|
|
# Log input documents and connections
|
|
input_docs = action.execParameters.get('documentList', [])
|
|
logger.debug(f"Input documents: {input_docs}")
|
|
logger.debug(f"Input connections: {action.execParameters.get('connections', [])}")
|
|
|
|
enhanced_parameters = action.execParameters.copy()
|
|
if action.expectedDocumentFormats:
|
|
enhanced_parameters['expectedDocumentFormats'] = action.expectedDocumentFormats
|
|
logger.debug(f"Expected document formats: {action.expectedDocumentFormats}")
|
|
|
|
result = await self.service.executeAction(
|
|
methodName=action.execMethod,
|
|
actionName=action.execAction,
|
|
parameters=enhanced_parameters
|
|
)
|
|
result_label = action.execResultLabel
|
|
|
|
if result.success:
|
|
action.setSuccess()
|
|
action.result = result.data.get("result", "")
|
|
action.execResultLabel = result_label
|
|
await self.createActionMessage(action, result, workflow, result_label)
|
|
logger.info(f"Action {action.execMethod}.{action.execAction} executed successfully")
|
|
else:
|
|
action.setError(result.error or "Action execution failed")
|
|
logger.error(f"Action {action.execMethod}.{action.execAction} failed: {result.error}")
|
|
|
|
return ActionResult(
|
|
success=result.success,
|
|
data={
|
|
"result": result.data.get("result", ""),
|
|
"documents": [], # Documents will be processed in createActionMessage
|
|
"actionId": action.id,
|
|
"actionMethod": action.execMethod,
|
|
"actionName": action.execAction,
|
|
"resultLabel": result_label
|
|
},
|
|
metadata={
|
|
"actionId": action.id,
|
|
"actionMethod": action.execMethod,
|
|
"actionName": action.execAction,
|
|
"resultLabel": result_label
|
|
},
|
|
validation={},
|
|
error=result.error or ""
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error executing single action: {str(e)}")
|
|
action.setError(str(e))
|
|
return ActionResult(
|
|
success=False,
|
|
data={
|
|
"actionId": action.id,
|
|
"actionMethod": action.execMethod,
|
|
"actionName": action.execAction,
|
|
"documents": []
|
|
},
|
|
metadata={
|
|
"actionId": action.id,
|
|
"actionMethod": action.execMethod,
|
|
"actionName": action.execAction
|
|
},
|
|
validation={},
|
|
error=str(e)
|
|
)
|
|
|
|
async def createActionMessage(self, action, result, workflow, result_label=None):
|
|
"""Create and store a message for the action result in the workflow with enhanced document processing"""
|
|
try:
|
|
if result_label is None:
|
|
result_label = action.execResultLabel
|
|
|
|
# Use the local createDocumentsFromActionResult method
|
|
created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow)
|
|
|
|
# Log delivered documents with sizes
|
|
if created_documents:
|
|
doc_info = []
|
|
for doc in created_documents:
|
|
if hasattr(doc, 'filename') and hasattr(doc, 'fileSize'):
|
|
doc_info.append(f"{doc.filename} ({doc.fileSize} bytes)")
|
|
elif hasattr(doc, 'filename'):
|
|
doc_info.append(f"{doc.filename}")
|
|
else:
|
|
doc_info.append("unknown document")
|
|
logger.debug(f"Produced result label: {result_label}")
|
|
logger.debug(f"Delivered documents: {doc_info}")
|
|
else:
|
|
logger.debug(f"Produced result label: {result_label} (no documents)")
|
|
|
|
message_data = {
|
|
"workflowId": workflow.id,
|
|
"role": "assistant",
|
|
"message": f"Executed action {action.execMethod}.{action.execAction}",
|
|
"status": "step",
|
|
"sequenceNr": len(workflow.messages) + 1,
|
|
"publishedAt": datetime.now(UTC).isoformat(),
|
|
"actionId": action.id,
|
|
"actionMethod": action.execMethod,
|
|
"actionName": action.execAction,
|
|
"documentsLabel": result_label,
|
|
"documents": created_documents
|
|
}
|
|
|
|
message = self.chatInterface.createWorkflowMessage(message_data)
|
|
if message:
|
|
workflow.messages.append(message)
|
|
logger.info(f"Created action message for {action.execMethod}.{action.execAction} with {len(created_documents)} documents")
|
|
else:
|
|
logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}")
|
|
except Exception as e:
|
|
logger.error(f"Error creating action message: {str(e)}")
|
|
|
|
# --- Helper validation methods ---
|
|
|
|
def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool:
|
|
try:
|
|
if not isinstance(task_plan, dict):
|
|
return False
|
|
if 'tasks' not in task_plan or not isinstance(task_plan['tasks'], list):
|
|
return False
|
|
task_ids = set()
|
|
for task in task_plan['tasks']:
|
|
if not isinstance(task, dict):
|
|
return False
|
|
required_fields = ['id', 'description', 'expected_outputs', 'success_criteria']
|
|
if not all(field in task for field in required_fields):
|
|
return False
|
|
if task['id'] in task_ids:
|
|
return False
|
|
task_ids.add(task['id'])
|
|
dependencies = task.get('dependencies', [])
|
|
if not isinstance(dependencies, list):
|
|
return False
|
|
for dep in dependencies:
|
|
if dep not in task_ids and dep != 'task_0':
|
|
return False
|
|
if 'ai_prompt' in task and not isinstance(task['ai_prompt'], str):
|
|
return False
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error validating task plan: {str(e)}")
|
|
return False
|
|
|
|
def _validateActions(self, actions: List[Dict[str, Any]], context) -> bool:
|
|
try:
|
|
if not isinstance(actions, list):
|
|
logger.error("Actions must be a list")
|
|
return False
|
|
if len(actions) == 0:
|
|
logger.warning("No actions generated")
|
|
return False
|
|
for i, action in enumerate(actions):
|
|
if not isinstance(action, dict):
|
|
logger.error(f"Action {i} must be a dictionary")
|
|
return False
|
|
required_fields = ['method', 'action', 'parameters', 'resultLabel']
|
|
missing_fields = []
|
|
for field in required_fields:
|
|
if field not in action or not action[field]:
|
|
missing_fields.append(field)
|
|
if missing_fields:
|
|
logger.error(f"Action {i} missing required fields: {missing_fields}")
|
|
return False
|
|
result_label = action.get('resultLabel', '')
|
|
if not result_label.startswith('task'):
|
|
logger.error(f"Action {i} result label must start with 'task': {result_label}")
|
|
return False
|
|
parameters = action.get('parameters', {})
|
|
if not isinstance(parameters, dict):
|
|
logger.error(f"Action {i} parameters must be a dictionary")
|
|
return False
|
|
logger.info(f"Successfully validated {len(actions)} actions")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error validating actions: {str(e)}")
|
|
return False |