"""
Context-aware placeholder service for different workflow phases.

This module provides different levels of context based on the workflow phase.
"""

import json
import logging
from typing import Dict, Any, Optional
from enum import Enum

from modules.workflows.processing.shared.placeholderFactory import (
    extractUserPromptFromService,
    extractFullDocumentContext,
    extractWorkflowHistory,
    extractUserLanguageFromService,
    extractMinimalDocumentContext,
    extractAvailableMethods,
    extractMinimalConnectionContext,
    extractFullConnectionContext,
    extractReviewContent,
    extractPreviousActionResults,
    extractLearningsAndImprovements,
    extractLatestRefinementFeedback
)
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority, ProcessingMode

logger = logging.getLogger(__name__)


class WorkflowPhase(Enum):
    """Different phases of workflow execution requiring different context levels."""

    TASK_PLANNING = "task_planning"  # Needs full context for planning
    REACT_PLAN_SELECTION = "react_plan_selection"  # Needs minimal context for action selection
    REACT_PARAMETERS = "react_parameters"  # Needs full context for parameter generation
    ACTION_PLANNING = "action_planning"  # Needs full context for action planning
    RESULT_REVIEW = "result_review"  # Needs full context for review


class ContextAwarePlaceholders:
    """Context-aware placeholder service that provides different context levels based on workflow phase."""

    def __init__(self, services):
        # Service container handed to every extract* helper; its `ai` attribute
        # (when present) is used to generate action objectives.
        self.services = services

    async def getPlaceholders(
        self,
        phase: WorkflowPhase,
        context: Any,
        additional_data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, str]:
        """
        Get placeholders based on workflow phase and context.

        Args:
            phase: The workflow phase determining context level.
            context: The workflow context object.
            additional_data: Additional data for specific phases (e.g., selected
                action). Only consumed by the REACT_PARAMETERS phase, where its
                entries are merged into the result and override built-in keys
                with the same name.

        Returns:
            Dictionary of placeholder key-value pairs. An unrecognised phase
            logs a warning and yields only USER_PROMPT and USER_LANGUAGE.
        """
        if phase == WorkflowPhase.TASK_PLANNING:
            return {
                "USER_PROMPT": extractUserPromptFromService(self.services),
                "AVAILABLE_DOCUMENTS": extractFullDocumentContext(self.services, context),
                "WORKFLOW_HISTORY": extractWorkflowHistory(self.services, context),
                "USER_LANGUAGE": extractUserLanguageFromService(self.services),
            }

        elif phase == WorkflowPhase.REACT_PLAN_SELECTION:
            return {
                "USER_PROMPT": extractUserPromptFromService(self.services),
                "AVAILABLE_DOCUMENTS": extractMinimalDocumentContext(self.services, context),
                "USER_LANGUAGE": extractUserLanguageFromService(self.services),
                "AVAILABLE_METHODS": extractAvailableMethods(self.services),
                "AVAILABLE_CONNECTIONS": extractMinimalConnectionContext(self.services),
            }

        elif phase == WorkflowPhase.REACT_PARAMETERS:
            # Get both original user prompt and current task objective
            original_prompt = extractUserPromptFromService(self.services)

            # getattr with defaults: a context without task_step, or a task step
            # without an objective, simply yields no current task instead of
            # raising AttributeError.
            current_task = ""
            task_step = getattr(context, 'task_step', None)
            if task_step:
                current_task = getattr(task_step, 'objective', None) or ""

            # Combine original prompt and current task for better context
            combined_prompt = f"Original request: {original_prompt}"
            if current_task and current_task != original_prompt:
                combined_prompt += f"\n\nCurrent task: {current_task}"

            # Generate intelligent action objective
            action_objective = await self._generateActionObjective(
                context, current_task, original_prompt, additional_data
            )

            placeholders = {
                "USER_PROMPT": combined_prompt,
                "ACTION_OBJECTIVE": action_objective,  # AI-generated intelligent objective
                "AVAILABLE_DOCUMENTS": extractFullDocumentContext(self.services, context),
                "USER_LANGUAGE": extractUserLanguageFromService(self.services),
                "AVAILABLE_CONNECTIONS": extractFullConnectionContext(self.services),
                "PREVIOUS_ACTION_RESULTS": extractPreviousActionResults(context),
                "LEARNINGS_AND_IMPROVEMENTS": extractLearningsAndImprovements(context),
                "LATEST_REFINEMENT_FEEDBACK": extractLatestRefinementFeedback(context),
            }

            # Add additional data if provided (e.g., selected action, action signature)
            if additional_data:
                placeholders.update(additional_data)

            return placeholders

        elif phase == WorkflowPhase.ACTION_PLANNING:
            return {
                "USER_PROMPT": extractUserPromptFromService(self.services),
                "AVAILABLE_DOCUMENTS": extractFullDocumentContext(self.services, context),
                "WORKFLOW_HISTORY": extractWorkflowHistory(self.services, context),
                "AVAILABLE_METHODS": extractAvailableMethods(self.services),
                "AVAILABLE_CONNECTIONS": extractFullConnectionContext(self.services),
                "USER_LANGUAGE": extractUserLanguageFromService(self.services),
            }

        elif phase == WorkflowPhase.RESULT_REVIEW:
            return {
                "USER_PROMPT": extractUserPromptFromService(self.services),
                "REVIEW_CONTENT": extractReviewContent(context),
            }

        else:
            logger.warning(f"Unknown workflow phase: {phase}")
            return {
                "USER_PROMPT": extractUserPromptFromService(self.services),
                "USER_LANGUAGE": extractUserLanguageFromService(self.services),
            }

    async def _generateActionObjective(
        self,
        context: Any,
        current_task: str,
        original_prompt: str,
        additional_data: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Generate intelligent, context-aware action objective using AI.

        Args:
            context: The workflow context object passed to the extract* helpers.
            current_task: Objective of the current task step (may be empty).
            original_prompt: The original user prompt.
            additional_data: Optional phase data; its 'SELECTED_ACTION' entry,
                if any, is included in the AI prompt.

        Returns:
            The stripped AI response when it is non-empty. Otherwise
            `current_task`, or `original_prompt` when `current_task` is empty.
            This fallback is also used when no AI service is available or when
            any exception is raised while building the prompt or calling the AI.
        """
        # Single fallback value shared by every non-success path.
        fallback = current_task or original_prompt

        try:
            # Without an AI service there is nothing to ask; skip the context
            # extraction and prompt construction entirely.
            if not (self.services and hasattr(self.services, 'ai')):
                return fallback

            # Get the selected action from additional_data
            selected_action = additional_data.get('SELECTED_ACTION', '') if additional_data else ''

            # Build context for AI objective generation
            available_documents = extractFullDocumentContext(self.services, context)
            available_connections = extractFullConnectionContext(self.services)
            previous_results = extractPreviousActionResults(context)
            learnings = extractLearningsAndImprovements(context)
            refinement_feedback = extractLatestRefinementFeedback(context)
            user_language = extractUserLanguageFromService(self.services)

            # Create AI prompt for objective generation
            objective_prompt = f"""Generate a specific, actionable objective for the selected action.

CONTEXT:
- Original User Request: {original_prompt}
- Current Task: {current_task}
- Selected Action: {selected_action}
- Available Documents: {available_documents}
- Available Connections: {available_connections}
- Previous Action Results: {previous_results}
- Learnings and Improvements: {learnings}
- Latest Refinement Feedback: {refinement_feedback}
- User Language: {user_language}

REQUIREMENTS:
1. Create a SPECIFIC objective that tells the action exactly what to accomplish
2. Include relevant details about documents, connections, recipients, etc.
3. Learn from previous attempts and refinement feedback
4. Make it actionable and concrete
5. Focus on the user's actual intent, not just the task description
6. If this is a retry, incorporate learnings from previous failures

RESPONSE FORMAT:
Return ONLY the objective text, no explanations or formatting.

OBJECTIVE:"""

            # Call AI to generate the objective
            options = AiCallOptions(
                operationType=OperationType.ANALYSE_CONTENT,
                priority=Priority.BALANCED,
                compressPrompt=False,
                compressContext=False,
                processingMode=ProcessingMode.ADVANCED,
                maxCost=0.01,
                maxProcessingTime=10
            )

            response = await self.services.ai.callAi(
                prompt=objective_prompt,
                placeholders={},
                options=options
            )

            # Extract objective from response
            objective = response.strip() if response else ""
            if objective:
                return objective

            # Fallback to current task if AI fails
            return fallback

        except Exception as e:
            # Best-effort by design: objective generation must never break
            # placeholder construction. logger.exception keeps the traceback.
            logger.exception("Error generating action objective: %s", e)
            # Fallback to current task
            return fallback