wiki/z-archive/appdoc/mcp_integration_proposal.md

33 KiB

MCP Integration Proposal: Adapting Methods and Actions to MCP Standard

Overview

This document proposes a concrete solution for adapting the current method/action system to work with the MCP (Model Context Protocol) standard while maintaining backward compatibility with existing code.


Current Architecture

Current Method/Action Structure

Method Definition (methodAi.py):

class MethodAi(MethodBase):
    def __init__(self, services):
        super().__init__(services)
        self.name = "ai"
        self.description = "AI processing methods"
    
    @action
    async def process(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Universal AI document processing action.
        
        Parameters:
        - aiPrompt (str, required): Instruction for the AI
        - documentList (list, optional): Document references
        - resultType (str, optional): Output format. Default: txt
        """
        # Implementation...
        aiPrompt = parameters.get("aiPrompt")
        documentList = parameters.get("documentList", [])
        resultType = parameters.get("resultType", "txt")
        # ... process and return ActionResult

Action Discovery:

  • Actions discovered via @action decorator
  • Collected via method.actions property
  • Accessed via methods[methodName]['instance'].actions[actionName]

Action Execution:

# Current execution flow
methodInstance = methods["ai"]['instance']
actionMethod = methodInstance.actions["process"]['method']
result = await actionMethod(parameters={"aiPrompt": "...", "documentList": [...]})

Proposed MCP Integration Architecture

Architecture Overview

Current System (Unchanged)
  ↓
MCP Adapter Layer (NEW)
  ├─> MethodMcpServer (wraps each Method)
  ├─> McpServerRegistry (manages all MCP servers)
  └─> McpClient (calls MCP tools)
  ↓
MCP Protocol (JSON-RPC 2.0)
  ↓
External MCP Clients (optional - for external tools)

Key Principle: MCP integration is an adapter layer - existing methods/actions remain unchanged.


Implementation Components

1. MCP Server Wrapper for Methods

File: gateway/modules/workflows/mcp/methodMcpServer.py

"""
MCP Server wrapper for Method classes.
Exposes method actions as MCP tools.
"""

import json
import logging
from typing import Dict, Any, List, Optional, Union
from pydantic import BaseModel, Field

from modules.workflows.methods.methodBase import MethodBase
from modules.datamodels.datamodelChat import ActionResult

logger = logging.getLogger(__name__)

class McpTool(BaseModel):
    """MCP Tool definition"""
    name: str = Field(description="Tool name (method.action)")
    description: str = Field(description="Tool description")
    inputSchema: Dict[str, Any] = Field(description="JSON Schema for tool parameters")

class MethodMcpServer:
    """MCP Server that wraps a Method and exposes its actions as MCP tools"""
    
    def __init__(self, method: MethodBase):
        """
        Initialize MCP server for a method.
        
        Args:
            method: MethodBase instance (e.g., MethodAi, MethodOutlook)
        """
        self.method = method
        self.tools = self._discoverTools()
        logger.info(f"Created MCP server for method '{method.name}' with {len(self.tools)} tools")
    
    def _discoverTools(self) -> List[McpTool]:
        """Convert @action methods to MCP tools"""
        tools = []
        
        for actionName, actionInfo in self.method.actions.items():
            # Build tool name: "method.action"
            toolName = f"{self.method.name}.{actionName}"
            
            # Get description from docstring
            description = actionInfo.get('description', '')
            mainDescription = self.method._extractMainDescription(description)
            if not mainDescription:
                mainDescription = f"{self.method.name}.{actionName} action"
            
            # Convert parameters to JSON Schema
            inputSchema = self._buildJsonSchema(actionInfo, description)
            
            tool = McpTool(
                name=toolName,
                description=mainDescription,
                inputSchema=inputSchema
            )
            tools.append(tool)
        
        return tools
    
    def _buildJsonSchema(self, actionInfo: Dict[str, Any], docstring: str) -> Dict[str, Any]:
        """Build JSON Schema from action parameters"""
        # Extract parameter details from docstring
        paramDescriptions, paramTypes = self.method._extractParameterDetails(docstring)
        
        properties = {}
        required = []
        
        # Build schema from action parameters
        for paramName, paramInfo in actionInfo.get('parameters', {}).items():
            paramType = paramInfo.get('type', Any)
            paramRequired = paramInfo.get('required', False)
            paramDefault = paramInfo.get('default', None)
            
            # Get description from docstring
            paramDesc = paramDescriptions.get(paramName, '')
            
            # Convert Python type to JSON Schema type
            jsonType, jsonFormat = self._pythonTypeToJsonSchema(paramType)
            
            properties[paramName] = {
                "type": jsonType,
                "description": paramDesc
            }
            
            if jsonFormat:
                properties[paramName]["format"] = jsonFormat
            
            if paramDefault is not None:
                properties[paramName]["default"] = paramDefault
            
            if paramRequired:
                required.append(paramName)
        
        schema = {
            "type": "object",
            "properties": properties
        }
        
        if required:
            schema["required"] = required
        
        return schema
    
    def _pythonTypeToJsonSchema(self, pythonType: Any) -> tuple[str, Optional[str]]:
        """Convert Python type to JSON Schema type"""
        typeMapping = {
            str: ("string", None),
            int: ("integer", None),
            float: ("number", None),
            bool: ("boolean", None),
            list: ("array", None),
            dict: ("object", None),
            Any: ("object", None),  # Fallback for Any type
        }
        
        # Handle Optional types
        if hasattr(pythonType, '__origin__'):
            if pythonType.__origin__ is Optional or pythonType.__origin__ is Union:
                # Get the inner type (first argument)
                innerType = pythonType.__args__[0] if pythonType.__args__ else Any
                return self._pythonTypeToJsonSchema(innerType)
            elif pythonType.__origin__ is list:
                return ("array", None)
            elif pythonType.__origin__ is dict:
                return ("object", None)
        
        # Direct type lookup
        if pythonType in typeMapping:
            return typeMapping[pythonType]
        
        # Fallback
        return ("object", None)
    
    async def callTool(self, toolName: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute MCP tool call.
        
        Args:
            toolName: Tool name (e.g., "ai.process")
            arguments: Tool arguments (JSON Schema validated)
        
        Returns:
            Dict with ActionResult serialized to JSON-compatible format
        """
        # Extract method and action name
        if '.' not in toolName:
            raise ValueError(f"Invalid tool name format: {toolName}. Expected 'method.action'")
        
        methodName, actionName = toolName.split('.', 1)
        
        if methodName != self.method.name:
            raise ValueError(f"Tool '{toolName}' does not belong to method '{self.method.name}'")
        
        if actionName not in self.method.actions:
            raise ValueError(f"Action '{actionName}' not found in method '{self.method.name}'")
        
        # Get action method
        actionMethod = self.method.actions[actionName]['method']
        
        # Execute action (existing code - unchanged)
        try:
            result = await actionMethod(parameters=arguments)
            
            # Convert ActionResult to MCP-compatible format
            return self._actionResultToMcpFormat(result)
        
        except Exception as e:
            logger.error(f"Error executing tool '{toolName}': {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "content": None,
                "documents": []
            }
    
    def _actionResultToMcpFormat(self, result: ActionResult) -> Dict[str, Any]:
        """Convert ActionResult to MCP-compatible JSON format"""
        mcpResult = {
            "success": result.success,
            "error": result.error if hasattr(result, 'error') and result.error else None,
            "content": None,
            "documents": []
        }
        
        # Convert ActionDocuments to MCP format
        if hasattr(result, 'documents') and result.documents:
            for doc in result.documents:
                docDict = {
                    "name": doc.name if hasattr(doc, 'name') else "document",
                    "mimeType": doc.mimeType if hasattr(doc, 'mimeType') else "application/octet-stream",
                    "data": None  # Will be base64 encoded if binary
                }
                
                # Handle document data
                if hasattr(doc, 'data'):
                    if isinstance(doc.data, bytes):
                        import base64
                        docDict["data"] = base64.b64encode(doc.data).decode('utf-8')
                        docDict["encoding"] = "base64"
                    elif isinstance(doc.data, str):
                        docDict["data"] = doc.data
                        docDict["encoding"] = "utf-8"
                    else:
                        # Try to serialize
                        docDict["data"] = json.dumps(doc.data) if doc.data else None
                
                mcpResult["documents"].append(docDict)
        
        # Extract content if available
        if hasattr(result, 'content') and result.content:
            mcpResult["content"] = result.content
        
        return mcpResult
    
    def getTools(self) -> List[McpTool]:
        """Get list of MCP tools provided by this server"""
        return self.tools
    
    def getTool(self, toolName: str) -> Optional[McpTool]:
        """Get specific tool by name"""
        for tool in self.tools:
            if tool.name == toolName:
                return tool
        return None

2. MCP Server Registry

File: gateway/modules/workflows/mcp/mcpServerRegistry.py

"""
MCP Server Registry - manages all MCP servers for discovered methods.
"""

import logging
from typing import Dict, List, Optional
from modules.workflows.methods.methodBase import MethodBase
from modules.workflows.mcp.methodMcpServer import MethodMcpServer, McpTool

logger = logging.getLogger(__name__)

class McpServerRegistry:
    """Registry for all MCP servers"""
    
    def __init__(self):
        self.servers: Dict[str, MethodMcpServer] = {}
    
    def registerMethod(self, method: MethodBase) -> MethodMcpServer:
        """
        Register a method as an MCP server.
        
        Args:
            method: MethodBase instance
        
        Returns:
            MethodMcpServer instance
        """
        if method.name in self.servers:
            logger.debug(f"MCP server for method '{method.name}' already registered")
            return self.servers[method.name]
        
        server = MethodMcpServer(method)
        self.servers[method.name] = server
        logger.info(f"Registered MCP server for method '{method.name}'")
        
        return server
    
    def getServer(self, methodName: str) -> Optional[MethodMcpServer]:
        """Get MCP server for a method"""
        return self.servers.get(methodName)
    
    def getAllTools(self) -> List[McpTool]:
        """Get all tools from all registered servers"""
        allTools = []
        for server in self.servers.values():
            allTools.extend(server.getTools())
        return allTools
    
    def getTool(self, toolName: str) -> Optional[McpTool]:
        """Get tool by name (format: 'method.action')"""
        for server in self.servers.values():
            tool = server.getTool(toolName)
            if tool:
                return tool
        return None
    
    def findServerForTool(self, toolName: str) -> Optional[MethodMcpServer]:
        """Find server that provides a specific tool"""
        if '.' not in toolName:
            return None
        
        methodName = toolName.split('.', 1)[0]
        return self.servers.get(methodName)

3. MCP Client (for calling MCP tools)

File: gateway/modules/workflows/mcp/mcpClient.py

"""
MCP Client - calls MCP tools (both internal and external).
"""

import logging
from typing import Dict, Any, Optional, List
from modules.workflows.mcp.mcpServerRegistry import McpServerRegistry
from modules.workflows.mcp.methodMcpServer import McpTool

logger = logging.getLogger(__name__)

class McpClient:
    """MCP Client for calling MCP tools"""
    
    def __init__(self, registry: McpServerRegistry):
        """
        Initialize MCP client.
        
        Args:
            registry: McpServerRegistry instance
        """
        self.registry = registry
    
    async def callTool(
        self,
        toolName: str,
        arguments: Dict[str, Any],
        validateSchema: bool = True
    ) -> Dict[str, Any]:
        """
        Call an MCP tool.
        
        Args:
            toolName: Tool name (format: "method.action")
            arguments: Tool arguments
            validateSchema: Whether to validate arguments against tool schema
        
        Returns:
            Dict with tool result (MCP-compatible format)
        """
        # Find server for tool
        server = self.registry.findServerForTool(toolName)
        
        if not server:
            raise ValueError(f"Tool '{toolName}' not found in any MCP server")
        
        # Validate arguments if requested
        if validateSchema:
            tool = server.getTool(toolName)
            if tool:
                self._validateArguments(arguments, tool.inputSchema)
        
        # Call tool
        result = await server.callTool(toolName, arguments)
        
        return result
    
    def _validateArguments(self, arguments: Dict[str, Any], schema: Dict[str, Any]):
        """Validate arguments against JSON Schema"""
        # Basic validation (can be enhanced with jsonschema library)
        required = schema.get("required", [])
        properties = schema.get("properties", {})
        
        # Check required fields
        for field in required:
            if field not in arguments:
                raise ValueError(f"Required argument '{field}' is missing")
        
        # Check field types (basic validation)
        for field, value in arguments.items():
            if field in properties:
                propSchema = properties[field]
                expectedType = propSchema.get("type")
                
                if expectedType == "string" and not isinstance(value, str):
                    raise ValueError(f"Argument '{field}' must be a string")
                elif expectedType == "integer" and not isinstance(value, int):
                    raise ValueError(f"Argument '{field}' must be an integer")
                elif expectedType == "boolean" and not isinstance(value, bool):
                    raise ValueError(f"Argument '{field}' must be a boolean")
                # Add more type checks as needed
    
    def listTools(self) -> List[McpTool]:
        """List all available tools"""
        return self.registry.getAllTools()
    
    def getTool(self, toolName: str) -> Optional[McpTool]:
        """Get tool definition"""
        return self.registry.getTool(toolName)

4. Integration with Existing Action Executor

File: gateway/modules/workflows/processing/actionExecutor.py (modification)

# Add MCP support to existing ActionExecutor

class ActionExecutor:
    def __init__(self, services, useMcp: bool = False):
        self.services = services
        self.useMcp = useMcp  # Feature flag
        
        # Initialize MCP components if enabled
        if useMcp:
            from modules.workflows.mcp.mcpServerRegistry import McpServerRegistry
            from modules.workflows.mcp.mcpClient import McpClient
            
            self.mcpRegistry = McpServerRegistry()
            self.mcpClient = McpClient(self.mcpRegistry)
            
            # Register all discovered methods as MCP servers
            self._registerMethodsAsMcpServers()
    
    def _registerMethodsAsMcpServers(self):
        """Register all discovered methods as MCP servers"""
        from modules.workflows.processing.shared.methodDiscovery import methods
        
        for methodName, methodInfo in methods.items():
            if 'instance' in methodInfo:
                methodInstance = methodInfo['instance']
                self.mcpRegistry.registerMethod(methodInstance)
    
    async def executeAction(
        self,
        methodName: str,
        actionName: str,
        parameters: Dict[str, Any]
    ) -> ActionResult:
        """
        Execute action (with optional MCP support).
        
        Args:
            methodName: Method name (e.g., "ai")
            actionName: Action name (e.g., "process")
            parameters: Action parameters
        """
        # Use MCP if enabled
        if self.useMcp:
            toolName = f"{methodName}.{actionName}"
            mcpResult = await self.mcpClient.callTool(toolName, parameters)
            
            # Convert MCP result back to ActionResult
            return self._mcpResultToActionResult(mcpResult)
        
        # Original execution (unchanged)
        methodInfo = methods.get(methodName)
        if not methodInfo:
            raise ValueError(f"Method '{methodName}' not found")
        
        methodInstance = methodInfo['instance']
        actionMethod = methodInstance.actions[actionName]['method']
        
        return await actionMethod(parameters=parameters)
    
    def _mcpResultToActionResult(self, mcpResult: Dict[str, Any]) -> ActionResult:
        """Convert MCP result to ActionResult"""
        from modules.datamodels.datamodelChat import ActionResult, ActionDocument
        
        if not mcpResult.get("success"):
            return ActionResult.isFailure(error=mcpResult.get("error", "Unknown error"))
        
        # Convert documents
        actionDocuments = []
        for docDict in mcpResult.get("documents", []):
            # Decode document data if needed
            data = docDict.get("data")
            if docDict.get("encoding") == "base64":
                import base64
                data = base64.b64decode(data)
            
            doc = ActionDocument(
                name=docDict.get("name", "document"),
                mimeType=docDict.get("mimeType", "application/octet-stream"),
                data=data
            )
            actionDocuments.append(doc)
        
        return ActionResult(
            success=True,
            documents=actionDocuments
        )

Comparison: Before vs After

Example 1: Calling ai.process Action

BEFORE (Current Proprietary System)

# ActionExecutor.executeAction()
methodName = "ai"
actionName = "process"
parameters = {
    "aiPrompt": "Summarize the document",
    "documentList": ["docList:msg_123:results"],
    "resultType": "txt"
}

# Direct method call
methodInfo = methods[methodName]['instance']
actionMethod = methodInfo.actions[actionName]['method']
result = await actionMethod(parameters=parameters)

# Result: ActionResult object
print(result.success)  # True
print(result.documents)  # [ActionDocument(...)]

AFTER (With MCP Standard)

# Option A: Via MCP Client (standardized interface)
mcpClient = McpClient(mcpRegistry)
toolName = "ai.process"
arguments = {
    "aiPrompt": "Summarize the document",
    "documentList": ["docList:msg_123:results"],
    "resultType": "txt"
}

# MCP tool call (JSON-RPC compatible)
mcpResult = await mcpClient.callTool(toolName, arguments)

# Result: MCP-compatible dict
print(mcpResult["success"])  # True
print(mcpResult["documents"])  # [{"name": "...", "mimeType": "...", "data": "..."}]

# Option B: Via ActionExecutor (backward compatible)
# ActionExecutor can use MCP internally but maintain same interface
result = await actionExecutor.executeAction("ai", "process", parameters)
# Result: Still ActionResult (converted from MCP internally)

Key Differences:

  • Standardized Interface: MCP uses JSON-RPC 2.0 protocol
  • Tool Discovery: Tools can be discovered via mcpClient.listTools()
  • Schema Validation: Arguments validated against JSON Schema
  • Backward Compatible: ActionExecutor can use MCP internally

Example 2: Tool Discovery

BEFORE (Current System)

# Discover actions manually
from modules.workflows.processing.shared.methodDiscovery import methods

for methodName, methodInfo in methods.items():
    print(f"Method: {methodName}")
    for actionName, actionInfo in methodInfo['actions'].items():
        print(f"  - {actionName}: {actionInfo['description']}")

AFTER (With MCP Standard)

# Discover tools via MCP
mcpClient = McpClient(mcpRegistry)
tools = mcpClient.listTools()

for tool in tools:
    print(f"Tool: {tool.name}")
    print(f"  Description: {tool.description}")
    print(f"  Schema: {tool.inputSchema}")
    
    # Example output:
    # Tool: ai.process
    #   Description: Universal AI document processing action
    #   Schema: {
    #     "type": "object",
    #     "properties": {
    #       "aiPrompt": {"type": "string", "description": "..."},
    #       "documentList": {"type": "array", "description": "..."},
    #       "resultType": {"type": "string", "default": "txt"}
    #     },
    #     "required": ["aiPrompt"]
    #   }

Key Differences:

  • Standardized Format: JSON Schema for all tools
  • Self-Documenting: Schema includes descriptions and types
  • Interoperable: Can be used by external MCP clients

Example 3: External MCP Server Integration

BEFORE (Current System)

# External tools require custom integration code
# Each external service needs custom connector
class CustomSlackConnector:
    async def sendMessage(self, channel: str, text: str):
        # Custom implementation
        pass

AFTER (With MCP Standard)

# External MCP servers can be integrated seamlessly
from modules.workflows.mcp.externalMcpClient import ExternalMcpClient

# Connect to external MCP server (e.g., Slack MCP server)
slackMcpClient = ExternalMcpClient(
    serverUrl="http://slack-mcp-server:8080",
    transport="http"  # or "stdio", "websocket"
)

# Discover external tools
externalTools = await slackMcpClient.listTools()
# Returns: [McpTool(name="slack.sendMessage", ...), ...]

# Call external tool (same interface as internal tools)
result = await slackMcpClient.callTool(
    "slack.sendMessage",
    {"channel": "#general", "text": "Hello from workflow!"}
)

# Unified interface: internal and external tools work the same way

Key Benefits:

  • No Custom Code: External tools work via standard MCP protocol
  • Unified Interface: Internal and external tools use same API
  • Easy Integration: Just connect to MCP server URL

Migration Strategy

Phase 1: Add MCP Adapter Layer (Non-Breaking)

  1. Create MCP modules (new files, no changes to existing code)

    • methodMcpServer.py
    • mcpServerRegistry.py
    • mcpClient.py
  2. Add feature flag to ActionExecutor

    useMcp = os.getenv("ENABLE_MCP", "false").lower() == "true"
    
  3. Test MCP integration alongside existing system

Phase 2: Enable MCP for New Actions (Gradual)

  1. Enable MCP for specific methods/actions
  2. Monitor performance and compatibility
  3. Gradually migrate more actions to MCP

Phase 3: Full MCP Support (Optional)

  1. Enable MCP by default (keep old path as fallback)
  2. Add external MCP server support
  3. Document MCP tool APIs for external clients

Benefits Summary

1. Standardization

  • JSON-RPC 2.0 Protocol: Standard protocol for tool calls
  • JSON Schema: Standard schema for tool parameters
  • Tool Discovery: Standard discovery mechanism

2. Interoperability

  • External Tools: Easy integration of external MCP servers
  • Multi-Client: Multiple clients can use same tools
  • Cross-Platform: Works with any MCP-compatible system

3. Backward Compatibility

  • No Breaking Changes: Existing code continues to work
  • Gradual Migration: Can enable MCP per method/action
  • Feature Flag: Enable/disable via configuration

4. Developer Experience

  • Self-Documenting: Tools include schemas and descriptions
  • Type Safety: JSON Schema validation
  • Error Handling: Standardized error responses

Next Steps

  1. Review Proposal: Review this proposal and provide feedback
  2. Implement Core: Create methodMcpServer.py, mcpServerRegistry.py, mcpClient.py
  3. Add Integration: Modify ActionExecutor to support MCP (with feature flag)
  4. Test: Test with existing ai.process action
  5. Document: Document MCP tool APIs
  6. Extend: Add support for external MCP servers

Practical Example: Complete Before/After Comparison

Scenario: Execute ai.process action to summarize a document

BEFORE: Current Proprietary System

Step 1: Action Discovery

# In workflowProcessor or actionExecutor
from modules.workflows.processing.shared.methodDiscovery import methods

# Discover available actions
methodInfo = methods.get("ai")
if not methodInfo:
    raise ValueError("Method 'ai' not found")

methodInstance = methodInfo['instance']
actionInfo = methodInstance.actions.get("process")
if not actionInfo:
    raise ValueError("Action 'process' not found in method 'ai'")

# Get action description and parameters (from docstring parsing)
description = actionInfo['description']
parameters = actionInfo['parameters']  # Dict with type info from signature

Step 2: Parameter Preparation

# Build parameters dict (no schema validation)
parameters = {
    "aiPrompt": "Summarize the following document",
    "documentList": ["docList:msg_123:task1_results"],
    "resultType": "txt"
}

# No validation - errors only discovered at runtime

Step 3: Action Execution

# Direct method call
actionMethod = actionInfo['method']
result = await actionMethod(parameters=parameters)

# Result: ActionResult object
if result.success:
    for doc in result.documents:
        print(f"Generated: {doc.name} ({doc.mimeType})")
        # Access document data directly
        content = doc.data if isinstance(doc.data, str) else doc.data.decode('utf-8')
else:
    print(f"Error: {result.error}")

Step 4: Error Handling

# Errors are Python exceptions or ActionResult with success=False
try:
    result = await actionMethod(parameters=parameters)
    if not result.success:
        # Handle error from ActionResult
        logger.error(f"Action failed: {result.error}")
except Exception as e:
    # Handle Python exception
    logger.error(f"Execution error: {str(e)}")

AFTER: With MCP Standard

Step 1: Tool Discovery

# Initialize MCP components
from modules.workflows.mcp.mcpServerRegistry import McpServerRegistry
from modules.workflows.mcp.mcpClient import McpClient
from modules.workflows.processing.shared.methodDiscovery import methods

# Create registry and register methods
mcpRegistry = McpServerRegistry()
for methodName, methodInfo in methods.items():
    if 'instance' in methodInfo:
        mcpRegistry.registerMethod(methodInfo['instance'])

# Create MCP client
mcpClient = McpClient(mcpRegistry)

# Discover tools (standardized format)
tools = mcpClient.listTools()
tool = mcpClient.getTool("ai.process")

# Tool includes JSON Schema
print(tool.name)  # "ai.process"
print(tool.description)  # "Universal AI document processing action"
print(tool.inputSchema)
# {
#   "type": "object",
#   "properties": {
#     "aiPrompt": {
#       "type": "string",
#       "description": "Instruction for the AI"
#     },
#     "documentList": {
#       "type": "array",
#       "description": "Document references"
#     },
#     "resultType": {
#       "type": "string",
#       "default": "txt",
#       "description": "Output format"
#     }
#   },
#   "required": ["aiPrompt"]
# }

Step 2: Parameter Preparation with Validation

# Build parameters (same format)
arguments = {
    "aiPrompt": "Summarize the following document",
    "documentList": ["docList:msg_123:task1_results"],
    "resultType": "txt"
}

# Optional: Validate against schema before calling
try:
    mcpClient._validateArguments(arguments, tool.inputSchema)
    print("Parameters valid")
except ValueError as e:
    print(f"Validation error: {e}")
    # Fix parameters before calling

Step 3: Tool Execution (MCP Standard)

# MCP tool call (JSON-RPC compatible)
mcpResult = await mcpClient.callTool("ai.process", arguments)

# Result: MCP-compatible dict
if mcpResult["success"]:
    for doc in mcpResult["documents"]:
        print(f"Generated: {doc['name']} ({doc['mimeType']})")
        # Handle encoding
        if doc.get("encoding") == "base64":
            import base64
            content = base64.b64decode(doc["data"]).decode('utf-8')
        else:
            content = doc["data"]
        print(f"Content: {content}")
else:
    print(f"Error: {mcpResult.get('error')}")

Step 4: Error Handling (Standardized)

# MCP standardizes error format
try:
    mcpResult = await mcpClient.callTool("ai.process", arguments)
    
    if not mcpResult.get("success"):
        # Standardized error format
        error = mcpResult.get("error", "Unknown error")
        logger.error(f"MCP tool failed: {error}")
        # Handle error (standardized format)
    
except ValueError as e:
    # Schema validation error
    logger.error(f"Invalid arguments: {e}")
except Exception as e:
    # Execution error
    logger.error(f"MCP call failed: {e}")

Key Differences Summary

Aspect BEFORE (Proprietary) AFTER (MCP Standard)
Discovery Manual docstring parsing Standardized JSON Schema
Validation Runtime errors only Pre-call schema validation
Format Python objects JSON-compatible dicts
Protocol Direct method calls JSON-RPC 2.0 standard
Interoperability Internal only External MCP servers supported
Documentation Docstring parsing Self-documenting schemas
Error Format Mixed (exceptions + ActionResult) Standardized dict format

Integration Example: Using MCP in ActionExecutor

# Modified ActionExecutor with MCP support
class ActionExecutor:
    def __init__(self, services, useMcp: bool = False):
        self.services = services
        self.useMcp = useMcp
        
        if useMcp:
            # Initialize MCP
            self.mcpRegistry = McpServerRegistry()
            self.mcpClient = McpClient(self.mcpRegistry)
            self._registerMethodsAsMcpServers()
        else:
            # Original discovery (unchanged)
            from modules.workflows.processing.shared.methodDiscovery import methods
            self.methods = methods
    
    async def executeAction(self, methodName: str, actionName: str, parameters: Dict):
        if self.useMcp:
            # MCP path
            toolName = f"{methodName}.{actionName}"
            mcpResult = await self.mcpClient.callTool(toolName, parameters)
            return self._mcpResultToActionResult(mcpResult)
        else:
            # Original path (unchanged)
            methodInfo = self.methods.get(methodName)
            methodInstance = methodInfo['instance']
            actionMethod = methodInstance.actions[actionName]['method']
            return await actionMethod(parameters=parameters)

# Usage (backward compatible)
executor = ActionExecutor(services, useMcp=True)  # Enable MCP
result = await executor.executeAction("ai", "process", {
    "aiPrompt": "...",
    "documentList": [...]
})
# Returns: ActionResult (same as before, but uses MCP internally)

Questions for Discussion

  1. Feature Flag: Should MCP be opt-in (feature flag) or always enabled?
  2. External Servers: Should we support external MCP servers from the start?
  3. Pydantic Models: Should we enhance parameter models to include JSON Schema metadata?
  4. Performance: Should we benchmark MCP overhead vs direct calls?
  5. Migration: Should we migrate all actions at once or gradually?