Skip to content

MCP Examples

This page provides practical examples of using JAF with Model Context Protocol (MCP) servers across different transport mechanisms and use cases.

Quick Start Examples

Filesystem Operations with Stdio

Connect to a filesystem MCP server and perform file operations:

import asyncio
from jaf import Agent, run
from jaf.providers.mcp import create_mcp_stdio_client, create_mcp_tools_from_client
from jaf.providers.model import make_litellm_provider
from jaf.core.types import RunConfig, RunState, Message, ContentRole

async def filesystem_example():
    # Connect to filesystem MCP server
    mcp_client = create_mcp_stdio_client([
        'npx', '-y', '@modelcontextprotocol/server-filesystem', '/Users'
    ])

    # Create tools from MCP server
    mcp_tools = await create_mcp_tools_from_client(mcp_client)

    # Create filesystem agent
    def instructions(state):
        return """You are a helpful filesystem assistant. You can:
        - List directory contents
        - Read file contents
        - Write files
        - Get file information

        Always be helpful and explain what you're doing."""

    agent = Agent(
        name="FilesystemAgent",
        instructions=instructions,
        tools=mcp_tools
    )

    # Setup model provider
    model_provider = make_litellm_provider('http://localhost:4000')

    # Create initial state
    initial_state = RunState(
        messages=[
            Message(
                role=ContentRole.USER,
                content="List the files in my Desktop directory"
            )
        ],
        current_agent_name="FilesystemAgent"
    )

    # Create run config
    config = RunConfig(
        agent_registry={"FilesystemAgent": agent},
        model_provider=model_provider,
        max_turns=5
    )

    # Run the agent
    result = await run(initial_state, config)

    print("Agent Response:")
    for message in result.final_state.messages:
        if message.role == ContentRole.ASSISTANT:
            print(f"Assistant: {message.content}")

    # Cleanup
    await mcp_client.close()

if __name__ == "__main__":
    asyncio.run(filesystem_example())

WebSocket MCP Integration

Connect to a WebSocket MCP server for real-time operations:

import asyncio
from jaf import Agent
from jaf.providers.mcp import create_mcp_websocket_client, MCPTool, MCPToolArgs
from pydantic import BaseModel

class DatabaseQueryArgs(MCPToolArgs):
    query: str
    parameters: dict = {}

async def websocket_database_example():
    # Connect to WebSocket MCP server
    mcp_client = create_mcp_websocket_client('ws://localhost:8080/mcp')
    await mcp_client.initialize()

    # Create specific tools
    query_tool = MCPTool(mcp_client, "execute_query", DatabaseQueryArgs)

    # Create database agent
    def instructions(state):
        return """You are a database assistant. You can execute SQL queries
        and help users interact with the database safely."""

    agent = Agent(
        name="DatabaseAgent",
        instructions=instructions,
        tools=[query_tool]
    )

    print("Database agent ready with WebSocket MCP connection")

    # Example usage
    try:
        # Test tool execution
        args = DatabaseQueryArgs(
            query="SELECT COUNT(*) FROM users",
            parameters={}
        )
        result = await query_tool.execute(args, {})
        print(f"Query result: {result.data}")

    finally:
        await mcp_client.close()

if __name__ == "__main__":
    asyncio.run(websocket_database_example())

Advanced Examples

Multi-Transport MCP Server

Create a server that uses multiple MCP transports:

import asyncio
import os
from jaf import Agent, run_server
from jaf.providers.mcp import (
    create_mcp_stdio_client,
    create_mcp_websocket_client,
    create_mcp_sse_client,
    create_mcp_tools_from_client
)
from jaf.providers.model import make_litellm_provider
from jaf.core.types import RunConfig

async def create_multi_transport_server():
    """Create a server with multiple MCP transports."""

    # Initialize multiple MCP clients
    clients = {}
    all_tools = []

    try:
        # Filesystem via stdio
        print("🔌 Connecting to filesystem MCP server...")
        fs_client = create_mcp_stdio_client([
            'npx', '-y', '@modelcontextprotocol/server-filesystem', '/Users'
        ])
        clients['filesystem'] = fs_client
        fs_tools = await create_mcp_tools_from_client(fs_client)
        all_tools.extend(fs_tools)
        print(f"✅ Filesystem: {len(fs_tools)} tools loaded")

        # Database via WebSocket (if available)
        try:
            print("🔌 Connecting to database MCP server...")
            db_client = create_mcp_websocket_client('ws://localhost:8080/database')
            clients['database'] = db_client
            db_tools = await create_mcp_tools_from_client(db_client)
            all_tools.extend(db_tools)
            print(f"✅ Database: {len(db_tools)} tools loaded")
        except Exception as e:
            print(f"⚠️ Database MCP server not available: {e}")

        # Events via SSE (if available)
        try:
            print("🔌 Connecting to events MCP server...")
            events_client = create_mcp_sse_client('http://localhost:8080/events')
            clients['events'] = events_client
            await events_client.initialize()
            print("✅ Events: SSE connection established")
        except Exception as e:
            print(f"⚠️ Events MCP server not available: {e}")

        # Create comprehensive agent
        def instructions(state):
            available_transports = list(clients.keys())
            tool_count = len(all_tools)

            return f"""You are a comprehensive assistant with access to multiple MCP servers:

**Available Transports:** {', '.join(available_transports)}
**Total Tools:** {tool_count}

**Capabilities:**
- Filesystem operations (read, write, list files)
- Database operations (if available)
- Real-time event monitoring (if available)

**Instructions:**
- Help users with complex tasks using available tools
- Explain which transport/server you're using for each operation
- Handle errors gracefully and suggest alternatives
- Provide detailed feedback about operations performed

Always be helpful and explain your actions clearly."""

        agent = Agent(
            name="MultiTransportAgent",
            instructions=instructions,
            tools=all_tools
        )

        # Setup providers
        model_provider = make_litellm_provider(
            os.getenv('LITELLM_URL', 'http://localhost:4000'),
            os.getenv('LITELLM_API_KEY')
        )

        # Create run config
        run_config = RunConfig(
            agent_registry={"MultiTransportAgent": agent},
            model_provider=model_provider,
            max_turns=10
        )

        print(f"\n🚀 Starting multi-transport MCP server...")
        print(f"📊 Total MCP clients: {len(clients)}")
        print(f"🔧 Total tools available: {len(all_tools)}")

        # Start server
        await run_server(
            [agent],
            run_config,
            host="127.0.0.1",
            port=3004,
            cors=True
        )

    except Exception as e:
        print(f"❌ Failed to start multi-transport server: {e}")

    finally:
        # Cleanup all clients
        for name, client in clients.items():
            try:
                await client.close()
                print(f"🔌 Closed {name} MCP client")
            except Exception as e:
                print(f"⚠️ Error closing {name} client: {e}")

if __name__ == "__main__":
    asyncio.run(create_multi_transport_server())

Secure MCP Tool Wrapper

Example of creating secure wrappers for MCP tools:

import os
import re
from typing import List
from jaf.providers.mcp import MCPTool, MCPToolArgs
from jaf.core.tool_results import ToolResult, ToolResultStatus, ToolErrorCodes

class SecureFilesystemTool:
    """Secure wrapper for filesystem MCP tools."""

    def __init__(self, mcp_tool: MCPTool, allowed_paths: List[str], max_file_size: int = 10_000_000):
        self.mcp_tool = mcp_tool
        self.allowed_paths = [os.path.abspath(path) for path in allowed_paths]
        self.max_file_size = max_file_size
        self._schema = mcp_tool.schema

    @property
    def schema(self):
        return self._schema

    def _validate_path(self, path: str) -> tuple[bool, str]:
        """Validate if path is allowed."""
        try:
            abs_path = os.path.abspath(path)

            # Check if path is within allowed directories
            is_allowed = any(abs_path.startswith(allowed) for allowed in self.allowed_paths)
            if not is_allowed:
                return False, f"Path '{path}' is not within allowed directories"

            # Check for path traversal attempts
            if '..' in path or path.startswith('/'):
                return False, f"Path '{path}' contains invalid characters"

            return True, ""

        except Exception as e:
            return False, f"Invalid path: {e}"

    def _validate_filename(self, filename: str) -> tuple[bool, str]:
        """Validate filename for security."""
        # Remove dangerous characters
        safe_pattern = re.compile(r'^[a-zA-Z0-9._-]+$')
        if not safe_pattern.match(filename):
            return False, f"Filename '{filename}' contains invalid characters"

        # Check length
        if len(filename) > 255:
            return False, f"Filename too long: {len(filename)} characters"

        return True, ""

    async def execute(self, args, context) -> ToolResult:
        """Execute with security validation."""
        try:
            # Path validation
            if hasattr(args, 'path') and args.path:
                is_valid, error_msg = self._validate_path(args.path)
                if not is_valid:
                    return ToolResult(
                        status=ToolResultStatus.ERROR,
                        error_code=ToolErrorCodes.INVALID_INPUT,
                        error_message=error_msg,
                        data={"path": args.path, "allowed_paths": self.allowed_paths}
                    )

            # Filename validation for write operations
            if hasattr(args, 'filename') and args.filename:
                is_valid, error_msg = self._validate_filename(args.filename)
                if not is_valid:
                    return ToolResult(
                        status=ToolResultStatus.ERROR,
                        error_code=ToolErrorCodes.INVALID_INPUT,
                        error_message=error_msg,
                        data={"filename": args.filename}
                    )

            # File size validation for write operations
            if hasattr(args, 'content') and args.content:
                content_size = len(str(args.content).encode('utf-8'))
                if content_size > self.max_file_size:
                    return ToolResult(
                        status=ToolResultStatus.ERROR,
                        error_code=ToolErrorCodes.INVALID_INPUT,
                        error_message=f"Content too large: {content_size} bytes (max: {self.max_file_size})",
                        data={"size": content_size, "max_size": self.max_file_size}
                    )

            # Execute the original tool
            result = await self.mcp_tool.execute(args, context)

            # Add security metadata
            if result.metadata is None:
                result.metadata = {}
            result.metadata['security_validated'] = True
            result.metadata['allowed_paths'] = self.allowed_paths

            return result

        except Exception as e:
            return ToolResult(
                status=ToolResultStatus.ERROR,
                error_code=ToolErrorCodes.EXECUTION_FAILED,
                error_message=f"Secure tool execution failed: {e}",
                data={"error": str(e)}
            )

# Usage example
async def create_secure_filesystem_agent():
    # Connect to MCP server
    mcp_client = create_mcp_stdio_client([
        'npx', '-y', '@modelcontextprotocol/server-filesystem', '/Users'
    ])

    # Get MCP tools
    mcp_tools = await create_mcp_tools_from_client(mcp_client)

    # Wrap with security
    secure_tools = []
    allowed_paths = ['/Users/username/Documents', '/tmp']

    for mcp_tool in mcp_tools:
        secure_tool = SecureFilesystemTool(mcp_tool, allowed_paths)
        secure_tools.append(secure_tool)

    # Create agent with secure tools
    def instructions(state):
        return """You are a secure filesystem assistant. You can perform file operations
        within allowed directories only. All operations are validated for security."""

    return Agent(
        name="SecureFilesystemAgent",
        instructions=instructions,
        tools=secure_tools
    )

MCP Tool Testing Framework

Example of testing MCP tools:

import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock
from jaf.providers.mcp import MCPTool, MCPToolArgs, MCPClient
from jaf.core.tool_results import ToolResultStatus

class TestMCPArgs(MCPToolArgs):
    test_param: str

class TestMCPIntegration:
    """Test suite for MCP integration."""

    @pytest.fixture
    async def mock_mcp_client(self):
        """Create a mock MCP client."""
        client = AsyncMock(spec=MCPClient)
        client.get_tool_info.return_value = {
            "name": "test_tool",
            "description": "Test tool for unit testing"
        }
        return client

    @pytest.fixture
    def mcp_tool(self, mock_mcp_client):
        """Create an MCP tool for testing."""
        return MCPTool(mock_mcp_client, "test_tool", TestMCPArgs)

    @pytest.mark.asyncio
    async def test_successful_tool_execution(self, mcp_tool, mock_mcp_client):
        """Test successful tool execution."""
        # Setup mock response
        mock_mcp_client.call_tool.return_value = {
            "content": [{"type": "text", "text": "Success!"}]
        }

        # Execute tool
        args = TestMCPArgs(test_param="test_value")
        result = await mcp_tool.execute(args, {})

        # Verify results
        assert result.status == ToolResultStatus.SUCCESS
        assert "Success!" in result.data
        mock_mcp_client.call_tool.assert_called_once_with(
            "test_tool", 
            {"test_param": "test_value"}
        )

    @pytest.mark.asyncio
    async def test_tool_execution_error(self, mcp_tool, mock_mcp_client):
        """Test tool execution with error."""
        # Setup mock error response
        mock_mcp_client.call_tool.return_value = {
            "error": {"message": "Tool execution failed"}
        }

        # Execute tool
        args = TestMCPArgs(test_param="test_value")
        result = await mcp_tool.execute(args, {})

        # Verify error handling
        assert result.status == ToolResultStatus.ERROR
        assert "Tool execution failed" in result.error_message

    @pytest.mark.asyncio
    async def test_tool_execution_exception(self, mcp_tool, mock_mcp_client):
        """Test tool execution with exception."""
        # Setup mock exception
        mock_mcp_client.call_tool.side_effect = Exception("Connection failed")

        # Execute tool
        args = TestMCPArgs(test_param="test_value")
        result = await mcp_tool.execute(args, {})

        # Verify exception handling
        assert result.status == ToolResultStatus.ERROR
        assert "Connection failed" in result.error_message

# Integration test with real MCP server
@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_filesystem_mcp():
    """Integration test with real filesystem MCP server."""
    try:
        # Connect to real MCP server
        from jaf.providers.mcp import create_mcp_stdio_client, create_mcp_tools_from_client

        client = create_mcp_stdio_client([
            'npx', '-y', '@modelcontextprotocol/server-filesystem', '/tmp'
        ])

        # Test connection and tool discovery
        tools = await create_mcp_tools_from_client(client)
        assert len(tools) > 0

        # Test a simple operation
        list_tool = next((t for t in tools if 'list' in t.schema.name.lower()), None)
        if list_tool:
            # Create dynamic args
            class DynamicArgs(MCPToolArgs):
                class Config:
                    extra = "allow"

                def __init__(self, **data):
                    super().__init__()
                    for key, value in data.items():
                        setattr(self, key, value)

            args = DynamicArgs(path="/tmp")
            result = await list_tool.execute(args, {})
            assert result.status == ToolResultStatus.SUCCESS

        await client.close()

    except Exception as e:
        pytest.skip(f"Real MCP server not available: {e}")

# Run tests
if __name__ == "__main__":
    pytest.main([__file__, "-v"])

Performance Monitoring for MCP

Example of monitoring MCP tool performance:

import time
import asyncio
from typing import Dict, List
from dataclasses import dataclass, field
from jaf.providers.mcp import MCPTool, MCPToolArgs
from jaf.core.tool_results import ToolResult

@dataclass
class MCPPerformanceMetrics:
    """Performance metrics for MCP tools."""
    tool_name: str
    execution_count: int = 0
    total_execution_time: float = 0.0
    average_execution_time: float = 0.0
    success_count: int = 0
    error_count: int = 0
    success_rate: float = 0.0
    execution_times: List[float] = field(default_factory=list)

class PerformanceMonitoredMCPTool:
    """MCP tool wrapper with performance monitoring."""

    def __init__(self, mcp_tool: MCPTool):
        self.mcp_tool = mcp_tool
        self._schema = mcp_tool.schema
        self.metrics = MCPPerformanceMetrics(tool_name=mcp_tool.tool_name)

    @property
    def schema(self):
        return self._schema

    async def execute(self, args, context) -> ToolResult:
        """Execute with performance monitoring."""
        start_time = time.time()

        try:
            # Execute the original tool
            result = await self.mcp_tool.execute(args, context)

            # Record metrics
            execution_time = time.time() - start_time
            self._update_metrics(execution_time, result.status.value == "success")

            # Add performance metadata
            if result.metadata is None:
                result.metadata = {}
            result.metadata['execution_time'] = execution_time
            result.metadata['tool_metrics'] = self.get_metrics_summary()

            return result

        except Exception as e:
            # Record error metrics
            execution_time = time.time() - start_time
            self._update_metrics(execution_time, False)
            raise

    def _update_metrics(self, execution_time: float, success: bool):
        """Update performance metrics."""
        self.metrics.execution_count += 1
        self.metrics.total_execution_time += execution_time
        self.metrics.execution_times.append(execution_time)

        if success:
            self.metrics.success_count += 1
        else:
            self.metrics.error_count += 1

        # Calculate derived metrics
        self.metrics.average_execution_time = (
            self.metrics.total_execution_time / self.metrics.execution_count
        )
        self.metrics.success_rate = (
            self.metrics.success_count / self.metrics.execution_count * 100
        )

    def get_metrics_summary(self) -> Dict:
        """Get performance metrics summary."""
        return {
            "tool_name": self.metrics.tool_name,
            "execution_count": self.metrics.execution_count,
            "average_execution_time": round(self.metrics.average_execution_time, 3),
            "success_rate": round(self.metrics.success_rate, 2),
            "total_execution_time": round(self.metrics.total_execution_time, 3)
        }

    def get_detailed_metrics(self) -> MCPPerformanceMetrics:
        """Get detailed performance metrics."""
        return self.metrics

class MCPPerformanceMonitor:
    """Monitor performance across multiple MCP tools."""

    def __init__(self):
        self.monitored_tools: Dict[str, PerformanceMonitoredMCPTool] = {}

    def add_tool(self, mcp_tool: MCPTool) -> PerformanceMonitoredMCPTool:
        """Add a tool for monitoring."""
        monitored_tool = PerformanceMonitoredMCPTool(mcp_tool)
        self.monitored_tools[mcp_tool.tool_name] = monitored_tool
        return monitored_tool

    def get_performance_report(self) -> Dict:
        """Generate comprehensive performance report."""
        report = {
            "summary": {
                "total_tools": len(self.monitored_tools),
                "total_executions": sum(
                    tool.metrics.execution_count 
                    for tool in self.monitored_tools.values()
                ),
                "overall_success_rate": 0.0,
                "average_execution_time": 0.0
            },
            "tools": {}
        }

        if self.monitored_tools:
            # Calculate overall metrics
            total_executions = report["summary"]["total_executions"]
            total_successes = sum(
                tool.metrics.success_count 
                for tool in self.monitored_tools.values()
            )
            total_time = sum(
                tool.metrics.total_execution_time 
                for tool in self.monitored_tools.values()
            )

            if total_executions > 0:
                report["summary"]["overall_success_rate"] = round(
                    total_successes / total_executions * 100, 2
                )
                report["summary"]["average_execution_time"] = round(
                    total_time / total_executions, 3
                )

            # Add individual tool metrics
            for tool_name, tool in self.monitored_tools.items():
                report["tools"][tool_name] = tool.get_metrics_summary()

        return report

    def print_performance_report(self):
        """Print formatted performance report."""
        report = self.get_performance_report()

        print("\n" + "="*60)
        print("MCP PERFORMANCE REPORT")
        print("="*60)

        summary = report["summary"]
        print(f"Total Tools: {summary['total_tools']}")
        print(f"Total Executions: {summary['total_executions']}")
        print(f"Overall Success Rate: {summary['overall_success_rate']}%")
        print(f"Average Execution Time: {summary['average_execution_time']}s")

        print("\nTool Performance:")
        print("-" * 60)

        for tool_name, metrics in report["tools"].items():
            print(f"📊 {tool_name}:")
            print(f"   Executions: {metrics['execution_count']}")
            print(f"   Success Rate: {metrics['success_rate']}%")
            print(f"   Avg Time: {metrics['average_execution_time']}s")
            print()

# Usage example
async def performance_monitoring_example():
    """Example of using performance monitoring with MCP tools."""
    from jaf.providers.mcp import create_mcp_stdio_client, create_mcp_tools_from_client

    # Connect to MCP server
    client = create_mcp_stdio_client([
        'npx', '-y', '@modelcontextprotocol/server-filesystem', '/tmp'
    ])

    # Get MCP tools
    mcp_tools = await create_mcp_tools_from_client(client)

    # Setup performance monitoring
    monitor = MCPPerformanceMonitor()
    monitored_tools = []

    for mcp_tool in mcp_tools:
        monitored_tool = monitor.add_tool(mcp_tool)
        monitored_tools.append(monitored_tool)

    # Simulate tool usage
    print("🔧 Running performance test...")

    for i in range(5):
        for tool in monitored_tools[:2]:  # Test first 2 tools
            try:
                # Create dynamic args
                class DynamicArgs(MCPToolArgs):
                    class Config:
                        extra = "allow"

                    def __init__(self, **data):
                        super().__init__()
                        for key, value in data.items():
                            setattr(self, key, value)

                args = DynamicArgs(path="/tmp")
                await tool.execute(args, {})

            except Exception as e:
                print(f"Tool execution failed: {e}")

    # Print performance report
    monitor.print_performance_report()

    await client.close()

if __name__ == "__main__":
    asyncio.run(performance_monitoring_example())

Production Deployment Examples

Docker Compose with MCP Services

Example Docker Compose setup for MCP services:

# docker-compose.yml
version: '3.8'

services:
  jaf-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - LITELLM_URL=http://litellm:4000
      - MCP_FILESYSTEM_PATH=/app/data
      - MCP_DATABASE_URL=postgresql://user:pass@postgres:5432/mcpdb
    volumes:
      - ./data:/app/data
    depends_on:
      - postgres
      - litellm
      - mcp-filesystem
      - mcp-database

  mcp-filesystem:
    image: node:18-alpine
    command: npx @modelcontextprotocol/server-filesystem /app/data
    volumes:
      - ./data:/app/data
    ports:
      - "8001:8001"

  mcp-database:
    build: ./mcp-database
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/mcpdb
    ports:
      - "8002:8002"
    depends_on:
      - postgres

  postgres:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=mcpdb
    volumes:
      - postgres_data:/var/lib/postgresql/data

  litellm:
    image: ghcr.io/berriai/litellm:main-latest
    ports:
      - "4000:4000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}

volumes:
  postgres_data:

Kubernetes Deployment

Example Kubernetes deployment for MCP-enabled JAF:

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jaf-mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: jaf-mcp-server
  template:
    metadata:
      labels:
        app: jaf-mcp-server
    spec:
      containers:
      - name: jaf-server
        image: jaf-mcp:latest
        ports:
        - containerPort: 3000
        env:
        - name: LITELLM_URL
          value: "http://litellm-service:4000"
        - name: MCP_FILESYSTEM_URL
          value: "ws://mcp-filesystem-service:8001"
        - name: MCP_DATABASE_URL
          value: "ws://mcp-database-service:8002"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"

---
apiVersion: v1
kind: Service
metadata:
  name: jaf-mcp-service
spec:
  selector:
    app: jaf-mcp-server
  ports:
  - port: 80
    targetPort: 3000
  type: LoadBalancer

These examples demonstrate various aspects of MCP integration with JAF, from basic usage to advanced production deployments. Each example includes error handling, security considerations, and best practices for real-world usage.