217 lines
10 KiB
Python
217 lines
10 KiB
Python
"""
|
|
Context-aware placeholder service for different workflow phases.
|
|
This module provides different levels of context based on the workflow phase.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, Optional
|
|
from enum import Enum
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
class WorkflowPhase(Enum):
    """Phases of workflow execution; each phase calls for its own level of context."""

    # Task planning works from the full context.
    TASK_PLANNING = "task_planning"

    # Selecting the next React action only needs a minimal context.
    REACT_PLAN_SELECTION = "react_plan_selection"

    # Generating parameters for a selected React action needs the full context.
    REACT_PARAMETERS = "react_parameters"

    # Action planning works from the full context.
    ACTION_PLANNING = "action_planning"

    # Reviewing results works from the full context.
    RESULT_REVIEW = "result_review"
|
|
|
|
class ContextAwarePlaceholders:
    """Build prompt placeholder dictionaries sized to the current workflow phase.

    Each phase gets its own set of placeholder keys. Phases that need full
    context receive the complete document / connection listings, while React
    plan selection only receives counts.

    Every helper that talks to ``self.services`` or to the prompt factory
    catches exceptions, logs them and returns a fixed fallback string, so
    building placeholders never raises because of a failing lookup.
    """

    # Fallback texts shared by several helpers; kept in one place so the
    # minimal and full variants cannot drift apart.
    _NO_REQUEST = 'No request specified'
    _NO_DOCUMENTS = "No documents available"
    _NO_CONNECTIONS = "No connections available"
    _NO_HISTORY = "No previous workflow rounds - this is the first round."
    _NO_REVIEW_CONTENT = "No review content available"
    _DEFAULT_LANGUAGE = 'en'

    def __init__(self, services):
        """Store the service container used for all lookups.

        Args:
            services: Object from which this class reads ``user.language`` and
                calls ``workflow.getAvailableDocuments`` and
                ``workflow.getConnectionReferenceList``.
        """
        self.services = services

    def getPlaceholders(
        self,
        phase: "WorkflowPhase",
        context: Any,
        additional_data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, str]:
        """Get placeholders based on workflow phase and context.

        Args:
            phase: The workflow phase determining the context level.
            context: The workflow context object.
            additional_data: Extra placeholder entries; only used for
                ``WorkflowPhase.REACT_PARAMETERS`` (e.g. the selected action).

        Returns:
            Dictionary of placeholder key-value pairs. An unrecognised phase
            is logged as a warning and yields the minimal placeholder set.
        """
        if phase == WorkflowPhase.TASK_PLANNING:
            return self._getTaskPlanningPlaceholders(context)
        if phase == WorkflowPhase.REACT_PLAN_SELECTION:
            return self._getReactPlanSelectionPlaceholders(context)
        if phase == WorkflowPhase.REACT_PARAMETERS:
            return self._getReactParametersPlaceholders(context, additional_data)
        if phase == WorkflowPhase.ACTION_PLANNING:
            return self._getActionPlanningPlaceholders(context)
        if phase == WorkflowPhase.RESULT_REVIEW:
            return self._getResultReviewPlaceholders(context)

        logger.warning(f"Unknown workflow phase: {phase}")
        return self._getMinimalPlaceholders(context)

    def _getTaskPlanningPlaceholders(self, context: Any) -> Dict[str, str]:
        """Get full context placeholders for task planning."""
        return {
            "USER_PROMPT": self._extractUserPrompt(context),
            "AVAILABLE_DOCUMENTS": self._getFullDocumentContext(context),
            "WORKFLOW_HISTORY": self._getWorkflowHistory(context),
            "USER_LANGUAGE": self._extractUserLanguage(),
        }

    def _getReactPlanSelectionPlaceholders(self, context: Any) -> Dict[str, str]:
        """Get minimal context placeholders for React plan selection.

        Documents and connections are reported as counts only.
        """
        return {
            "USER_PROMPT": self._extractUserPrompt(context),
            "AVAILABLE_DOCUMENTS": self._getMinimalDocumentContext(context),
            "USER_LANGUAGE": self._extractUserLanguage(),
            "AVAILABLE_METHODS": self._getAvailableMethods(),
            "AVAILABLE_CONNECTIONS": self._getMinimalConnectionContext(),
        }

    def _getReactParametersPlaceholders(
        self,
        context: Any,
        additional_data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, str]:
        """Get full context placeholders for React parameter generation.

        Args:
            context: The workflow context object.
            additional_data: Optional extra entries (e.g. selected action,
                action signature). They are merged last, so a key that is
                also produced here is overridden by the caller's value.
        """
        placeholders = {
            "USER_PROMPT": self._extractUserPrompt(context),
            "AVAILABLE_DOCUMENTS": self._getFullDocumentContext(context),
            "USER_LANGUAGE": self._extractUserLanguage(),
            "AVAILABLE_CONNECTIONS": self._getFullConnectionContext(),
        }

        if additional_data:
            placeholders.update(additional_data)

        return placeholders

    def _getActionPlanningPlaceholders(self, context: Any) -> Dict[str, str]:
        """Get full context placeholders for action planning."""
        return {
            "USER_PROMPT": self._extractUserPrompt(context),
            "AVAILABLE_DOCUMENTS": self._getFullDocumentContext(context),
            "WORKFLOW_HISTORY": self._getWorkflowHistory(context),
            "AVAILABLE_METHODS": self._getAvailableMethods(),
            "AVAILABLE_CONNECTIONS": self._getFullConnectionContext(),
            "USER_LANGUAGE": self._extractUserLanguage(),
        }

    def _getResultReviewPlaceholders(self, context: Any) -> Dict[str, str]:
        """Get placeholders for result review: the user prompt and the review content."""
        return {
            "USER_PROMPT": self._extractUserPrompt(context),
            "REVIEW_CONTENT": self._getReviewContent(context),
        }

    def _getMinimalPlaceholders(self, context: Any) -> Dict[str, str]:
        """Get minimal placeholders as fallback for an unrecognised phase."""
        return {
            "USER_PROMPT": self._extractUserPrompt(context),
            "USER_LANGUAGE": self._extractUserLanguage(),
        }

    # Helper methods for extracting different context levels

    def _extractUserPrompt(self, context: Any) -> str:
        """Extract the user prompt from ``context.task_step.objective``.

        Returns 'No request specified' when the context has no task step,
        or when the task step has no (or an empty) objective.
        """
        task_step = getattr(context, 'task_step', None)
        if not task_step:
            return self._NO_REQUEST
        # getattr guards against a task step object without an objective.
        return getattr(task_step, 'objective', None) or self._NO_REQUEST

    def _extractUserLanguage(self) -> str:
        """Extract the user language from ``self.services.user.language``.

        Falls back to 'en' when the services container or its user is
        missing, and also when the user has no language set, so the
        placeholder is always a non-empty string.
        """
        user = getattr(self.services, 'user', None) if self.services else None
        if not user:
            return self._DEFAULT_LANGUAGE
        return getattr(user, 'language', None) or self._DEFAULT_LANGUAGE

    def _getMinimalDocumentContext(self, context: Any) -> str:
        """Get minimal document context (counts only) for React plan selection."""
        try:
            workflow = getattr(context, 'workflow', None)
            if not workflow:
                return self._NO_DOCUMENTS

            documents = self.services.workflow.getAvailableDocuments(workflow)
            if not documents or documents == self._NO_DOCUMENTS:
                return self._NO_DOCUMENTS

            # The listing is counted by its "docList:" and "docItem:" references.
            doc_count = documents.count("docList:") + documents.count("docItem:")
            return f"{doc_count} documents available from previous tasks"
        except Exception as e:
            logger.error(f"Error getting minimal document context: {str(e)}")
            return self._NO_DOCUMENTS

    def _getFullDocumentContext(self, context: Any) -> str:
        """Get full document context with detailed references for parameter generation.

        Returns the listing produced by ``getAvailableDocuments`` or, when
        there is no workflow, the listing is empty, or the lookup fails,
        the text "No documents available".
        """
        try:
            workflow = getattr(context, 'workflow', None)
            if not workflow:
                return self._NO_DOCUMENTS
            # An empty/None listing is normalised to the fallback text, matching
            # what the minimal variant does.
            return self.services.workflow.getAvailableDocuments(workflow) or self._NO_DOCUMENTS
        except Exception as e:
            logger.error(f"Error getting full document context: {str(e)}")
            return self._NO_DOCUMENTS

    def _getMinimalConnectionContext(self) -> str:
        """Get minimal connection context (count only) for React plan selection."""
        try:
            connections = self.services.workflow.getConnectionReferenceList()
            if connections:
                return f"{len(connections)} connections available"
            return self._NO_CONNECTIONS
        except Exception as e:
            logger.error(f"Error getting minimal connection context: {str(e)}")
            return self._NO_CONNECTIONS

    def _getFullConnectionContext(self) -> str:
        """Get full connection context: one "- <reference>" line per connection."""
        try:
            connections = self.services.workflow.getConnectionReferenceList()
            if connections:
                return '\n'.join(f"- {conn}" for conn in connections)
            return self._NO_CONNECTIONS
        except Exception as e:
            logger.error(f"Error getting full connection context: {str(e)}")
            return self._NO_CONNECTIONS

    def _getWorkflowHistory(self, context: Any) -> str:
        """Get workflow history (previous round context) for planning phases."""
        try:
            workflow = getattr(context, 'workflow', None)
            if not workflow:
                return self._NO_HISTORY

            # Imported at call time, as in the original code.
            # NOTE(review): presumably to avoid a circular import - confirm.
            from modules.workflows.processing.shared.promptFactory import getPreviousRoundContext

            return getPreviousRoundContext(self.services, workflow) or self._NO_HISTORY
        except Exception as e:
            logger.error(f"Error getting workflow history: {str(e)}")
            return self._NO_HISTORY

    def _getAvailableMethods(self) -> str:
        """Get available methods for action selection and planning.

        Returns:
            A JSON object string mapping each short method name to a mapping
            of ``action name -> description``. On any error an empty JSON
            object (``{}``) is returned and the error is logged.
        """
        try:
            # Import the module rather than the ``methods`` name itself so the
            # registry can be re-read after discovery.
            import modules.workflows.processing.shared.promptFactory as promptFactory

            if not promptFactory.methods:
                promptFactory.discoverMethods(self.services)

            # NOTE(review): if discoverMethods rebinds the module-level
            # ``methods`` instead of filling it in place, a name imported via
            # ``from ... import methods`` would still be the old empty dict.
            # Reading the attribute here is correct in both cases.
            registry = promptFactory.methods

            # Structured JSON format for better AI parsing.
            available_methods_json = {}
            for methodName, methodInfo in registry.items():
                # Convert MethodAi -> ai, MethodDocument -> document, etc.
                shortName = methodName.replace('Method', '').lower()
                available_methods_json[shortName] = {
                    actionName: actionInfo.get('description', f"Execute {actionName} action")
                    for actionName, actionInfo in methodInfo['actions'].items()
                }

            return json.dumps(available_methods_json, indent=2, ensure_ascii=False)
        except Exception as e:
            logger.error(f"Error extracting available methods: {str(e)}")
            return json.dumps({}, indent=2, ensure_ascii=False)

    def _getReviewContent(self, context: Any) -> str:
        """Get review content for result validation via ``extractReviewContent``."""
        try:
            from modules.workflows.processing.shared.promptFactoryPlaceholders import extractReviewContent

            return extractReviewContent(context)
        except Exception as e:
            logger.error(f"Error getting review content: {str(e)}")
            return self._NO_REVIEW_CONTENT
|