# Copyright (c) 2025 Patrick Motsch # All rights reserved. from typing import Dict, List, Optional, Any, Literal from datetime import datetime, UTC import logging from functools import wraps import inspect from modules.datamodels.datamodelWorkflowActions import WorkflowActionDefinition, WorkflowActionParameter from modules.datamodels.datamodelRbac import AccessRuleContext logger = logging.getLogger(__name__) def action(func): """Decorator to mark a method as an available action IMPORTANT: Action methods should NOT return resultLabel in their ActionResult. The resultLabel is managed by the action handler using the action's execResultLabel from the action plan. This ensures consistent document routing throughout the workflow. Action methods should only return: - success: bool - documents: List[ActionDocument] - error: str (if success=False) REQUIRED: All ActionDocument instances MUST include validationMetadata for content validation and refinement. Without validationMetadata, results cannot be approved. Example validationMetadata structure: validationMetadata = { "actionType": "moduleName.actionName", "param1": value1, "param2": value2, # ... other relevant parameters for validation } See MethodBase._createValidationMetadata() for a helper method to create standard metadata. """ @wraps(func) async def wrapper(self, parameters: Dict[str, Any], *args, **kwargs): return await func(self, parameters, *args, **kwargs) wrapper.is_action = True return wrapper class MethodBase: """Base class for all methods IMPORTANT: All actions that return ActionDocument instances MUST include validationMetadata. This metadata is required for content validation and refinement. Without it, results cannot be approved by the validation system. Use _createValidationMetadata() helper method to create standardized metadata structures. """ def __init__(self, services: Any): """Initialize method with services object""" self.services = services self.name: str self.description: str self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") # Actions MÜSSEN als Dictionary definiert sein # Jede Method-Klasse muss _actions Dictionary in __init__ definieren self._actions: Dict[str, WorkflowActionDefinition] = {} # Nach Initialisierung: Actions validieren (wird überschrieben, wenn _actions gesetzt wird) # Validierung erfolgt erst nach vollständiger Initialisierung der Subklasse def _validateActions(self): """Validate that _actions dictionary is properly defined""" if not hasattr(self, '_actions') or not isinstance(self._actions, dict): raise ValueError(f"Method {self.name} must define _actions dictionary in __init__") for actionName, actionDef in self._actions.items(): if not isinstance(actionDef, WorkflowActionDefinition): raise ValueError(f"Action '{actionName}' in {self.name} must be WorkflowActionDefinition instance") if not actionDef.actionId: raise ValueError(f"Action '{actionName}' in {self.name} must have actionId") if not actionDef.execute: raise ValueError(f"Action '{actionName}' in {self.name} must have execute function") @property def actions(self) -> Dict[str, Dict[str, Any]]: """ Dynamically collect all actions from _actions dictionary. Returns format for API/UI consumption. REQUIREMENT: Alle Actions müssen in _actions Dictionary definiert sein. Actions ohne _actions Definition sind nicht verfügbar. """ result = {} # Actions müssen in _actions Dictionary definiert sein if not hasattr(self, '_actions') or not self._actions: self.logger.error(f"Method {self.name} has no _actions dictionary defined. Actions will not be available.") return result totalActions = len(self._actions) deniedActions = [] for actionName, actionDef in self._actions.items(): # RBAC-Check: Prüfe ob Action für aktuellen User verfügbar ist if not self._checkActionPermission(actionDef.actionId): deniedActions.append(f"{actionName} ({actionDef.actionId})") continue # Skip if user doesn't have permission # Konvertiere WorkflowActionDefinition zu System-Format result[actionName] = { 'description': actionDef.description, 'parameters': self._convertParametersToSystemFormat(actionDef.parameters), 'method': self._createActionWrapper(actionDef) } if deniedActions: self.logger.warning(f"Method {self.name}: {len(deniedActions)}/{totalActions} actions denied by RBAC: {deniedActions[:5]}{'...' if len(deniedActions) > 5 else ''}") if not result and totalActions > 0: self.logger.error(f"Method {self.name}: ALL {totalActions} actions denied by RBAC! This will result in empty action list.") return result def _checkActionPermission(self, actionId: str) -> bool: """ Check if current user has permission to execute this action. Uses RBAC RESOURCE context. REQUIREMENT: RBAC-Service muss verfügbar sein. """ if not hasattr(self.services, 'rbac') or not self.services.rbac: self.logger.error(f"RBAC service not available (services.rbac is None). Action {actionId} will be denied.") return False # Get current user from services.user (not from chat service) currentUser = getattr(self.services, 'user', None) if not currentUser: self.logger.warning(f"No current user found (services.user is None). Action {actionId} will be denied.") return False # RBAC-Check: RESOURCE context, item = actionId # mandateId/featureInstanceId from services context needed to resolve user roles try: mandateId = getattr(self.services, 'mandateId', None) featureInstanceId = getattr(self.services, 'featureInstanceId', None) permissions = self.services.rbac.getUserPermissions( user=currentUser, context=AccessRuleContext.RESOURCE, item=actionId, mandateId=str(mandateId) if mandateId else None, featureInstanceId=str(featureInstanceId) if featureInstanceId else None ) hasPermission = permissions.view if not hasPermission: # Log detailed RBAC denial info userRoles = getattr(currentUser, 'roleLabels', []) or [] self.logger.warning( f"RBAC denied action {actionId} for user {currentUser.id}. " f"User roles: {userRoles}, mandateId={mandateId}, " f"Permissions: view={permissions.view}, read={permissions.read}, " f"create={permissions.create}, update={permissions.update}, delete={permissions.delete}. " f"No matching RBAC rule found for context=RESOURCE, item={actionId}" ) return hasPermission except Exception as e: self.logger.error(f"RBAC check failed for action {actionId}: {str(e)}. Action will be denied.") return False def _convertParametersToSystemFormat(self, parameters: Dict[str, WorkflowActionParameter]) -> Dict[str, Dict[str, Any]]: """Convert WorkflowActionParameter dict to system format for API/UI consumption""" result = {} for paramName, param in parameters.items(): result[paramName] = { 'type': param.type, 'required': param.required, 'description': param.description, 'default': param.default, 'frontendType': param.frontendType.value, 'frontendOptions': param.frontendOptions, 'validation': param.validation } return result def _createActionWrapper(self, actionDef: WorkflowActionDefinition): """Create wrapper function for action execution with parameter validation""" async def wrapper(parameters: Dict[str, Any], *args, **kwargs): # Parameter-Validierung basierend auf WorkflowActionParameter definitions validatedParams = self._validateParameters(parameters, actionDef.parameters) # Execute action return await actionDef.execute(validatedParams, *args, **kwargs) wrapper.is_action = True return wrapper def _validateParameters(self, parameters: Dict[str, Any], paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]: """Validate parameters against definitions IMPORTANT: System parameters (like parentOperationId, expectedDocumentFormats) are preserved even if they're not in the parameter definitions, as they're used internally by the framework. """ validated = {} # System parameters that should always be preserved, even if not in paramDefs systemParams = ['parentOperationId', 'expectedDocumentFormats'] for sysParam in systemParams: if sysParam in parameters: validated[sysParam] = parameters[sysParam] for paramName, paramDef in paramDefs.items(): value = parameters.get(paramName) # Check required if paramDef.required and value is None: raise ValueError(f"Required parameter '{paramName}' is missing") # Use default if not provided if value is None and paramDef.default is not None: value = paramDef.default # Type validation if value is not None: value = self._validateType(value, paramDef.type) # Custom validation rules if paramDef.validation and value is not None: self._applyValidationRules(value, paramDef.validation) validated[paramName] = value return validated def _validateType(self, value: Any, expectedType: str) -> Any: """Validate and convert value to expected type""" # Type validation logic typeMap = { 'str': str, 'int': int, 'float': float, 'bool': bool, 'list': list, 'dict': dict, } # Handle List[str], List[int], etc. if expectedType.startswith('List['): if not isinstance(value, list): raise ValueError(f"Expected list for type '{expectedType}', got {type(value).__name__}") # Extract inner type innerType = expectedType[5:-1].strip() # Remove "List[" and "]" if innerType in typeMap: return [typeMap[innerType](v) for v in value] return value # Handle Dict[str, Any], etc. if expectedType.startswith('Dict['): if not isinstance(value, dict): raise ValueError(f"Expected dict for type '{expectedType}', got {type(value).__name__}") return value # Handle simple types if expectedType in typeMap: expectedTypeClass = typeMap[expectedType] if not isinstance(value, expectedTypeClass): try: return expectedTypeClass(value) except (ValueError, TypeError) as e: raise ValueError(f"Cannot convert {value} to {expectedType}: {str(e)}") return value def _applyValidationRules(self, value: Any, rules: Dict[str, Any]): """Apply custom validation rules""" if 'min' in rules: if isinstance(value, (int, float)) and value < rules['min']: raise ValueError(f"Value must be >= {rules['min']}") elif isinstance(value, str) and len(value) < rules['min']: raise ValueError(f"String length must be >= {rules['min']}") if 'max' in rules: if isinstance(value, (int, float)) and value > rules['max']: raise ValueError(f"Value must be <= {rules['max']}") elif isinstance(value, str) and len(value) > rules['max']: raise ValueError(f"String length must be <= {rules['max']}") if 'pattern' in rules: import re if not re.match(rules['pattern'], str(value)): raise ValueError(f"Value does not match required pattern: {rules['pattern']}") def getActionSignature(self, actionName: str) -> str: """Get formatted action signature for AI prompt generation (detailed version)""" if actionName not in self.actions: return "" action = self.actions[actionName] paramList = [] # Extract detailed parameter information from docstring docstring = action.get('description', '') paramDescriptions, paramTypes = self._extractParameterDetails(docstring) for paramName in paramDescriptions: paramType = paramTypes.get(paramName, 'Any') paramDesc = paramDescriptions.get(paramName, '') # Mark required parameters with * if possible (not available from docstring, so omit) if paramDesc: paramList.append(f"{paramName}:{paramType} # {paramDesc}") else: paramList.append(f"{paramName}:{paramType}") signature = f"{self.name}.{actionName}" if paramList: signature += f"({', '.join(paramList)})" # Add return type and main description returnType = "ActionResult" mainDesc = self._extractMainDescription(docstring) if mainDesc: signature += f" -> {returnType} # {mainDesc}" return signature def _extractParameterDetails(self, docstring: str): """Extract parameter names, types, and descriptions from docstring""" descriptions = {} types = {} if not docstring: return descriptions, types lines = docstring.split('\n') inParameters = False for line in lines: line = line.strip() if 'Parameters:' in line: inParameters = True continue elif inParameters and (line.startswith('Returns:') or line.startswith('Raises:') or line.startswith('Args:')): break elif inParameters and line: # Look for parameter descriptions like "paramName (type): description" if ':' in line and '(' in line: parts = line.split(':', 1) if len(parts) == 2: paramPart = parts[0].strip() descPart = parts[1].strip() # Extract parameter name and type if '(' in paramPart: paramName = paramPart.split('(')[0].strip() # Normalize bullet-prefixed parameter names like "- aiPrompt" or "* aiPrompt" if paramName.startswith('-') or paramName.startswith('*'): paramName = paramName[1:].strip() paramType = paramPart[paramPart.find('(')+1:paramPart.find(')')].strip() descriptions[paramName] = descPart types[paramName] = paramType # Also handle multi-line descriptions elif line and not line.startswith('Each document') and not line.startswith('contains'): if descriptions: lastParam = list(descriptions.keys())[-1] descriptions[lastParam] += " " + line return descriptions, types def _extractMainDescription(self, docstring: str) -> str: """Extract main description from docstring""" if not docstring: return "" lines = docstring.split('\n') mainDesc = "" for line in lines: line = line.strip() if line and not line.startswith('Parameters:') and not line.startswith('Returns:') and not line.startswith('Raises:'): mainDesc = line break return mainDesc def _formatType(self, type_annotation) -> str: """Format type annotation for display""" if type_annotation == Any: return "Any" elif hasattr(type_annotation, '__name__'): return type_annotation.__name__ elif hasattr(type_annotation, '_name'): return type_annotation._name else: return str(type_annotation) def _createValidationMetadata(self, actionName: str, **kwargs) -> Dict[str, Any]: """ Helper method to create standardized validationMetadata for ActionDocument instances. This method ensures all actions include the required validationMetadata structure for content validation and refinement. Without metadata, results cannot be approved. Args: actionName: Name of the action (e.g., "readEmails", "uploadDocument") **kwargs: Additional action-specific metadata fields Returns: Dictionary with validationMetadata structure including: - actionType: Full action identifier (moduleName.actionName) - All provided kwargs as additional metadata fields Example: validationMetadata = self._createValidationMetadata( "readEmails", connectionReference=connectionReference, folder=folder, limit=limit, emailCount=len(emails) ) ActionDocument( documentName="emails.json", documentData=json.dumps(data), mimeType="application/json", validationMetadata=validationMetadata # REQUIRED ) """ metadata = { "actionType": f"{self.name}.{actionName}" } metadata.update(kwargs) return metadata def _generateMeaningfulFileName(self, base_name: str, extension: str, workflow_context: Dict[str, Any] = None, action_name: str = None) -> str: """ Generate a meaningful file name with round/task/action information. Format: {base_name}_alpha_r{round}t{task}a{action}.{extension} Example: report_alpha_r1t3a4.json Args: base_name: Base name for the file (e.g., "report", "analysis", "summary") extension: File extension without dot (e.g., "json", "html", "txt") workflow_context: Dictionary with currentRound, currentTask, currentAction action_name: Name of the action being performed (optional, for additional context) Returns: Formatted file name string """ try: # Get workflow context from services if not provided if workflow_context is None and hasattr(self.services, 'workflow'): workflow_context = self.services.chat.getWorkflowContext() # Extract round, task, action numbers round_num = workflow_context.get('currentRound', 0) if workflow_context else 0 task_num = workflow_context.get('currentTask', 0) if workflow_context else 0 action_num = workflow_context.get('currentAction', 0) if workflow_context else 0 # Clean base name (remove special characters, spaces) clean_base = base_name.lower().replace(' ', '_').replace('-', '_') # Remove any non-alphanumeric characters except underscores import re clean_base = re.sub(r'[^a-z0-9_]', '', clean_base) # Add action name if provided if action_name: clean_action = action_name.lower().replace(' ', '_').replace('-', '_') clean_action = re.sub(r'[^a-z0-9_]', '', clean_action) clean_base = f"{clean_base}_{clean_action}" # Generate the meaningful file name meaningful_name = f"{clean_base}_r{round_num}t{task_num}a{action_num}.{extension}" self.logger.debug(f"Generated meaningful file name: {meaningful_name} (Round: {round_num}, Task: {task_num}, Action: {action_num})") return meaningful_name except Exception as e: self.logger.warning(f"Error generating meaningful file name, using fallback: {str(e)}") # Fallback to timestamp-based naming timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") return f"{base_name}_{timestamp}.{extension}"