219 lines
No EOL
9.4 KiB
Python
219 lines
No EOL
9.4 KiB
Python
from typing import Dict, List, Optional, Any, Literal
|
|
from datetime import datetime, UTC
|
|
import logging
|
|
|
|
from functools import wraps
|
|
import inspect
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def action(func):
|
|
"""Decorator to mark a method as an available action
|
|
|
|
IMPORTANT: Action methods should NOT return resultLabel in their ActionResult.
|
|
The resultLabel is managed by the action handler using the action's execResultLabel
|
|
from the action plan. This ensures consistent document routing throughout the workflow.
|
|
|
|
Action methods should only return:
|
|
- success: bool
|
|
- documents: List[ActionDocument]
|
|
- error: str (if success=False)
|
|
"""
|
|
@wraps(func)
|
|
async def wrapper(self, parameters: Dict[str, Any], *args, **kwargs):
|
|
return await func(self, parameters, *args, **kwargs)
|
|
wrapper.is_action = True
|
|
return wrapper
|
|
|
|
class MethodBase:
|
|
"""Base class for all methods"""
|
|
|
|
def __init__(self, services: Any):
|
|
"""Initialize method with services object"""
|
|
self.services = services
|
|
self.name: str
|
|
self.description: str
|
|
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
|
|
|
|
@property
|
|
def actions(self) -> Dict[str, Dict[str, Any]]:
|
|
"""Dynamically collect all actions decorated with @action in the class."""
|
|
actions = {}
|
|
for attr_name in dir(self):
|
|
# Skip the actions property itself to avoid recursion
|
|
if attr_name == 'actions':
|
|
continue
|
|
try:
|
|
attr = getattr(self, attr_name)
|
|
if callable(attr) and getattr(attr, 'is_action', False):
|
|
sig = inspect.signature(attr)
|
|
params = {}
|
|
for param_name, param in sig.parameters.items():
|
|
if param_name not in ['self', 'parameters']:
|
|
param_type = param.annotation if param.annotation != param.empty else Any
|
|
params[param_name] = {
|
|
'type': param_type,
|
|
'required': param.default == param.empty,
|
|
'description': None,
|
|
'default': param.default if param.default != param.empty else None
|
|
}
|
|
actions[attr_name] = {
|
|
'description': attr.__doc__ or '',
|
|
'parameters': params,
|
|
'method': attr
|
|
}
|
|
except (AttributeError, RecursionError):
|
|
# Skip attributes that cause issues
|
|
continue
|
|
return actions
|
|
|
|
def getActionSignature(self, actionName: str) -> str:
|
|
"""Get formatted action signature for AI prompt generation (detailed version)"""
|
|
if actionName not in self.actions:
|
|
return ""
|
|
|
|
action = self.actions[actionName]
|
|
paramList = []
|
|
|
|
# Extract detailed parameter information from docstring
|
|
docstring = action.get('description', '')
|
|
paramDescriptions, paramTypes = self._extractParameterDetails(docstring)
|
|
|
|
for paramName in paramDescriptions:
|
|
paramType = paramTypes.get(paramName, 'Any')
|
|
paramDesc = paramDescriptions.get(paramName, '')
|
|
# Mark required parameters with * if possible (not available from docstring, so omit)
|
|
if paramDesc:
|
|
paramList.append(f"{paramName}:{paramType} # {paramDesc}")
|
|
else:
|
|
paramList.append(f"{paramName}:{paramType}")
|
|
|
|
signature = f"{self.name}.{actionName}"
|
|
|
|
if paramList:
|
|
signature += f"({', '.join(paramList)})"
|
|
|
|
# Add return type and main description
|
|
returnType = "ActionResult"
|
|
mainDesc = self._extractMainDescription(docstring)
|
|
|
|
if mainDesc:
|
|
signature += f" -> {returnType} # {mainDesc}"
|
|
|
|
return signature
|
|
|
|
def _extractParameterDetails(self, docstring: str):
|
|
"""Extract parameter names, types, and descriptions from docstring"""
|
|
descriptions = {}
|
|
types = {}
|
|
if not docstring:
|
|
return descriptions, types
|
|
|
|
lines = docstring.split('\n')
|
|
inParameters = False
|
|
for line in lines:
|
|
line = line.strip()
|
|
if 'Parameters:' in line:
|
|
inParameters = True
|
|
continue
|
|
elif inParameters and (line.startswith('Returns:') or line.startswith('Raises:') or line.startswith('Args:')):
|
|
break
|
|
elif inParameters and line:
|
|
# Look for parameter descriptions like "paramName (type): description"
|
|
if ':' in line and '(' in line:
|
|
parts = line.split(':', 1)
|
|
if len(parts) == 2:
|
|
paramPart = parts[0].strip()
|
|
descPart = parts[1].strip()
|
|
# Extract parameter name and type
|
|
if '(' in paramPart:
|
|
paramName = paramPart.split('(')[0].strip()
|
|
# Normalize bullet-prefixed parameter names like "- aiPrompt" or "* aiPrompt"
|
|
if paramName.startswith('-') or paramName.startswith('*'):
|
|
paramName = paramName[1:].strip()
|
|
paramType = paramPart[paramPart.find('(')+1:paramPart.find(')')].strip()
|
|
descriptions[paramName] = descPart
|
|
types[paramName] = paramType
|
|
# Also handle multi-line descriptions
|
|
elif line and not line.startswith('Each document') and not line.startswith('contains'):
|
|
if descriptions:
|
|
lastParam = list(descriptions.keys())[-1]
|
|
descriptions[lastParam] += " " + line
|
|
return descriptions, types
|
|
|
|
def _extractMainDescription(self, docstring: str) -> str:
|
|
"""Extract main description from docstring"""
|
|
if not docstring:
|
|
return ""
|
|
|
|
lines = docstring.split('\n')
|
|
mainDesc = ""
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
if line and not line.startswith('Parameters:') and not line.startswith('Returns:') and not line.startswith('Raises:'):
|
|
mainDesc = line
|
|
break
|
|
|
|
return mainDesc
|
|
|
|
def _formatType(self, type_annotation) -> str:
|
|
"""Format type annotation for display"""
|
|
if type_annotation == Any:
|
|
return "Any"
|
|
elif hasattr(type_annotation, '__name__'):
|
|
return type_annotation.__name__
|
|
elif hasattr(type_annotation, '_name'):
|
|
return type_annotation._name
|
|
else:
|
|
return str(type_annotation)
|
|
|
|
def _generateMeaningfulFileName(self, base_name: str, extension: str, workflow_context: Dict[str, Any] = None, action_name: str = None) -> str:
|
|
"""
|
|
Generate a meaningful file name with round/task/action information.
|
|
|
|
Format: {base_name}_alpha_r{round}t{task}a{action}.{extension}
|
|
Example: report_alpha_r1t3a4.json
|
|
|
|
Args:
|
|
base_name: Base name for the file (e.g., "report", "analysis", "summary")
|
|
extension: File extension without dot (e.g., "json", "html", "txt")
|
|
workflow_context: Dictionary with currentRound, currentTask, currentAction
|
|
action_name: Name of the action being performed (optional, for additional context)
|
|
|
|
Returns:
|
|
Formatted file name string
|
|
"""
|
|
try:
|
|
# Get workflow context from services if not provided
|
|
if workflow_context is None and hasattr(self.services, 'workflow'):
|
|
workflow_context = self.services.chat.getWorkflowContext()
|
|
|
|
# Extract round, task, action numbers
|
|
round_num = workflow_context.get('currentRound', 0) if workflow_context else 0
|
|
task_num = workflow_context.get('currentTask', 0) if workflow_context else 0
|
|
action_num = workflow_context.get('currentAction', 0) if workflow_context else 0
|
|
|
|
# Clean base name (remove special characters, spaces)
|
|
clean_base = base_name.lower().replace(' ', '_').replace('-', '_')
|
|
# Remove any non-alphanumeric characters except underscores
|
|
import re
|
|
clean_base = re.sub(r'[^a-z0-9_]', '', clean_base)
|
|
|
|
# Add action name if provided
|
|
if action_name:
|
|
clean_action = action_name.lower().replace(' ', '_').replace('-', '_')
|
|
clean_action = re.sub(r'[^a-z0-9_]', '', clean_action)
|
|
clean_base = f"{clean_base}_{clean_action}"
|
|
|
|
# Generate the meaningful file name
|
|
meaningful_name = f"{clean_base}_r{round_num}t{task_num}a{action_num}.{extension}"
|
|
|
|
self.logger.debug(f"Generated meaningful file name: {meaningful_name} (Round: {round_num}, Task: {task_num}, Action: {action_num})")
|
|
return meaningful_name
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Error generating meaningful file name, using fallback: {str(e)}")
|
|
# Fallback to timestamp-based naming
|
|
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
|
|
return f"{base_name}_{timestamp}.{extension}" |