gateway/modules/workflows/processing/shared/placeholderFactory.py
2025-10-05 15:01:22 +02:00

338 lines
17 KiB
Python

"""
Placeholder Factory
Centralized placeholder extraction functions for all workflow modes.
Each function corresponds to a {{KEY:PLACEHOLDER_NAME}} in prompt templates.
NAMING CONVENTION:
- All functions follow pattern: extract{PlaceholderName}()
- Placeholder names are in UPPER_CASE with underscores
- Function names are in camelCase
MAPPING TABLE (keys → function) with usage [global | react | actionplan]:
{{KEY:USER_PROMPT}} -> extractUserPrompt() [global, react, actionplan]
{{KEY:USER_LANGUAGE}} -> extractUserLanguage() [react, actionplan]
{{KEY:AVAILABLE_DOCUMENTS_SUMMARY}} -> extractAvailableDocumentsSummary() [react, actionplan]
{{KEY:AVAILABLE_DOCUMENTS_INDEX}} -> extractAvailableDocumentsIndex() [react]
{{KEY:AVAILABLE_CONNECTIONS_INDEX}} -> extractAvailableConnectionsIndex() [react, actionplan]
{{KEY:AVAILABLE_CONNECTIONS_SUMMARY}} -> extractAvailableConnectionsSummary() [unused]
{{KEY:WORKFLOW_HISTORY}} -> extractWorkflowHistory() [actionplan]
{{KEY:AVAILABLE_METHODS}} -> extractAvailableMethods() [react, actionplan]
{{KEY:REVIEW_CONTENT}} -> extractReviewContent() [react, actionplan]
{{KEY:PREVIOUS_ACTION_RESULTS}} -> extractPreviousActionResults() [react]
{{KEY:LEARNINGS_AND_IMPROVEMENTS}} -> extractLearningsAndImprovements() [react]
{{KEY:LATEST_REFINEMENT_FEEDBACK}} -> extractLatestRefinementFeedback() [react]
Following placeholders are populated directly by prompt builders with according context in promptGenerationActionsReact module:
- ACTION_OBJECTIVE,
- SELECTED_ACTION,
- ACTION_SIGNATURE
"""
import json
import logging
from typing import Dict, Any, List
from modules.datamodels.datamodelChat import ChatDocument
logger = logging.getLogger(__name__)
from modules.workflows.processing.shared.methodDiscovery import (methods, discoverMethods)
def extractUserPrompt(context: Any) -> str:
"""Extract user prompt from context. Maps to {{KEY:USER_PROMPT}}.
Prefer the cleaned intent stored on the services object if available via context.
Fallback to the task_step objective.
"""
try:
# Prefer services.currentUserPrompt when accessible through context
services = getattr(context, 'services', None)
if services and getattr(services, 'currentUserPrompt', None):
return services.currentUserPrompt
except Exception:
pass
if hasattr(context, 'task_step') and context.task_step:
return context.task_step.objective or 'No request specified'
return 'No request specified'
def extractWorkflowHistory(service: Any, context: Any) -> str:
"""Extract workflow history from context. Maps to {{KEY:WORKFLOW_HISTORY}}"""
if hasattr(context, 'workflow') and context.workflow:
return getPreviousRoundContext(service, context.workflow) or "No previous workflow rounds - this is the first round."
return "No previous workflow rounds - this is the first round."
def extractAvailableMethods(service: Any) -> str:
"""Extract available methods for action planning. Maps to {{KEY:AVAILABLE_METHODS}}"""
try:
# Get the methods dictionary directly from the global methods variable
if not methods:
discoverMethods(service)
# Create a flat JSON format with compound action names for better AI parsing
available_actions_json = {}
for methodName, methodInfo in methods.items():
# Convert MethodAi -> ai, MethodDocument -> document, etc.
shortName = methodName.replace('Method', '').lower()
for actionName, actionInfo in methodInfo['actions'].items():
# Create compound action name: method.action
compoundActionName = f"{shortName}.{actionName}"
# Get the action description
action_description = actionInfo.get('description', f"Execute {actionName} action")
available_actions_json[compoundActionName] = action_description
return json.dumps(available_actions_json, indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Error extracting available methods: {str(e)}")
return json.dumps({}, indent=2, ensure_ascii=False)
def extractUserLanguage(service: Any) -> str:
"""Extract user language from service. Maps to {{KEY:USER_LANGUAGE}}"""
return service.user.language if service and service.user else 'en'
def getConnectionReferenceList(services) -> List[str]:
"""Get list of available connections"""
try:
# Get connections from the database
if hasattr(services, 'interfaceDbApp') and hasattr(services, 'user'):
userId = services.user.id
connections = services.interfaceDbApp.getUserConnections(userId)
if connections:
# Format connections as reference strings
connectionRefs = []
for conn in connections:
# Create reference string in format: conn_{authority}_{id}
ref = f"conn_{conn.authority.value}_{conn.id}"
connectionRefs.append(ref)
return connectionRefs
return []
except Exception as e:
logger.error(f"Error getting connection reference list: {str(e)}")
return []
def getPreviousRoundContext(services, context: Any) -> str:
"""Get previous round context for prompt"""
try:
if not context or not hasattr(context, 'workflow_id'):
return "No previous round context available"
workflowId = context.workflow_id
if not workflowId:
return "No previous round context available"
# Get previous round results
previousResults = getattr(context, 'previous_results', [])
if not previousResults:
return "No previous round context available"
contextList = []
for i, result in enumerate(previousResults, 1):
if hasattr(result, 'success') and hasattr(result, 'resultLabel'):
status = "Success" if result.success else "Failed"
contextList.append(f"{i}. {result.resultLabel} - {status}")
elif isinstance(result, dict):
status = "Success" if result.get('success', False) else "Failed"
label = result.get('resultLabel', 'Unknown')
contextList.append(f"{i}. {label} - {status}")
else:
contextList.append(f"{i}. {str(result)}")
return "\n".join(contextList) if contextList else "No previous round context available"
except Exception as e:
logger.error(f"Error getting previous round context: {str(e)}")
return "Error retrieving previous round context"
def extractReviewContent(context: Any) -> str:
"""Extract review content for result validation. Maps to {{KEY:REVIEW_CONTENT}}"""
try:
if hasattr(context, 'action_results') and context.action_results:
# Build result summary
result_summary = ""
for i, result in enumerate(context.action_results):
result_summary += f"\nRESULT {i+1}:\n"
result_summary += f" Success: {result.success}\n"
if result.error:
result_summary += f" Error: {result.error}\n"
if result.documents:
result_summary += f" Documents: {len(result.documents)} document(s)\n"
for doc in result.documents:
# Extract all available metadata without content
doc_metadata = {
"name": getattr(doc, 'documentName', 'Unknown'),
"mimeType": getattr(doc, 'mimeType', 'Unknown'),
"size": getattr(doc, 'size', 'Unknown'),
"created": getattr(doc, 'created', 'Unknown'),
"modified": getattr(doc, 'modified', 'Unknown'),
"typeGroup": getattr(doc, 'typeGroup', 'Unknown'),
"documentId": getattr(doc, 'documentId', 'Unknown'),
"reference": getattr(doc, 'reference', 'Unknown')
}
# Remove 'Unknown' values to keep it clean
doc_metadata = {k: v for k, v in doc_metadata.items() if v != 'Unknown'}
result_summary += f" - {json.dumps(doc_metadata, indent=6, ensure_ascii=False)}\n"
else:
result_summary += f" Documents: None\n"
return result_summary
elif hasattr(context, 'observation') and context.observation:
# For observation data, show full content but handle documents specially
if isinstance(context.observation, dict):
# Create a copy to modify
obs_copy = context.observation.copy()
# If there are previews with documents, show only metadata
if 'previews' in obs_copy and isinstance(obs_copy['previews'], list):
for preview in obs_copy['previews']:
if isinstance(preview, dict) and 'snippet' in preview:
# Replace snippet with metadata indicator
preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]"
return json.dumps(obs_copy, indent=2, ensure_ascii=False)
else:
return json.dumps(context.observation, ensure_ascii=False)
elif hasattr(context, 'step_result') and context.step_result and 'observation' in context.step_result:
# For observation data in step_result, show full content but handle documents specially
observation = context.step_result['observation']
if isinstance(observation, dict):
# Create a copy to modify
obs_copy = observation.copy()
# If there are previews with documents, show only metadata
if 'previews' in obs_copy and isinstance(obs_copy['previews'], list):
for preview in obs_copy['previews']:
if isinstance(preview, dict) and 'snippet' in preview:
# Replace snippet with metadata indicator
preview['snippet'] = f"[Content: {len(preview.get('snippet', ''))} characters]"
return json.dumps(obs_copy, indent=2, ensure_ascii=False)
else:
return json.dumps(observation, ensure_ascii=False)
else:
return "No review content available"
except Exception as e:
logger.error(f"Error extracting review content: {str(e)}")
return "No review content available"
def extractPreviousActionResults(context: Any) -> str:
"""Extract previous action results for learning context. Maps to {{KEY:PREVIOUS_ACTION_RESULTS}}"""
try:
if not hasattr(context, 'previous_action_results') or not context.previous_action_results:
return "No previous actions executed yet"
results = []
for i, result in enumerate(context.previous_action_results[-5:], 1): # Last 5 results
if hasattr(result, 'resultLabel') and hasattr(result, 'status'):
status = "SUCCESS" if result.status == "completed" else "FAILED"
results.append(f"Action {i}: {result.resultLabel} - {status}")
if hasattr(result, 'error') and result.error:
results.append(f" Error: {result.error}")
return "\n".join(results) if results else "No previous actions executed yet"
except Exception as e:
logger.error(f"Error extracting previous action results: {str(e)}")
return "No previous actions executed yet"
def extractLearningsAndImprovements(context: Any) -> str:
"""Extract learnings and improvements from previous actions. Maps to {{KEY:LEARNINGS_AND_IMPROVEMENTS}}"""
try:
learnings = []
# Get improvements from context
if hasattr(context, 'improvements') and context.improvements and isinstance(context.improvements, list):
learnings.append("IMPROVEMENTS:")
for improvement in context.improvements[-3:]: # Last 3 improvements
learnings.append(f"- {improvement}")
# Get failure patterns
if hasattr(context, 'failure_patterns') and context.failure_patterns and isinstance(context.failure_patterns, list):
learnings.append("FAILURE PATTERNS TO AVOID:")
for pattern in context.failure_patterns[-3:]: # Last 3 patterns
learnings.append(f"- {pattern}")
# Get successful actions
if hasattr(context, 'successful_actions') and context.successful_actions and isinstance(context.successful_actions, list):
learnings.append("SUCCESSFUL APPROACHES:")
for action in context.successful_actions[-3:]: # Last 3 successful
learnings.append(f"- {action}")
return "\n".join(learnings) if learnings else "No learnings available yet"
except Exception as e:
logger.error(f"Error extracting learnings and improvements: {str(e)}")
return "No learnings available yet"
def extractLatestRefinementFeedback(context: Any) -> str:
"""Extract the latest refinement feedback. Maps to {{KEY:LATEST_REFINEMENT_FEEDBACK}}"""
try:
if not hasattr(context, 'previous_review_result') or not context.previous_review_result or not isinstance(context.previous_review_result, list):
return "No previous refinement feedback available"
# Get the most recent refinement decision
latest_decision = context.previous_review_result[-1]
if not isinstance(latest_decision, dict):
return "No previous refinement feedback available"
feedback_parts = []
# Add decision and reason
decision = latest_decision.get('decision', 'unknown')
reason = latest_decision.get('reason', 'No reason provided')
feedback_parts.append(f"Latest Decision: {decision}")
feedback_parts.append(f"Reason: {reason}")
# Add any specific feedback or suggestions
if 'feedback' in latest_decision:
feedback_parts.append(f"Feedback: {latest_decision['feedback']}")
if 'suggestions' in latest_decision:
feedback_parts.append(f"Suggestions: {latest_decision['suggestions']}")
return "\n".join(feedback_parts)
except Exception as e:
logger.error(f"Error extracting latest refinement feedback: {str(e)}")
return "No previous refinement feedback available"
def extractAvailableDocumentsSummary(service: Any, context: Any) -> str:
"""Summary of available documents (count only)."""
try:
if hasattr(context, 'workflow') and context.workflow:
documents = service.workflow.getAvailableDocuments(context.workflow)
if documents and documents != "No documents available":
doc_count = documents.count("docList:") + documents.count("docItem:")
return f"{doc_count} documents available from previous tasks"
return "No documents available"
return "No documents available"
except Exception as e:
logger.error(f"Error getting document summary: {str(e)}")
return "No documents available"
def extractAvailableDocumentsIndex(service: Any, context: Any) -> str:
"""Index of available documents with detailed references for parameter generation."""
try:
if hasattr(context, 'workflow') and context.workflow:
return service.workflow.getAvailableDocuments(context.workflow)
return "No documents available"
except Exception as e:
logger.error(f"Error getting document index: {str(e)}")
return "No documents available"
def extractAvailableConnectionsSummary(service: Any) -> str:
"""Summary of available connections (count only)."""
try:
connections = getConnectionReferenceList(service)
if connections:
return f"{len(connections)} connections available"
return "No connections available"
except Exception as e:
logger.error(f"Error getting connection summary: {str(e)}")
return "No connections available"
def extractAvailableConnectionsIndex(service: Any) -> str:
"""Index of available connections with detailed references for parameter generation."""
try:
connections = getConnectionReferenceList(service)
if connections:
return '\n'.join(f"- {conn}" for conn in connections)
return "No connections available"
except Exception as e:
logger.error(f"Error getting connection index: {str(e)}")
return "No connections available"