""" Context-aware placeholder service for different workflow phases. This module provides different levels of context based on the workflow phase. """ import json import logging from typing import Dict, Any, Optional from enum import Enum logger = logging.getLogger(__name__) class WorkflowPhase(Enum): """Different phases of workflow execution requiring different context levels.""" TASK_PLANNING = "task_planning" # Needs full context for planning REACT_PLAN_SELECTION = "react_plan_selection" # Needs minimal context for action selection REACT_PARAMETERS = "react_parameters" # Needs full context for parameter generation ACTION_PLANNING = "action_planning" # Needs full context for action planning RESULT_REVIEW = "result_review" # Needs full context for review class ContextAwarePlaceholders: """Context-aware placeholder service that provides different context levels based on workflow phase.""" def __init__(self, services): self.services = services def getPlaceholders(self, phase: WorkflowPhase, context: Any, additional_data: Dict[str, Any] = None) -> Dict[str, str]: """ Get placeholders based on workflow phase and context. Args: phase: The workflow phase determining context level context: The workflow context object additional_data: Additional data for specific phases (e.g., selected action) Returns: Dictionary of placeholder key-value pairs """ if phase == WorkflowPhase.TASK_PLANNING: return self._getTaskPlanningPlaceholders(context) elif phase == WorkflowPhase.REACT_PLAN_SELECTION: return self._getReactPlanSelectionPlaceholders(context) elif phase == WorkflowPhase.REACT_PARAMETERS: return self._getReactParametersPlaceholders(context, additional_data) elif phase == WorkflowPhase.ACTION_PLANNING: return self._getActionPlanningPlaceholders(context) elif phase == WorkflowPhase.RESULT_REVIEW: return self._getResultReviewPlaceholders(context) else: logger.warning(f"Unknown workflow phase: {phase}") return self._getMinimalPlaceholders(context) def _getTaskPlanningPlaceholders(self, context: Any) -> Dict[str, str]: """Get full context placeholders for task planning.""" return { "USER_PROMPT": self._extractUserPrompt(context), "AVAILABLE_DOCUMENTS": self._getFullDocumentContext(context), "WORKFLOW_HISTORY": self._getWorkflowHistory(context), "USER_LANGUAGE": self._extractUserLanguage(), } def _getReactPlanSelectionPlaceholders(self, context: Any) -> Dict[str, str]: """Get minimal context placeholders for React plan selection.""" return { "USER_PROMPT": self._extractUserPrompt(context), "AVAILABLE_DOCUMENTS": self._getMinimalDocumentContext(context), "USER_LANGUAGE": self._extractUserLanguage(), "AVAILABLE_METHODS": self._getAvailableMethods(), "AVAILABLE_CONNECTIONS": self._getMinimalConnectionContext(), } def _getReactParametersPlaceholders(self, context: Any, additional_data: Dict[str, Any] = None) -> Dict[str, str]: """Get full context placeholders for React parameter generation.""" placeholders = { "USER_PROMPT": self._extractUserPrompt(context), "AVAILABLE_DOCUMENTS": self._getFullDocumentContext(context), "USER_LANGUAGE": self._extractUserLanguage(), "AVAILABLE_CONNECTIONS": self._getFullConnectionContext(), } # Add additional data if provided (e.g., selected action, action signature) if additional_data: placeholders.update(additional_data) return placeholders def _getActionPlanningPlaceholders(self, context: Any) -> Dict[str, str]: """Get full context placeholders for action planning.""" return { "USER_PROMPT": self._extractUserPrompt(context), "AVAILABLE_DOCUMENTS": self._getFullDocumentContext(context), "WORKFLOW_HISTORY": self._getWorkflowHistory(context), "AVAILABLE_METHODS": self._getAvailableMethods(), "AVAILABLE_CONNECTIONS": self._getFullConnectionContext(), "USER_LANGUAGE": self._extractUserLanguage(), } def _getResultReviewPlaceholders(self, context: Any) -> Dict[str, str]: """Get full context placeholders for result review.""" return { "USER_PROMPT": self._extractUserPrompt(context), "REVIEW_CONTENT": self._getReviewContent(context), } def _getMinimalPlaceholders(self, context: Any) -> Dict[str, str]: """Get minimal placeholders as fallback.""" return { "USER_PROMPT": self._extractUserPrompt(context), "USER_LANGUAGE": self._extractUserLanguage(), } # Helper methods for extracting different context levels def _extractUserPrompt(self, context: Any) -> str: """Extract user prompt from context.""" # Get the current user prompt from services (clean and reliable) if self.services and hasattr(self.services, 'currentUserPrompt') and self.services.currentUserPrompt: return self.services.currentUserPrompt # Fallback to task step objective if no current prompt found if hasattr(context, 'task_step') and context.task_step: return context.task_step.objective or 'No request specified' return 'No request specified' def _extractUserLanguage(self) -> str: """Extract user language from service.""" return self.services.user.language if self.services and self.services.user else 'en' def _getMinimalDocumentContext(self, context: Any) -> str: """Get minimal document context (counts only) for React plan selection.""" try: if hasattr(context, 'workflow') and context.workflow: # Get document count from workflow documents = self.services.workflow.getAvailableDocuments(context.workflow) if documents and documents != "No documents available": # Count documents by counting docList and docItem references doc_count = documents.count("docList:") + documents.count("docItem:") return f"{doc_count} documents available from previous tasks" else: return "No documents available" return "No documents available" except Exception as e: logger.error(f"Error getting minimal document context: {str(e)}") return "No documents available" def _getFullDocumentContext(self, context: Any) -> str: """Get full document context with detailed references for parameter generation.""" try: if hasattr(context, 'workflow') and context.workflow: return self.services.workflow.getAvailableDocuments(context.workflow) return "No documents available" except Exception as e: logger.error(f"Error getting full document context: {str(e)}") return "No documents available" def _getMinimalConnectionContext(self) -> str: """Get minimal connection context (count only) for React plan selection.""" try: connections = self.services.workflow.getConnectionReferenceList() if connections: return f"{len(connections)} connections available" return "No connections available" except Exception as e: logger.error(f"Error getting minimal connection context: {str(e)}") return "No connections available" def _getFullConnectionContext(self) -> str: """Get full connection context with detailed references for parameter generation.""" try: connections = self.services.workflow.getConnectionReferenceList() if connections: return '\n'.join(f"- {conn}" for conn in connections) return "No connections available" except Exception as e: logger.error(f"Error getting full connection context: {str(e)}") return "No connections available" def _getWorkflowHistory(self, context: Any) -> str: """Get workflow history for task planning.""" try: if hasattr(context, 'workflow') and context.workflow: from modules.workflows.processing.shared.promptFactory import getPreviousRoundContext return getPreviousRoundContext(self.services, context.workflow) or "No previous workflow rounds - this is the first round." return "No previous workflow rounds - this is the first round." except Exception as e: logger.error(f"Error getting workflow history: {str(e)}") return "No previous workflow rounds - this is the first round." def _getAvailableMethods(self) -> str: """Get available methods for action selection and planning using compound action names.""" try: from modules.workflows.processing.shared.promptFactory import methods, discoverMethods # Get the methods dictionary if not methods: discoverMethods(self.services) # Create a flat JSON format with compound action names for better AI parsing available_actions_json = {} for methodName, methodInfo in methods.items(): # Convert MethodAi -> ai, MethodDocument -> document, etc. shortName = methodName.replace('Method', '').lower() for actionName, actionInfo in methodInfo['actions'].items(): # Create compound action name: method.action compoundActionName = f"{shortName}.{actionName}" # Get the action description action_description = actionInfo.get('description', f"Execute {actionName} action") available_actions_json[compoundActionName] = action_description return json.dumps(available_actions_json, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"Error extracting available methods: {str(e)}") return json.dumps({}, indent=2, ensure_ascii=False) def _getReviewContent(self, context: Any) -> str: """Get review content for result validation.""" try: from modules.workflows.processing.shared.promptFactoryPlaceholders import extractReviewContent return extractReviewContent(context) except Exception as e: logger.error(f"Error getting review content: {str(e)}") return "No review content available"