gateway/modules/workflows/methods/methodBase.py
2025-12-03 11:18:33 +01:00

277 lines
No EOL
12 KiB
Python

from typing import Dict, List, Optional, Any, Literal
from datetime import datetime, UTC
import logging
from functools import wraps
import inspect
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
def action(func):
    """Mark a coroutine method as an action that the workflow can invoke.

    The returned coroutine forwards its arguments unchanged to ``func`` and
    carries ``is_action = True`` so the method can be recognised as an action
    by introspection.

    IMPORTANT: An action must NOT put a resultLabel into its ActionResult.
    The action handler assigns the resultLabel from the execResultLabel of the
    action plan, which keeps document routing consistent across the workflow.

    An action should only return:
    - success: bool
    - documents: List[ActionDocument]
    - error: str (if success=False)

    REQUIRED: Every ActionDocument MUST carry validationMetadata, which is
    used for content validation and refinement. Results without it cannot be
    approved.

    Example validationMetadata structure:
        validationMetadata = {
            "actionType": "moduleName.actionName",
            "param1": value1,
            "param2": value2,
            # ... other relevant parameters for validation
        }

    MethodBase._createValidationMetadata() is a helper that builds this
    standard structure.
    """
    async def _invoke(self, parameters: Dict[str, Any], *args, **kwargs):
        # Pure pass-through: await the decorated coroutine and hand back its result.
        outcome = await func(self, parameters, *args, **kwargs)
        return outcome

    # wraps() copies name, docstring and the other metadata of ``func`` onto
    # the forwarding coroutine and returns that same coroutine object.
    marked = wraps(func)(_invoke)
    setattr(marked, "is_action", True)
    return marked
class MethodBase:
"""Base class for all methods
IMPORTANT: All actions that return ActionDocument instances MUST include validationMetadata.
This metadata is required for content validation and refinement. Without it, results cannot
be approved by the validation system.
Use _createValidationMetadata() helper method to create standardized metadata structures.
"""
def __init__(self, services: Any):
"""Initialize method with services object"""
self.services = services
self.name: str
self.description: str
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
@property
def actions(self) -> Dict[str, Dict[str, Any]]:
"""Dynamically collect all actions decorated with @action in the class."""
actions = {}
for attr_name in dir(self):
# Skip the actions property itself to avoid recursion
if attr_name == 'actions':
continue
try:
attr = getattr(self, attr_name)
if callable(attr) and getattr(attr, 'is_action', False):
sig = inspect.signature(attr)
params = {}
for param_name, param in sig.parameters.items():
if param_name not in ['self', 'parameters']:
param_type = param.annotation if param.annotation != param.empty else Any
params[param_name] = {
'type': param_type,
'required': param.default == param.empty,
'description': None,
'default': param.default if param.default != param.empty else None
}
actions[attr_name] = {
'description': attr.__doc__ or '',
'parameters': params,
'method': attr
}
except (AttributeError, RecursionError):
# Skip attributes that cause issues
continue
return actions
def getActionSignature(self, actionName: str) -> str:
"""Get formatted action signature for AI prompt generation (detailed version)"""
if actionName not in self.actions:
return ""
action = self.actions[actionName]
paramList = []
# Extract detailed parameter information from docstring
docstring = action.get('description', '')
paramDescriptions, paramTypes = self._extractParameterDetails(docstring)
for paramName in paramDescriptions:
paramType = paramTypes.get(paramName, 'Any')
paramDesc = paramDescriptions.get(paramName, '')
# Mark required parameters with * if possible (not available from docstring, so omit)
if paramDesc:
paramList.append(f"{paramName}:{paramType} # {paramDesc}")
else:
paramList.append(f"{paramName}:{paramType}")
signature = f"{self.name}.{actionName}"
if paramList:
signature += f"({', '.join(paramList)})"
# Add return type and main description
returnType = "ActionResult"
mainDesc = self._extractMainDescription(docstring)
if mainDesc:
signature += f" -> {returnType} # {mainDesc}"
return signature
def _extractParameterDetails(self, docstring: str):
"""Extract parameter names, types, and descriptions from docstring"""
descriptions = {}
types = {}
if not docstring:
return descriptions, types
lines = docstring.split('\n')
inParameters = False
for line in lines:
line = line.strip()
if 'Parameters:' in line:
inParameters = True
continue
elif inParameters and (line.startswith('Returns:') or line.startswith('Raises:') or line.startswith('Args:')):
break
elif inParameters and line:
# Look for parameter descriptions like "paramName (type): description"
if ':' in line and '(' in line:
parts = line.split(':', 1)
if len(parts) == 2:
paramPart = parts[0].strip()
descPart = parts[1].strip()
# Extract parameter name and type
if '(' in paramPart:
paramName = paramPart.split('(')[0].strip()
# Normalize bullet-prefixed parameter names like "- aiPrompt" or "* aiPrompt"
if paramName.startswith('-') or paramName.startswith('*'):
paramName = paramName[1:].strip()
paramType = paramPart[paramPart.find('(')+1:paramPart.find(')')].strip()
descriptions[paramName] = descPart
types[paramName] = paramType
# Also handle multi-line descriptions
elif line and not line.startswith('Each document') and not line.startswith('contains'):
if descriptions:
lastParam = list(descriptions.keys())[-1]
descriptions[lastParam] += " " + line
return descriptions, types
def _extractMainDescription(self, docstring: str) -> str:
"""Extract main description from docstring"""
if not docstring:
return ""
lines = docstring.split('\n')
mainDesc = ""
for line in lines:
line = line.strip()
if line and not line.startswith('Parameters:') and not line.startswith('Returns:') and not line.startswith('Raises:'):
mainDesc = line
break
return mainDesc
def _formatType(self, type_annotation) -> str:
"""Format type annotation for display"""
if type_annotation == Any:
return "Any"
elif hasattr(type_annotation, '__name__'):
return type_annotation.__name__
elif hasattr(type_annotation, '_name'):
return type_annotation._name
else:
return str(type_annotation)
def _createValidationMetadata(self, actionName: str, **kwargs) -> Dict[str, Any]:
"""
Helper method to create standardized validationMetadata for ActionDocument instances.
This method ensures all actions include the required validationMetadata structure
for content validation and refinement. Without metadata, results cannot be approved.
Args:
actionName: Name of the action (e.g., "readEmails", "uploadDocument")
**kwargs: Additional action-specific metadata fields
Returns:
Dictionary with validationMetadata structure including:
- actionType: Full action identifier (moduleName.actionName)
- All provided kwargs as additional metadata fields
Example:
validationMetadata = self._createValidationMetadata(
"readEmails",
connectionReference=connectionReference,
folder=folder,
limit=limit,
emailCount=len(emails)
)
ActionDocument(
documentName="emails.json",
documentData=json.dumps(data),
mimeType="application/json",
validationMetadata=validationMetadata # REQUIRED
)
"""
metadata = {
"actionType": f"{self.name}.{actionName}"
}
metadata.update(kwargs)
return metadata
def _generateMeaningfulFileName(self, base_name: str, extension: str, workflow_context: Dict[str, Any] = None, action_name: str = None) -> str:
"""
Generate a meaningful file name with round/task/action information.
Format: {base_name}_alpha_r{round}t{task}a{action}.{extension}
Example: report_alpha_r1t3a4.json
Args:
base_name: Base name for the file (e.g., "report", "analysis", "summary")
extension: File extension without dot (e.g., "json", "html", "txt")
workflow_context: Dictionary with currentRound, currentTask, currentAction
action_name: Name of the action being performed (optional, for additional context)
Returns:
Formatted file name string
"""
try:
# Get workflow context from services if not provided
if workflow_context is None and hasattr(self.services, 'workflow'):
workflow_context = self.services.chat.getWorkflowContext()
# Extract round, task, action numbers
round_num = workflow_context.get('currentRound', 0) if workflow_context else 0
task_num = workflow_context.get('currentTask', 0) if workflow_context else 0
action_num = workflow_context.get('currentAction', 0) if workflow_context else 0
# Clean base name (remove special characters, spaces)
clean_base = base_name.lower().replace(' ', '_').replace('-', '_')
# Remove any non-alphanumeric characters except underscores
import re
clean_base = re.sub(r'[^a-z0-9_]', '', clean_base)
# Add action name if provided
if action_name:
clean_action = action_name.lower().replace(' ', '_').replace('-', '_')
clean_action = re.sub(r'[^a-z0-9_]', '', clean_action)
clean_base = f"{clean_base}_{clean_action}"
# Generate the meaningful file name
meaningful_name = f"{clean_base}_r{round_num}t{task_num}a{action_num}.{extension}"
self.logger.debug(f"Generated meaningful file name: {meaningful_name} (Round: {round_num}, Task: {task_num}, Action: {action_num})")
return meaningful_name
except Exception as e:
self.logger.warning(f"Error generating meaningful file name, using fallback: {str(e)}")
# Fallback to timestamp-based naming
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
return f"{base_name}_{timestamp}.{extension}"