gateway/modules/workflows/processing/shared/methodDiscovery.py
2026-04-14 16:15:32 +02:00

123 lines
5.1 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
# methodDiscovery.py
# Method discovery and management for workflow execution
import logging
import importlib
import pkgutil
import inspect
from typing import Any, Dict, List
from modules.workflows.methods.methodBase import MethodBase
# Set up logger
logger = logging.getLogger(__name__)
# Global methods catalog - moved from serviceCenter
methods = {}
def _collectActionsUnfiltered(methodInstance) -> Dict[str, Dict[str, Any]]:
"""Collect actions from a method instance without RBAC filtering.
During discovery, services.rbac is not yet available (no user context).
RBAC is enforced at execution time by the ActionExecutor instead.
"""
result = {}
if not hasattr(methodInstance, '_actions') or not methodInstance._actions:
return result
for actionName, actionDef in methodInstance._actions.items():
result[actionName] = {
'description': actionDef.description,
'parameters': methodInstance._convertParametersToSystemFormat(actionDef.parameters),
'method': methodInstance._createActionWrapper(actionDef)
}
return result
def discoverMethods(serviceCenter):
"""Dynamically discover all method classes and their actions in modules methods package.
Always creates fresh method instances bound to the given serviceCenter,
preventing stale or cross-workflow service references.
"""
global methods
try:
methodsPackage = importlib.import_module('modules.workflows.methods')
# Clear and rebuild to prevent cross-workflow state contamination
methods.clear()
uniqueCount = 0
for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__):
if name.startswith('method'):
try:
module = importlib.import_module(f'modules.workflows.methods.{name}')
for itemName, item in inspect.getmembers(module):
if (inspect.isclass(item) and
issubclass(item, MethodBase) and
item != MethodBase):
shortName = itemName.replace('Method', '').lower()
# Skip if already processed (via another module path)
if itemName in methods:
continue
methodInstance = item(serviceCenter)
actions = _collectActionsUnfiltered(methodInstance)
methodInfo = {
'instance': methodInstance,
'actions': actions,
'description': item.__doc__ or f"Method {itemName}"
}
methods[itemName] = methodInfo
methods[shortName] = methodInfo
uniqueCount += 1
logger.info(f"Discovered method {itemName} (short: {shortName}) with {len(actions)} actions")
except Exception as e:
logger.error(f"Error discovering method {name}: {str(e)}")
continue
logger.info(f"Discovered {uniqueCount} unique methods ({len(methods)} entries with aliases)")
except Exception as e:
logger.error(f"Error discovering methods: {str(e)}")
def getActionParameterList(methodName: str, actionName: str, methods: Dict[str, Any]) -> str:
"""Get action parameter list from WorkflowActionParameter structure for AI parameter generation (list only)."""
try:
if not methods or methodName not in methods:
return ""
storedActions = methods[methodName].get('actions', {})
if actionName not in storedActions:
return ""
action_info = storedActions[actionName]
# Use structured WorkflowActionParameter objects from new system
parameters = action_info.get('parameters', {})
param_list = []
for paramName, paramInfo in parameters.items():
paramType = paramInfo.get('type', 'Any')
paramDesc = paramInfo.get('description', '')
paramRequired = paramInfo.get('required', False)
# Format: paramName (type, required/optional): description
reqText = "required" if paramRequired else "optional"
if paramDesc:
param_list.append(f"- {paramName} ({paramType}, {reqText}): {paramDesc}")
else:
param_list.append(f"- {paramName} ({paramType}, {reqText})")
# Return list only, without leading headings or trailing text
return "\n".join(param_list)
except Exception as e:
logger.error(f"Error getting action parameter signature for {methodName}.{actionName}: {str(e)}")
return ""