1032 lines
33 KiB
Markdown
1032 lines
33 KiB
Markdown
# MCP Integration Proposal: Adapting Methods and Actions to MCP Standard
|
|
|
|
## Overview
|
|
|
|
This document proposes a concrete solution for adapting the current method/action system to work with the MCP (Model Context Protocol) standard while maintaining backward compatibility with existing code.
|
|
|
|
---
|
|
|
|
## Current Architecture
|
|
|
|
### Current Method/Action Structure
|
|
|
|
**Method Definition** (`methodAi.py`):
|
|
```python
|
|
class MethodAi(MethodBase):
|
|
def __init__(self, services):
|
|
super().__init__(services)
|
|
self.name = "ai"
|
|
self.description = "AI processing methods"
|
|
|
|
@action
|
|
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
|
|
"""
|
|
Universal AI document processing action.
|
|
|
|
Parameters:
|
|
- aiPrompt (str, required): Instruction for the AI
|
|
- documentList (list, optional): Document references
|
|
- resultType (str, optional): Output format. Default: txt
|
|
"""
|
|
# Implementation...
|
|
aiPrompt = parameters.get("aiPrompt")
|
|
documentList = parameters.get("documentList", [])
|
|
resultType = parameters.get("resultType", "txt")
|
|
# ... process and return ActionResult
|
|
```
|
|
|
|
**Action Discovery**:
|
|
- Actions discovered via `@action` decorator
|
|
- Collected via `method.actions` property
|
|
- Accessed via `methods[methodName]['instance'].actions[actionName]`
|
|
|
|
**Action Execution**:
|
|
```python
|
|
# Current execution flow
|
|
methodInstance = methods["ai"]['instance']
|
|
actionMethod = methodInstance.actions["process"]['method']
|
|
result = await actionMethod(parameters={"aiPrompt": "...", "documentList": [...]})
|
|
```
|
|
|
|
---
|
|
|
|
## Proposed MCP Integration Architecture
|
|
|
|
### Architecture Overview
|
|
|
|
```
|
|
Current System (Unchanged)
|
|
↓
|
|
MCP Adapter Layer (NEW)
|
|
├─> MethodMcpServer (wraps each Method)
|
|
├─> McpServerRegistry (manages all MCP servers)
|
|
└─> McpClient (calls MCP tools)
|
|
↓
|
|
MCP Protocol (JSON-RPC 2.0)
|
|
↓
|
|
External MCP Clients (optional - for external tools)
|
|
```
|
|
|
|
**Key Principle**: MCP integration is an **adapter layer** - existing methods/actions remain unchanged.
|
|
|
|
---
|
|
|
|
## Implementation Components
|
|
|
|
### 1. MCP Server Wrapper for Methods
|
|
|
|
**File**: `gateway/modules/workflows/mcp/methodMcpServer.py`
|
|
|
|
```python
|
|
"""
|
|
MCP Server wrapper for Method classes.
|
|
Exposes method actions as MCP tools.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from pydantic import BaseModel, Field
|
|
|
|
from modules.workflows.methods.methodBase import MethodBase
|
|
from modules.datamodels.datamodelChat import ActionResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class McpTool(BaseModel):
|
|
"""MCP Tool definition"""
|
|
name: str = Field(description="Tool name (method.action)")
|
|
description: str = Field(description="Tool description")
|
|
inputSchema: Dict[str, Any] = Field(description="JSON Schema for tool parameters")
|
|
|
|
class MethodMcpServer:
|
|
"""MCP Server that wraps a Method and exposes its actions as MCP tools"""
|
|
|
|
def __init__(self, method: MethodBase):
|
|
"""
|
|
Initialize MCP server for a method.
|
|
|
|
Args:
|
|
method: MethodBase instance (e.g., MethodAi, MethodOutlook)
|
|
"""
|
|
self.method = method
|
|
self.tools = self._discoverTools()
|
|
logger.info(f"Created MCP server for method '{method.name}' with {len(self.tools)} tools")
|
|
|
|
def _discoverTools(self) -> List[McpTool]:
|
|
"""Convert @action methods to MCP tools"""
|
|
tools = []
|
|
|
|
for actionName, actionInfo in self.method.actions.items():
|
|
# Build tool name: "method.action"
|
|
toolName = f"{self.method.name}.{actionName}"
|
|
|
|
# Get description from docstring
|
|
description = actionInfo.get('description', '')
|
|
mainDescription = self.method._extractMainDescription(description)
|
|
if not mainDescription:
|
|
mainDescription = f"{self.method.name}.{actionName} action"
|
|
|
|
# Convert parameters to JSON Schema
|
|
inputSchema = self._buildJsonSchema(actionInfo, description)
|
|
|
|
tool = McpTool(
|
|
name=toolName,
|
|
description=mainDescription,
|
|
inputSchema=inputSchema
|
|
)
|
|
tools.append(tool)
|
|
|
|
return tools
|
|
|
|
def _buildJsonSchema(self, actionInfo: Dict[str, Any], docstring: str) -> Dict[str, Any]:
|
|
"""Build JSON Schema from action parameters"""
|
|
# Extract parameter details from docstring
|
|
paramDescriptions, paramTypes = self.method._extractParameterDetails(docstring)
|
|
|
|
properties = {}
|
|
required = []
|
|
|
|
# Build schema from action parameters
|
|
for paramName, paramInfo in actionInfo.get('parameters', {}).items():
|
|
paramType = paramInfo.get('type', Any)
|
|
paramRequired = paramInfo.get('required', False)
|
|
paramDefault = paramInfo.get('default', None)
|
|
|
|
# Get description from docstring
|
|
paramDesc = paramDescriptions.get(paramName, '')
|
|
|
|
# Convert Python type to JSON Schema type
|
|
jsonType, jsonFormat = self._pythonTypeToJsonSchema(paramType)
|
|
|
|
properties[paramName] = {
|
|
"type": jsonType,
|
|
"description": paramDesc
|
|
}
|
|
|
|
if jsonFormat:
|
|
properties[paramName]["format"] = jsonFormat
|
|
|
|
if paramDefault is not None:
|
|
properties[paramName]["default"] = paramDefault
|
|
|
|
if paramRequired:
|
|
required.append(paramName)
|
|
|
|
schema = {
|
|
"type": "object",
|
|
"properties": properties
|
|
}
|
|
|
|
if required:
|
|
schema["required"] = required
|
|
|
|
return schema
|
|
|
|
def _pythonTypeToJsonSchema(self, pythonType: Any) -> tuple[str, Optional[str]]:
|
|
"""Convert Python type to JSON Schema type"""
|
|
typeMapping = {
|
|
str: ("string", None),
|
|
int: ("integer", None),
|
|
float: ("number", None),
|
|
bool: ("boolean", None),
|
|
list: ("array", None),
|
|
dict: ("object", None),
|
|
Any: ("object", None), # Fallback for Any type
|
|
}
|
|
|
|
# Handle Optional types
|
|
if hasattr(pythonType, '__origin__'):
|
|
if pythonType.__origin__ is Optional or pythonType.__origin__ is Union:
|
|
# Get the inner type (first argument)
|
|
innerType = pythonType.__args__[0] if pythonType.__args__ else Any
|
|
return self._pythonTypeToJsonSchema(innerType)
|
|
elif pythonType.__origin__ is list:
|
|
return ("array", None)
|
|
elif pythonType.__origin__ is dict:
|
|
return ("object", None)
|
|
|
|
# Direct type lookup
|
|
if pythonType in typeMapping:
|
|
return typeMapping[pythonType]
|
|
|
|
# Fallback
|
|
return ("object", None)
|
|
|
|
async def callTool(self, toolName: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Execute MCP tool call.
|
|
|
|
Args:
|
|
toolName: Tool name (e.g., "ai.process")
|
|
arguments: Tool arguments (JSON Schema validated)
|
|
|
|
Returns:
|
|
Dict with ActionResult serialized to JSON-compatible format
|
|
"""
|
|
# Extract method and action name
|
|
if '.' not in toolName:
|
|
raise ValueError(f"Invalid tool name format: {toolName}. Expected 'method.action'")
|
|
|
|
methodName, actionName = toolName.split('.', 1)
|
|
|
|
if methodName != self.method.name:
|
|
raise ValueError(f"Tool '{toolName}' does not belong to method '{self.method.name}'")
|
|
|
|
if actionName not in self.method.actions:
|
|
raise ValueError(f"Action '{actionName}' not found in method '{self.method.name}'")
|
|
|
|
# Get action method
|
|
actionMethod = self.method.actions[actionName]['method']
|
|
|
|
# Execute action (existing code - unchanged)
|
|
try:
|
|
result = await actionMethod(parameters=arguments)
|
|
|
|
# Convert ActionResult to MCP-compatible format
|
|
return self._actionResultToMcpFormat(result)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error executing tool '{toolName}': {str(e)}")
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"content": None,
|
|
"documents": []
|
|
}
|
|
|
|
def _actionResultToMcpFormat(self, result: ActionResult) -> Dict[str, Any]:
|
|
"""Convert ActionResult to MCP-compatible JSON format"""
|
|
mcpResult = {
|
|
"success": result.success,
|
|
"error": result.error if hasattr(result, 'error') and result.error else None,
|
|
"content": None,
|
|
"documents": []
|
|
}
|
|
|
|
# Convert ActionDocuments to MCP format
|
|
if hasattr(result, 'documents') and result.documents:
|
|
for doc in result.documents:
|
|
docDict = {
|
|
"name": doc.name if hasattr(doc, 'name') else "document",
|
|
"mimeType": doc.mimeType if hasattr(doc, 'mimeType') else "application/octet-stream",
|
|
"data": None # Will be base64 encoded if binary
|
|
}
|
|
|
|
# Handle document data
|
|
if hasattr(doc, 'data'):
|
|
if isinstance(doc.data, bytes):
|
|
import base64
|
|
docDict["data"] = base64.b64encode(doc.data).decode('utf-8')
|
|
docDict["encoding"] = "base64"
|
|
elif isinstance(doc.data, str):
|
|
docDict["data"] = doc.data
|
|
docDict["encoding"] = "utf-8"
|
|
else:
|
|
# Try to serialize
|
|
docDict["data"] = json.dumps(doc.data) if doc.data else None
|
|
|
|
mcpResult["documents"].append(docDict)
|
|
|
|
# Extract content if available
|
|
if hasattr(result, 'content') and result.content:
|
|
mcpResult["content"] = result.content
|
|
|
|
return mcpResult
|
|
|
|
def getTools(self) -> List[McpTool]:
|
|
"""Get list of MCP tools provided by this server"""
|
|
return self.tools
|
|
|
|
def getTool(self, toolName: str) -> Optional[McpTool]:
|
|
"""Get specific tool by name"""
|
|
for tool in self.tools:
|
|
if tool.name == toolName:
|
|
return tool
|
|
return None
|
|
```
|
|
|
|
---
|
|
|
|
### 2. MCP Server Registry
|
|
|
|
**File**: `gateway/modules/workflows/mcp/mcpServerRegistry.py`
|
|
|
|
```python
|
|
"""
|
|
MCP Server Registry - manages all MCP servers for discovered methods.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
from modules.workflows.methods.methodBase import MethodBase
|
|
from modules.workflows.mcp.methodMcpServer import MethodMcpServer, McpTool
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class McpServerRegistry:
|
|
"""Registry for all MCP servers"""
|
|
|
|
def __init__(self):
|
|
self.servers: Dict[str, MethodMcpServer] = {}
|
|
|
|
def registerMethod(self, method: MethodBase) -> MethodMcpServer:
|
|
"""
|
|
Register a method as an MCP server.
|
|
|
|
Args:
|
|
method: MethodBase instance
|
|
|
|
Returns:
|
|
MethodMcpServer instance
|
|
"""
|
|
if method.name in self.servers:
|
|
logger.debug(f"MCP server for method '{method.name}' already registered")
|
|
return self.servers[method.name]
|
|
|
|
server = MethodMcpServer(method)
|
|
self.servers[method.name] = server
|
|
logger.info(f"Registered MCP server for method '{method.name}'")
|
|
|
|
return server
|
|
|
|
def getServer(self, methodName: str) -> Optional[MethodMcpServer]:
|
|
"""Get MCP server for a method"""
|
|
return self.servers.get(methodName)
|
|
|
|
def getAllTools(self) -> List[McpTool]:
|
|
"""Get all tools from all registered servers"""
|
|
allTools = []
|
|
for server in self.servers.values():
|
|
allTools.extend(server.getTools())
|
|
return allTools
|
|
|
|
def getTool(self, toolName: str) -> Optional[McpTool]:
|
|
"""Get tool by name (format: 'method.action')"""
|
|
for server in self.servers.values():
|
|
tool = server.getTool(toolName)
|
|
if tool:
|
|
return tool
|
|
return None
|
|
|
|
def findServerForTool(self, toolName: str) -> Optional[MethodMcpServer]:
|
|
"""Find server that provides a specific tool"""
|
|
if '.' not in toolName:
|
|
return None
|
|
|
|
methodName = toolName.split('.', 1)[0]
|
|
return self.servers.get(methodName)
|
|
```
|
|
|
|
---
|
|
|
|
### 3. MCP Client (for calling MCP tools)
|
|
|
|
**File**: `gateway/modules/workflows/mcp/mcpClient.py`
|
|
|
|
```python
|
|
"""
|
|
MCP Client - calls MCP tools (both internal and external).
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, Any, Optional, List
|
|
from modules.workflows.mcp.mcpServerRegistry import McpServerRegistry
|
|
from modules.workflows.mcp.methodMcpServer import McpTool
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class McpClient:
|
|
"""MCP Client for calling MCP tools"""
|
|
|
|
def __init__(self, registry: McpServerRegistry):
|
|
"""
|
|
Initialize MCP client.
|
|
|
|
Args:
|
|
registry: McpServerRegistry instance
|
|
"""
|
|
self.registry = registry
|
|
|
|
async def callTool(
|
|
self,
|
|
toolName: str,
|
|
arguments: Dict[str, Any],
|
|
validateSchema: bool = True
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Call an MCP tool.
|
|
|
|
Args:
|
|
toolName: Tool name (format: "method.action")
|
|
arguments: Tool arguments
|
|
validateSchema: Whether to validate arguments against tool schema
|
|
|
|
Returns:
|
|
Dict with tool result (MCP-compatible format)
|
|
"""
|
|
# Find server for tool
|
|
server = self.registry.findServerForTool(toolName)
|
|
|
|
if not server:
|
|
raise ValueError(f"Tool '{toolName}' not found in any MCP server")
|
|
|
|
# Validate arguments if requested
|
|
if validateSchema:
|
|
tool = server.getTool(toolName)
|
|
if tool:
|
|
self._validateArguments(arguments, tool.inputSchema)
|
|
|
|
# Call tool
|
|
result = await server.callTool(toolName, arguments)
|
|
|
|
return result
|
|
|
|
def _validateArguments(self, arguments: Dict[str, Any], schema: Dict[str, Any]):
|
|
"""Validate arguments against JSON Schema"""
|
|
# Basic validation (can be enhanced with jsonschema library)
|
|
required = schema.get("required", [])
|
|
properties = schema.get("properties", {})
|
|
|
|
# Check required fields
|
|
for field in required:
|
|
if field not in arguments:
|
|
raise ValueError(f"Required argument '{field}' is missing")
|
|
|
|
# Check field types (basic validation)
|
|
for field, value in arguments.items():
|
|
if field in properties:
|
|
propSchema = properties[field]
|
|
expectedType = propSchema.get("type")
|
|
|
|
if expectedType == "string" and not isinstance(value, str):
|
|
raise ValueError(f"Argument '{field}' must be a string")
|
|
elif expectedType == "integer" and not isinstance(value, int):
|
|
raise ValueError(f"Argument '{field}' must be an integer")
|
|
elif expectedType == "boolean" and not isinstance(value, bool):
|
|
raise ValueError(f"Argument '{field}' must be a boolean")
|
|
# Add more type checks as needed
|
|
|
|
def listTools(self) -> List[McpTool]:
|
|
"""List all available tools"""
|
|
return self.registry.getAllTools()
|
|
|
|
def getTool(self, toolName: str) -> Optional[McpTool]:
|
|
"""Get tool definition"""
|
|
return self.registry.getTool(toolName)
|
|
```
|
|
|
|
---
|
|
|
|
### 4. Integration with Existing Action Executor
|
|
|
|
**File**: `gateway/modules/workflows/processing/actionExecutor.py` (modification)
|
|
|
|
```python
|
|
# Add MCP support to existing ActionExecutor
|
|
|
|
class ActionExecutor:
|
|
def __init__(self, services, useMcp: bool = False):
|
|
self.services = services
|
|
self.useMcp = useMcp # Feature flag
|
|
|
|
# Initialize MCP components if enabled
|
|
if useMcp:
|
|
from modules.workflows.mcp.mcpServerRegistry import McpServerRegistry
|
|
from modules.workflows.mcp.mcpClient import McpClient
|
|
|
|
self.mcpRegistry = McpServerRegistry()
|
|
self.mcpClient = McpClient(self.mcpRegistry)
|
|
|
|
# Register all discovered methods as MCP servers
|
|
self._registerMethodsAsMcpServers()
|
|
|
|
def _registerMethodsAsMcpServers(self):
|
|
"""Register all discovered methods as MCP servers"""
|
|
from modules.workflows.processing.shared.methodDiscovery import methods
|
|
|
|
for methodName, methodInfo in methods.items():
|
|
if 'instance' in methodInfo:
|
|
methodInstance = methodInfo['instance']
|
|
self.mcpRegistry.registerMethod(methodInstance)
|
|
|
|
async def executeAction(
|
|
self,
|
|
methodName: str,
|
|
actionName: str,
|
|
parameters: Dict[str, Any]
|
|
) -> ActionResult:
|
|
"""
|
|
Execute action (with optional MCP support).
|
|
|
|
Args:
|
|
methodName: Method name (e.g., "ai")
|
|
actionName: Action name (e.g., "process")
|
|
parameters: Action parameters
|
|
"""
|
|
# Use MCP if enabled
|
|
if self.useMcp:
|
|
toolName = f"{methodName}.{actionName}"
|
|
mcpResult = await self.mcpClient.callTool(toolName, parameters)
|
|
|
|
# Convert MCP result back to ActionResult
|
|
return self._mcpResultToActionResult(mcpResult)
|
|
|
|
# Original execution (unchanged)
|
|
methodInfo = methods.get(methodName)
|
|
if not methodInfo:
|
|
raise ValueError(f"Method '{methodName}' not found")
|
|
|
|
methodInstance = methodInfo['instance']
|
|
actionMethod = methodInstance.actions[actionName]['method']
|
|
|
|
return await actionMethod(parameters=parameters)
|
|
|
|
def _mcpResultToActionResult(self, mcpResult: Dict[str, Any]) -> ActionResult:
|
|
"""Convert MCP result to ActionResult"""
|
|
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
|
|
|
|
if not mcpResult.get("success"):
|
|
return ActionResult.isFailure(error=mcpResult.get("error", "Unknown error"))
|
|
|
|
# Convert documents
|
|
actionDocuments = []
|
|
for docDict in mcpResult.get("documents", []):
|
|
# Decode document data if needed
|
|
data = docDict.get("data")
|
|
if docDict.get("encoding") == "base64":
|
|
import base64
|
|
data = base64.b64decode(data)
|
|
|
|
doc = ActionDocument(
|
|
name=docDict.get("name", "document"),
|
|
mimeType=docDict.get("mimeType", "application/octet-stream"),
|
|
data=data
|
|
)
|
|
actionDocuments.append(doc)
|
|
|
|
return ActionResult(
|
|
success=True,
|
|
documents=actionDocuments
|
|
)
|
|
```
|
|
|
|
---
|
|
|
|
## Comparison: Before vs After
|
|
|
|
### Example 1: Calling `ai.process` Action
|
|
|
|
#### BEFORE (Current Proprietary System)
|
|
|
|
```python
|
|
# ActionExecutor.executeAction()
|
|
methodName = "ai"
|
|
actionName = "process"
|
|
parameters = {
|
|
"aiPrompt": "Summarize the document",
|
|
"documentList": ["docList:msg_123:results"],
|
|
"resultType": "txt"
|
|
}
|
|
|
|
# Direct method call
|
|
methodInfo = methods[methodName]['instance']
|
|
actionMethod = methodInfo.actions[actionName]['method']
|
|
result = await actionMethod(parameters=parameters)
|
|
|
|
# Result: ActionResult object
|
|
print(result.success) # True
|
|
print(result.documents) # [ActionDocument(...)]
|
|
```
|
|
|
|
#### AFTER (With MCP Standard)
|
|
|
|
```python
|
|
# Option A: Via MCP Client (standardized interface)
|
|
mcpClient = McpClient(mcpRegistry)
|
|
toolName = "ai.process"
|
|
arguments = {
|
|
"aiPrompt": "Summarize the document",
|
|
"documentList": ["docList:msg_123:results"],
|
|
"resultType": "txt"
|
|
}
|
|
|
|
# MCP tool call (JSON-RPC compatible)
|
|
mcpResult = await mcpClient.callTool(toolName, arguments)
|
|
|
|
# Result: MCP-compatible dict
|
|
print(mcpResult["success"]) # True
|
|
print(mcpResult["documents"]) # [{"name": "...", "mimeType": "...", "data": "..."}]
|
|
|
|
# Option B: Via ActionExecutor (backward compatible)
|
|
# ActionExecutor can use MCP internally but maintain same interface
|
|
result = await actionExecutor.executeAction("ai", "process", parameters)
|
|
# Result: Still ActionResult (converted from MCP internally)
|
|
```
|
|
|
|
**Key Differences**:
|
|
- ✅ **Standardized Interface**: MCP uses JSON-RPC 2.0 protocol
|
|
- ✅ **Tool Discovery**: Tools can be discovered via `mcpClient.listTools()`
|
|
- ✅ **Schema Validation**: Arguments validated against JSON Schema
|
|
- ✅ **Backward Compatible**: ActionExecutor can use MCP internally
|
|
|
|
---
|
|
|
|
### Example 2: Tool Discovery
|
|
|
|
#### BEFORE (Current System)
|
|
|
|
```python
|
|
# Discover actions manually
|
|
from modules.workflows.processing.shared.methodDiscovery import methods
|
|
|
|
for methodName, methodInfo in methods.items():
|
|
print(f"Method: {methodName}")
|
|
for actionName, actionInfo in methodInfo['actions'].items():
|
|
print(f" - {actionName}: {actionInfo['description']}")
|
|
```
|
|
|
|
#### AFTER (With MCP Standard)
|
|
|
|
```python
|
|
# Discover tools via MCP
|
|
mcpClient = McpClient(mcpRegistry)
|
|
tools = mcpClient.listTools()
|
|
|
|
for tool in tools:
|
|
print(f"Tool: {tool.name}")
|
|
print(f" Description: {tool.description}")
|
|
print(f" Schema: {tool.inputSchema}")
|
|
|
|
# Example output:
|
|
# Tool: ai.process
|
|
# Description: Universal AI document processing action
|
|
# Schema: {
|
|
# "type": "object",
|
|
# "properties": {
|
|
# "aiPrompt": {"type": "string", "description": "..."},
|
|
# "documentList": {"type": "array", "description": "..."},
|
|
# "resultType": {"type": "string", "default": "txt"}
|
|
# },
|
|
# "required": ["aiPrompt"]
|
|
# }
|
|
```
|
|
|
|
**Key Differences**:
|
|
- ✅ **Standardized Format**: JSON Schema for all tools
|
|
- ✅ **Self-Documenting**: Schema includes descriptions and types
|
|
- ✅ **Interoperable**: Can be used by external MCP clients
|
|
|
|
---
|
|
|
|
### Example 3: External MCP Server Integration
|
|
|
|
#### BEFORE (Current System)
|
|
|
|
```python
|
|
# External tools require custom integration code
|
|
# Each external service needs custom connector
|
|
class CustomSlackConnector:
|
|
async def sendMessage(self, channel: str, text: str):
|
|
# Custom implementation
|
|
pass
|
|
```
|
|
|
|
#### AFTER (With MCP Standard)
|
|
|
|
```python
|
|
# External MCP servers can be integrated seamlessly
|
|
from modules.workflows.mcp.externalMcpClient import ExternalMcpClient
|
|
|
|
# Connect to external MCP server (e.g., Slack MCP server)
|
|
slackMcpClient = ExternalMcpClient(
|
|
serverUrl="http://slack-mcp-server:8080",
|
|
transport="http" # or "stdio", "websocket"
|
|
)
|
|
|
|
# Discover external tools
|
|
externalTools = await slackMcpClient.listTools()
|
|
# Returns: [McpTool(name="slack.sendMessage", ...), ...]
|
|
|
|
# Call external tool (same interface as internal tools)
|
|
result = await slackMcpClient.callTool(
|
|
"slack.sendMessage",
|
|
{"channel": "#general", "text": "Hello from workflow!"}
|
|
)
|
|
|
|
# Unified interface: internal and external tools work the same way
|
|
```
|
|
|
|
**Key Benefits**:
|
|
- ✅ **No Custom Code**: External tools work via standard MCP protocol
|
|
- ✅ **Unified Interface**: Internal and external tools use same API
|
|
- ✅ **Easy Integration**: Just connect to MCP server URL
|
|
|
|
---
|
|
|
|
## Migration Strategy
|
|
|
|
### Phase 1: Add MCP Adapter Layer (Non-Breaking)
|
|
|
|
1. **Create MCP modules** (new files, no changes to existing code)
|
|
- `methodMcpServer.py`
|
|
- `mcpServerRegistry.py`
|
|
- `mcpClient.py`
|
|
|
|
2. **Add feature flag** to ActionExecutor
|
|
```python
|
|
useMcp = os.getenv("ENABLE_MCP", "false").lower() == "true"
|
|
```
|
|
|
|
3. **Test MCP integration** alongside existing system
|
|
|
|
### Phase 2: Enable MCP for New Actions (Gradual)
|
|
|
|
1. **Enable MCP** for specific methods/actions
|
|
2. **Monitor performance** and compatibility
|
|
3. **Gradually migrate** more actions to MCP
|
|
|
|
### Phase 3: Full MCP Support (Optional)
|
|
|
|
1. **Enable MCP by default** (keep old path as fallback)
|
|
2. **Add external MCP server support**
|
|
3. **Document MCP tool APIs** for external clients
|
|
|
|
---
|
|
|
|
## Benefits Summary
|
|
|
|
### 1. Standardization
|
|
- ✅ **JSON-RPC 2.0 Protocol**: Standard protocol for tool calls
|
|
- ✅ **JSON Schema**: Standard schema for tool parameters
|
|
- ✅ **Tool Discovery**: Standard discovery mechanism
|
|
|
|
### 2. Interoperability
|
|
- ✅ **External Tools**: Easy integration of external MCP servers
|
|
- ✅ **Multi-Client**: Multiple clients can use same tools
|
|
- ✅ **Cross-Platform**: Works with any MCP-compatible system
|
|
|
|
### 3. Backward Compatibility
|
|
- ✅ **No Breaking Changes**: Existing code continues to work
|
|
- ✅ **Gradual Migration**: Can enable MCP per method/action
|
|
- ✅ **Feature Flag**: Enable/disable via configuration
|
|
|
|
### 4. Developer Experience
|
|
- ✅ **Self-Documenting**: Tools include schemas and descriptions
|
|
- ✅ **Type Safety**: JSON Schema validation
|
|
- ✅ **Error Handling**: Standardized error responses
|
|
|
|
---
|
|
|
|
## Next Steps
|
|
|
|
1. **Review Proposal**: Review this proposal and provide feedback
|
|
2. **Implement Core**: Create `methodMcpServer.py`, `mcpServerRegistry.py`, `mcpClient.py`
|
|
3. **Add Integration**: Modify `ActionExecutor` to support MCP (with feature flag)
|
|
4. **Test**: Test with existing `ai.process` action
|
|
5. **Document**: Document MCP tool APIs
|
|
6. **Extend**: Add support for external MCP servers
|
|
|
|
---
|
|
|
|
## Practical Example: Complete Before/After Comparison
|
|
|
|
### Scenario: Execute `ai.process` action to summarize a document
|
|
|
|
#### BEFORE: Current Proprietary System
|
|
|
|
**Step 1: Action Discovery**
|
|
```python
|
|
# In workflowProcessor or actionExecutor
|
|
from modules.workflows.processing.shared.methodDiscovery import methods
|
|
|
|
# Discover available actions
|
|
methodInfo = methods.get("ai")
|
|
if not methodInfo:
|
|
raise ValueError("Method 'ai' not found")
|
|
|
|
methodInstance = methodInfo['instance']
|
|
actionInfo = methodInstance.actions.get("process")
|
|
if not actionInfo:
|
|
raise ValueError("Action 'process' not found in method 'ai'")
|
|
|
|
# Get action description and parameters (from docstring parsing)
|
|
description = actionInfo['description']
|
|
parameters = actionInfo['parameters'] # Dict with type info from signature
|
|
```
|
|
|
|
**Step 2: Parameter Preparation**
|
|
```python
|
|
# Build parameters dict (no schema validation)
|
|
parameters = {
|
|
"aiPrompt": "Summarize the following document",
|
|
"documentList": ["docList:msg_123:task1_results"],
|
|
"resultType": "txt"
|
|
}
|
|
|
|
# No validation - errors only discovered at runtime
|
|
```
|
|
|
|
**Step 3: Action Execution**
|
|
```python
|
|
# Direct method call
|
|
actionMethod = actionInfo['method']
|
|
result = await actionMethod(parameters=parameters)
|
|
|
|
# Result: ActionResult object
|
|
if result.success:
|
|
for doc in result.documents:
|
|
print(f"Generated: {doc.name} ({doc.mimeType})")
|
|
# Access document data directly
|
|
content = doc.data if isinstance(doc.data, str) else doc.data.decode('utf-8')
|
|
else:
|
|
print(f"Error: {result.error}")
|
|
```
|
|
|
|
**Step 4: Error Handling**
|
|
```python
|
|
# Errors are Python exceptions or ActionResult with success=False
|
|
try:
|
|
result = await actionMethod(parameters=parameters)
|
|
if not result.success:
|
|
# Handle error from ActionResult
|
|
logger.error(f"Action failed: {result.error}")
|
|
except Exception as e:
|
|
# Handle Python exception
|
|
logger.error(f"Execution error: {str(e)}")
|
|
```
|
|
|
|
---
|
|
|
|
#### AFTER: With MCP Standard
|
|
|
|
**Step 1: Tool Discovery**
|
|
```python
|
|
# Initialize MCP components
|
|
from modules.workflows.mcp.mcpServerRegistry import McpServerRegistry
|
|
from modules.workflows.mcp.mcpClient import McpClient
|
|
from modules.workflows.processing.shared.methodDiscovery import methods
|
|
|
|
# Create registry and register methods
|
|
mcpRegistry = McpServerRegistry()
|
|
for methodName, methodInfo in methods.items():
|
|
if 'instance' in methodInfo:
|
|
mcpRegistry.registerMethod(methodInfo['instance'])
|
|
|
|
# Create MCP client
|
|
mcpClient = McpClient(mcpRegistry)
|
|
|
|
# Discover tools (standardized format)
|
|
tools = mcpClient.listTools()
|
|
tool = mcpClient.getTool("ai.process")
|
|
|
|
# Tool includes JSON Schema
|
|
print(tool.name) # "ai.process"
|
|
print(tool.description) # "Universal AI document processing action"
|
|
print(tool.inputSchema)
|
|
# {
|
|
# "type": "object",
|
|
# "properties": {
|
|
# "aiPrompt": {
|
|
# "type": "string",
|
|
# "description": "Instruction for the AI"
|
|
# },
|
|
# "documentList": {
|
|
# "type": "array",
|
|
# "description": "Document references"
|
|
# },
|
|
# "resultType": {
|
|
# "type": "string",
|
|
# "default": "txt",
|
|
# "description": "Output format"
|
|
# }
|
|
# },
|
|
# "required": ["aiPrompt"]
|
|
# }
|
|
```
|
|
|
|
**Step 2: Parameter Preparation with Validation**
|
|
```python
|
|
# Build parameters (same format)
|
|
arguments = {
|
|
"aiPrompt": "Summarize the following document",
|
|
"documentList": ["docList:msg_123:task1_results"],
|
|
"resultType": "txt"
|
|
}
|
|
|
|
# Optional: Validate against schema before calling
|
|
try:
|
|
mcpClient._validateArguments(arguments, tool.inputSchema)
|
|
print("Parameters valid")
|
|
except ValueError as e:
|
|
print(f"Validation error: {e}")
|
|
# Fix parameters before calling
|
|
```
|
|
|
|
**Step 3: Tool Execution (MCP Standard)**
|
|
```python
|
|
# MCP tool call (JSON-RPC compatible)
|
|
mcpResult = await mcpClient.callTool("ai.process", arguments)
|
|
|
|
# Result: MCP-compatible dict
|
|
if mcpResult["success"]:
|
|
for doc in mcpResult["documents"]:
|
|
print(f"Generated: {doc['name']} ({doc['mimeType']})")
|
|
# Handle encoding
|
|
if doc.get("encoding") == "base64":
|
|
import base64
|
|
content = base64.b64decode(doc["data"]).decode('utf-8')
|
|
else:
|
|
content = doc["data"]
|
|
print(f"Content: {content}")
|
|
else:
|
|
print(f"Error: {mcpResult.get('error')}")
|
|
```
|
|
|
|
**Step 4: Error Handling (Standardized)**
|
|
```python
|
|
# MCP standardizes error format
|
|
try:
|
|
mcpResult = await mcpClient.callTool("ai.process", arguments)
|
|
|
|
if not mcpResult.get("success"):
|
|
# Standardized error format
|
|
error = mcpResult.get("error", "Unknown error")
|
|
logger.error(f"MCP tool failed: {error}")
|
|
# Handle error (standardized format)
|
|
|
|
except ValueError as e:
|
|
# Schema validation error
|
|
logger.error(f"Invalid arguments: {e}")
|
|
except Exception as e:
|
|
# Execution error
|
|
logger.error(f"MCP call failed: {e}")
|
|
```
|
|
|
|
---
|
|
|
|
### Key Differences Summary
|
|
|
|
| Aspect | BEFORE (Proprietary) | AFTER (MCP Standard) |
|
|
|--------|---------------------|----------------------|
|
|
| **Discovery** | Manual docstring parsing | Standardized JSON Schema |
|
|
| **Validation** | Runtime errors only | Pre-call schema validation |
|
|
| **Format** | Python objects | JSON-compatible dicts |
|
|
| **Protocol** | Direct method calls | JSON-RPC 2.0 standard |
|
|
| **Interoperability** | Internal only | External MCP servers supported |
|
|
| **Documentation** | Docstring parsing | Self-documenting schemas |
|
|
| **Error Format** | Mixed (exceptions + ActionResult) | Standardized dict format |
|
|
|
|
---
|
|
|
|
### Integration Example: Using MCP in ActionExecutor
|
|
|
|
```python
|
|
# Modified ActionExecutor with MCP support
|
|
class ActionExecutor:
|
|
def __init__(self, services, useMcp: bool = False):
|
|
self.services = services
|
|
self.useMcp = useMcp
|
|
|
|
if useMcp:
|
|
# Initialize MCP
|
|
self.mcpRegistry = McpServerRegistry()
|
|
self.mcpClient = McpClient(self.mcpRegistry)
|
|
self._registerMethodsAsMcpServers()
|
|
else:
|
|
# Original discovery (unchanged)
|
|
from modules.workflows.processing.shared.methodDiscovery import methods
|
|
self.methods = methods
|
|
|
|
async def executeAction(self, methodName: str, actionName: str, parameters: Dict):
|
|
if self.useMcp:
|
|
# MCP path
|
|
toolName = f"{methodName}.{actionName}"
|
|
mcpResult = await self.mcpClient.callTool(toolName, parameters)
|
|
return self._mcpResultToActionResult(mcpResult)
|
|
else:
|
|
# Original path (unchanged)
|
|
methodInfo = self.methods.get(methodName)
|
|
methodInstance = methodInfo['instance']
|
|
actionMethod = methodInstance.actions[actionName]['method']
|
|
return await actionMethod(parameters=parameters)
|
|
|
|
# Usage (backward compatible)
|
|
executor = ActionExecutor(services, useMcp=True) # Enable MCP
|
|
result = await executor.executeAction("ai", "process", {
|
|
"aiPrompt": "...",
|
|
"documentList": [...]
|
|
})
|
|
# Returns: ActionResult (same as before, but uses MCP internally)
|
|
```
|
|
|
|
---
|
|
|
|
## Questions for Discussion
|
|
|
|
1. **Feature Flag**: Should MCP be opt-in (feature flag) or always enabled?
|
|
2. **External Servers**: Should we support external MCP servers from the start?
|
|
3. **Pydantic Models**: Should we enhance parameter models to include JSON Schema metadata?
|
|
4. **Performance**: Should we benchmark MCP overhead vs direct calls?
|
|
5. **Migration**: Should we migrate all actions at once or gradually?
|
|
|