Model Context Protocol (MCP) Integration¶
JAF provides comprehensive support for the Model Context Protocol (MCP), enabling seamless integration with external tools and services. MCP allows agents to access tools and resources from external servers through standardized protocols.
Overview¶
The Model Context Protocol (MCP) is an open standard that enables secure connections between host applications (like JAF) and external data sources and tools. JAF's MCP integration provides:
- Multiple Transport Mechanisms: Support for stdio, WebSocket, and SSE transports
- Secure Tool Integration: Safe execution of external tools with validation
- Dynamic Tool Discovery: Automatic detection and integration of MCP server tools
- Type-Safe Operations: Pydantic-based validation for all MCP interactions
- Production Ready: Robust error handling and connection management
Transport Mechanisms¶
JAF supports all three MCP transport mechanisms:
1. Stdio Transport¶
Best for local MCP servers running as separate processes:
from jaf.providers.mcp import create_mcp_stdio_client
# Connect to a local filesystem MCP server
mcp_client = create_mcp_stdio_client([
'npx', '-y', '@modelcontextprotocol/server-filesystem', '/Users'
])
await mcp_client.initialize()
Use Cases: - Local development tools - File system operations - Command-line utilities - Local database connections
2. WebSocket Transport¶
Ideal for real-time, bidirectional communication:
from jaf.providers.mcp import create_mcp_websocket_client
# Connect to a WebSocket MCP server
mcp_client = create_mcp_websocket_client('ws://localhost:8080/mcp')
await mcp_client.initialize()
Use Cases: - Real-time data feeds - Interactive services - Persistent connections - Streaming operations
3. Server-Sent Events (SSE) Transport¶
Perfect for server-to-client streaming:
from jaf.providers.mcp import create_mcp_sse_client
# Connect to an SSE MCP server
mcp_client = create_mcp_sse_client('http://localhost:8080/events')
await mcp_client.initialize()
Use Cases: - Event streams - Notifications - Log monitoring - Status updates
4. HTTP Transport¶
For simple request-response patterns:
from jaf.providers.mcp import create_mcp_http_client
# Connect to an HTTP MCP server
mcp_client = create_mcp_http_client('http://localhost:8080/mcp')
await mcp_client.initialize()
Use Cases: - REST API integration - Simple tool calls - Stateless operations - Web service integration
Basic Usage¶
Creating MCP Tools¶
Convert MCP server tools into JAF tools:
from jaf.providers.mcp import MCPTool, MCPToolArgs
from pydantic import BaseModel
# Define arguments for the MCP tool
class FileReadArgs(MCPToolArgs):
path: str
# Create an MCP tool
mcp_tool = MCPTool(mcp_client, "read_file", FileReadArgs)
# Use in an agent
from jaf import Agent
def agent_instructions(state):
return "You can read files using the read_file tool."
agent = Agent(
name="FileAgent",
instructions=agent_instructions,
tools=[mcp_tool]
)
Dynamic Tool Discovery¶
Automatically discover and integrate all available MCP tools:
from jaf.providers.mcp import create_mcp_tools_from_client
# Connect to MCP server
mcp_client = create_mcp_stdio_client(['mcp-server-command'])
# Automatically create JAF tools from all available MCP tools
mcp_tools = await create_mcp_tools_from_client(mcp_client)
# Use all tools in an agent
agent = Agent(
name="MCPAgent",
instructions=lambda state: "You have access to various MCP tools.",
tools=mcp_tools
)
Advanced Features¶
Secure Tool Wrapper¶
Create secure wrappers for MCP tools with validation:
from jaf.core.tool_results import ToolResult, ToolResultStatus, ToolErrorCodes
class SecureMCPTool:
def __init__(self, mcp_tool: MCPTool, allowed_paths: List[str]):
self.mcp_tool = mcp_tool
self.allowed_paths = allowed_paths
self._schema = mcp_tool.schema
@property
def schema(self):
return self._schema
async def execute(self, args, context) -> ToolResult:
# Validate paths for security
if hasattr(args, 'path') and args.path:
path = str(args.path)
is_allowed = any(path.startswith(allowed) for allowed in self.allowed_paths)
if not is_allowed:
return ToolResult(
status=ToolResultStatus.ERROR,
error_code=ToolErrorCodes.INVALID_INPUT,
error_message=f"Path '{path}' not allowed",
data={"path": path, "allowed_paths": self.allowed_paths}
)
# Execute the original MCP tool
return await self.mcp_tool.execute(args, context)
# Use secure wrapper
secure_tool = SecureMCPTool(mcp_tool, ['/Users', '/tmp'])
Custom Transport Implementation¶
Create custom transport mechanisms:
from jaf.providers.mcp import MCPTransport
import asyncio
class CustomMCPTransport(MCPTransport):
def __init__(self, config):
self.config = config
self.connection = None
async def connect(self):
# Implement custom connection logic
self.connection = await self._create_connection()
async def disconnect(self):
# Implement cleanup
if self.connection:
await self.connection.close()
async def send_request(self, method: str, params: dict) -> dict:
# Implement request sending
return await self._send_and_receive(method, params)
async def send_notification(self, method: str, params: dict):
# Implement notification sending
await self._send_notification(method, params)
Production Examples¶
Filesystem Agent with MCP¶
Complete example of a filesystem agent using MCP:
import asyncio
from jaf import Agent, run_server
from jaf.providers.mcp import create_mcp_stdio_client, MCPTool, MCPToolArgs
from jaf.providers.model import make_litellm_provider
from jaf.core.types import RunConfig
class DynamicMCPArgs(MCPToolArgs):
"""Dynamic args that accept any parameters."""
class Config:
extra = "allow"
def __init__(self, **data):
super().__init__()
for key, value in data.items():
setattr(self, key, value)
async def create_filesystem_agent():
# Connect to filesystem MCP server
mcp_client = create_mcp_stdio_client([
'npx', '-y', '@modelcontextprotocol/server-filesystem', '/Users'
])
await mcp_client.initialize()
# Create tools for all available MCP operations
tools = []
for tool_name in mcp_client.get_available_tools():
mcp_tool = MCPTool(mcp_client, tool_name, DynamicMCPArgs)
tools.append(mcp_tool)
# Create agent with filesystem capabilities
def instructions(state):
return """You are a filesystem assistant with access to file operations.
You can read, write, list, and manage files safely within allowed directories.
Always validate paths and provide helpful feedback to users."""
return Agent(
name="FilesystemAgent",
instructions=instructions,
tools=tools
)
async def main():
# Create agent
agent = await create_filesystem_agent()
# Setup providers
model_provider = make_litellm_provider('http://localhost:4000')
# Create run config
run_config = RunConfig(
agent_registry={"FilesystemAgent": agent},
model_provider=model_provider,
max_turns=10
)
# Start server
await run_server([agent], run_config, host="127.0.0.1", port=3003)
if __name__ == "__main__":
asyncio.run(main())
Multi-Transport MCP Integration¶
Example using multiple MCP transports:
async def create_multi_transport_agent():
# Filesystem via stdio
fs_client = create_mcp_stdio_client([
'npx', '-y', '@modelcontextprotocol/server-filesystem', '/Users'
])
# Database via WebSocket
db_client = create_mcp_websocket_client('ws://localhost:8080/database')
# Events via SSE
events_client = create_mcp_sse_client('http://localhost:8080/events')
# Initialize all clients
await fs_client.initialize()
await db_client.initialize()
await events_client.initialize()
# Create tools from all clients
fs_tools = await create_mcp_tools_from_client(fs_client)
db_tools = await create_mcp_tools_from_client(db_client)
# Combine all tools
all_tools = fs_tools + db_tools
def instructions(state):
return """You are a comprehensive assistant with access to:
- Filesystem operations (read, write, list files)
- Database operations (query, update, insert)
- Real-time event monitoring
Use these capabilities to help users with complex tasks."""
return Agent(
name="MultiTransportAgent",
instructions=instructions,
tools=all_tools
)
Error Handling¶
Connection Management¶
Handle MCP connection errors gracefully:
async def robust_mcp_connection(command):
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
mcp_client = create_mcp_stdio_client(command)
await mcp_client.initialize()
return mcp_client
except Exception as e:
if attempt == max_retries - 1:
raise Exception(f"Failed to connect after {max_retries} attempts: {e}")
print(f"Connection attempt {attempt + 1} failed: {e}")
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
Tool Execution Safety¶
Implement safe tool execution with timeouts:
import asyncio
class SafeMCPTool:
def __init__(self, mcp_tool: MCPTool, timeout: float = 30.0):
self.mcp_tool = mcp_tool
self.timeout = timeout
self._schema = mcp_tool.schema
@property
def schema(self):
return self._schema
async def execute(self, args, context) -> ToolResult:
try:
# Execute with timeout
result = await asyncio.wait_for(
self.mcp_tool.execute(args, context),
timeout=self.timeout
)
return result
except asyncio.TimeoutError:
return ToolResult(
status=ToolResultStatus.ERROR,
error_code=ToolErrorCodes.TIMEOUT,
error_message=f"Tool execution timed out after {self.timeout}s",
data={"timeout": self.timeout}
)
except Exception as e:
return ToolResult(
status=ToolResultStatus.ERROR,
error_code=ToolErrorCodes.EXECUTION_FAILED,
error_message=f"Tool execution failed: {e}",
data={"error": str(e)}
)
Best Practices¶
1. Security Considerations¶
Always validate inputs and restrict access:
# Good: Validate file paths
def validate_path(path: str, allowed_dirs: List[str]) -> bool:
abs_path = os.path.abspath(path)
return any(abs_path.startswith(allowed) for allowed in allowed_dirs)
# Good: Sanitize inputs
def sanitize_filename(filename: str) -> str:
# Remove dangerous characters
return re.sub(r'[^\w\-_\.]', '', filename)
2. Resource Management¶
Properly manage MCP connections:
class MCPManager:
def __init__(self):
self.clients = {}
async def add_client(self, name: str, client: MCPClient):
self.clients[name] = client
await client.initialize()
async def close_all(self):
for client in self.clients.values():
await client.close()
self.clients.clear()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close_all()
# Usage
async with MCPManager() as manager:
await manager.add_client("fs", fs_client)
await manager.add_client("db", db_client)
# Clients automatically closed on exit
3. Performance Optimization¶
Cache tool schemas and reuse connections:
class CachedMCPClient:
def __init__(self, client: MCPClient):
self.client = client
self._tool_cache = {}
self._schema_cache = {}
async def get_tool(self, name: str) -> MCPTool:
if name not in self._tool_cache:
self._tool_cache[name] = MCPTool(self.client, name, DynamicMCPArgs)
return self._tool_cache[name]
def get_cached_tools(self) -> List[MCPTool]:
return list(self._tool_cache.values())
Testing MCP Integration¶
Unit Testing¶
Test MCP tools with mock clients:
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_mcp_tool_execution():
# Mock MCP client
mock_client = AsyncMock()
mock_client.call_tool.return_value = {
"content": [{"type": "text", "text": "File contents"}]
}
# Create tool
tool = MCPTool(mock_client, "read_file", FileReadArgs)
# Test execution
args = FileReadArgs(path="/test/file.txt")
result = await tool.execute(args, {})
assert result.status == ToolResultStatus.SUCCESS
assert "File contents" in result.data
mock_client.call_tool.assert_called_once()
Integration Testing¶
Test with real MCP servers:
@pytest.mark.asyncio
async def test_filesystem_integration():
# Start test MCP server
client = create_mcp_stdio_client(['test-mcp-server'])
await client.initialize()
try:
# Test tool discovery
tools = await create_mcp_tools_from_client(client)
assert len(tools) > 0
# Test tool execution
if 'list_directory' in [t.schema.name for t in tools]:
list_tool = next(t for t in tools if t.schema.name == 'list_directory')
result = await list_tool.execute({'path': '/tmp'}, {})
assert result.status == ToolResultStatus.SUCCESS
finally:
await client.close()
Troubleshooting¶
Common Issues¶
-
Connection Failures
-
Tool Discovery Issues
-
Execution Errors
Debug Mode¶
Enable debug logging for MCP operations:
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('jaf.providers.mcp')
# Add to MCP client
class DebugMCPClient(MCPClient):
async def call_tool(self, name: str, arguments: dict) -> dict:
logger.debug(f"Calling MCP tool: {name} with args: {arguments}")
result = await super().call_tool(name, arguments)
logger.debug(f"MCP tool result: {result}")
return result
Next Steps¶
- Explore MCP Examples for practical implementations
- Learn about MCP Transport Configuration for advanced setups
- Check MCP Security for production deployment guidelines
- Review MCP Performance for optimization techniques