A2A Protocol Examples¶
This guide provides comprehensive examples of using the A2A (Agent-to-Agent) protocol for building distributed agent systems. From simple client-server interactions to complex multi-agent coordination patterns.
Quick Start Examples¶
Basic Client Connection¶
import asyncio
from jaf.a2a import connect_to_a2a_agent, send_message_to_agent
async def simple_client_example():
"""Connect to an A2A agent and send a message"""
# Connect to A2A server
client = await connect_to_a2a_agent("http://localhost:3000")
# Send a simple message
response = await send_message_to_agent(
client,
agent_name="MathTutor",
message="What is 15 * 7?"
)
print(f"Agent response: {response}")
# Run the example
asyncio.run(simple_client_example())
Basic Server Setup¶
import asyncio
from jaf.a2a import (
create_a2a_agent, create_a2a_tool,
create_server_config, start_a2a_server
)
def create_calculator_tool():
"""Create a safe calculator tool"""
def calculate(expression: str) -> str:
# Basic validation for safety
allowed_chars = set('0123456789+-*/(). ')
if not all(c in allowed_chars for c in expression):
return 'Error: Invalid characters in expression'
try:
result = eval(expression)
return f"{expression} = {result}"
except Exception as e:
return f"Error: {e}"
return create_a2a_tool(
name="calculate",
description="Perform mathematical calculations",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression to evaluate"
}
},
"required": ["expression"]
},
execute_func=calculate
)
async def basic_server_example():
"""Create and start a basic A2A server"""
# Create calculator tool
calc_tool = create_calculator_tool()
# Create math tutor agent
math_agent = create_a2a_agent(
name="MathTutor",
description="A helpful math tutor that can perform calculations",
instruction="You are a math tutor. Use the calculate tool for math problems.",
tools=[calc_tool]
)
# Create server configuration
server_config = create_server_config(
agents={"MathTutor": math_agent},
name="Math Server",
description="Server with math calculation capabilities",
port=3000,
cors=True
)
# Start the server
print("Starting A2A server on http://localhost:3000")
server = await start_a2a_server(server_config)
# Server endpoints are automatically available:
# GET /.well-known/agent-card # Agent discovery
# POST /a2a # Main A2A endpoint
# POST /a2a/agents/MathTutor # Agent-specific endpoint
# GET /a2a/health # Health check
print("Server started successfully!")
return server
# Run the server
asyncio.run(basic_server_example())
Agent Creation Examples¶
Multi-Tool Agent¶
from jaf.a2a import create_a2a_agent, create_a2a_tool
def create_research_agent():
"""Create an agent with multiple research tools"""
# Web search tool
def web_search(query: str, max_results: int = 5) -> str:
# Mock implementation - replace with real search API
results = [
f"Search result {i+1} for '{query}'"
for i in range(min(max_results, 3))
]
return "\n".join(results)
search_tool = create_a2a_tool(
name="web_search",
description="Search the web for information",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"},
"max_results": {"type": "integer", "minimum": 1, "maximum": 10, "default": 5}
},
"required": ["query"]
},
execute_func=web_search
)
# Summarization tool
def summarize_text(text: str, max_sentences: int = 3) -> str:
# Simple summarization - replace with real summarization
sentences = text.split('. ')
summary = '. '.join(sentences[:max_sentences])
return f"Summary: {summary}"
summary_tool = create_a2a_tool(
name="summarize_text",
description="Summarize long text content",
parameters={
"type": "object",
"properties": {
"text": {"type": "string", "description": "Text to summarize"},
"max_sentences": {"type": "integer", "minimum": 1, "maximum": 10, "default": 3}
},
"required": ["text"]
},
execute_func=summarize_text
)
# Create research agent with multiple tools
return create_a2a_agent(
name="ResearchAgent",
description="An intelligent research assistant that can search and summarize information",
instruction=(
"You are a research assistant. Use web_search to find information "
"and summarize_text to create concise summaries. Always provide "
"comprehensive research with multiple sources."
),
tools=[search_tool, summary_tool]
)
# Usage
research_agent = create_research_agent()
Specialized Domain Agent¶
def create_financial_advisor_agent():
"""Create a specialized financial advisory agent"""
# Stock price lookup tool
def get_stock_price(symbol: str) -> str:
# Mock implementation - integrate with real financial API
mock_prices = {
"AAPL": "$175.43",
"GOOGL": "$142.56",
"MSFT": "$378.85",
"TSLA": "$248.50"
}
price = mock_prices.get(symbol.upper(), "Unknown")
return f"Current price of {symbol.upper()}: {price}"
stock_tool = create_a2a_tool(
name="get_stock_price",
description="Get current stock price for a given symbol",
parameters={
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "Stock symbol (e.g., AAPL, GOOGL)",
"pattern": "^[A-Z]{1,5}$"
}
},
"required": ["symbol"]
},
execute_func=get_stock_price
)
# Portfolio analysis tool
def analyze_portfolio(holdings: list) -> str:
total_value = sum(holding.get("value", 0) for holding in holdings)
risk_score = min(len(holdings) * 10, 100) # Simple diversification score
return f"""
Portfolio Analysis:
- Total Value: ${total_value:,.2f}
- Number of Holdings: {len(holdings)}
- Diversification Score: {risk_score}/100
- Recommendation: {"Well diversified" if risk_score > 50 else "Consider diversifying"}
"""
portfolio_tool = create_a2a_tool(
name="analyze_portfolio",
description="Analyze investment portfolio risk and diversification",
parameters={
"type": "object",
"properties": {
"holdings": {
"type": "array",
"items": {
"type": "object",
"properties": {
"symbol": {"type": "string"},
"shares": {"type": "number"},
"value": {"type": "number"}
},
"required": ["symbol", "shares", "value"]
}
}
},
"required": ["holdings"]
},
execute_func=analyze_portfolio
)
return create_a2a_agent(
name="FinancialAdvisor",
description="Expert financial advisor for investment guidance",
instruction=(
"You are a professional financial advisor. Use get_stock_price to "
"check current market values and analyze_portfolio to assess investment "
"portfolios. Always provide balanced, risk-aware advice."
),
tools=[stock_tool, portfolio_tool]
)
# Usage
financial_agent = create_financial_advisor_agent()
Client Examples¶
Streaming Responses¶
import asyncio
from jaf.a2a import stream_message_to_agent, create_a2a_client
async def streaming_client_example():
"""Example of streaming responses from an A2A agent"""
client = create_a2a_client("http://localhost:3000")
print("🔄 Streaming response from agent...")
async for event in stream_message_to_agent(
client,
agent_name="ResearchAgent",
message="Research the latest developments in artificial intelligence"
):
if event.get("kind") == "message":
content = event["message"]["content"]
print(f"📝 Chunk: {content}")
elif event.get("kind") == "status-update":
status = event["status"]["state"]
print(f"📊 Status: {status}")
elif event.get("kind") == "tool-call":
tool_name = event.get("tool", {}).get("name", "unknown")
print(f"🔧 Tool called: {tool_name}")
asyncio.run(streaming_client_example())
Batch Operations¶
import asyncio
from jaf.a2a import create_a2a_client, send_message_to_agent
async def batch_client_example():
"""Send multiple requests to different agents"""
client = create_a2a_client("http://localhost:3000")
# Define multiple tasks
tasks = [
("MathTutor", "What is 25 * 17?"),
("ResearchAgent", "Find information about Python programming"),
("FinancialAdvisor", "What are the risks of investing in tech stocks?")
]
# Create concurrent requests
async def send_request(agent_name, message):
try:
response = await send_message_to_agent(client, agent_name, message)
return {"agent": agent_name, "response": response, "error": None}
except Exception as e:
return {"agent": agent_name, "response": None, "error": str(e)}
# Execute all requests concurrently
print("🚀 Sending batch requests...")
results = await asyncio.gather(*[
send_request(agent, message) for agent, message in tasks
])
# Process results
for result in results:
agent = result["agent"]
if result["error"]:
print(f"❌ {agent}: Error - {result['error']}")
else:
print(f"✅ {agent}: {result['response'][:100]}...")
asyncio.run(batch_client_example())
Agent Discovery¶
import asyncio
from jaf.a2a import discover_agents, get_agent_card
async def discovery_example():
"""Discover available agents and their capabilities"""
server_url = "http://localhost:3000"
# Get overall agent card
print("🔍 Discovering agents...")
agent_card = await get_agent_card(server_url)
print(f"Server: {agent_card['name']}")
print(f"Description: {agent_card['description']}")
print(f"Protocol Version: {agent_card['protocolVersion']}")
print(f"Available Skills: {len(agent_card['skills'])}")
# List individual skills
print("\n📋 Available Skills:")
for skill in agent_card['skills']:
print(f" • {skill['name']}: {skill['description']}")
if skill.get('tags'):
print(f" Tags: {', '.join(skill['tags'])}")
# Check capabilities
capabilities = agent_card.get('capabilities', {})
print(f"\n⚙️ Capabilities:")
for cap, enabled in capabilities.items():
status = "✅" if enabled else "❌"
print(f" {status} {cap}")
asyncio.run(discovery_example())
Server Examples¶
Multi-Agent Server¶
import asyncio
from jaf.a2a import (
create_a2a_agent, create_a2a_tool,
create_server_config, start_a2a_server
)
def create_customer_service_tools():
"""Create tools for customer service agent"""
def lookup_order(order_id: str) -> str:
# Mock order lookup
return f"Order {order_id}: Status - Shipped, Expected delivery: 2 days"
def process_refund(order_id: str, reason: str) -> str:
# Mock refund processing
return f"Refund initiated for order {order_id}. Reason: {reason}. Expected processing: 3-5 business days"
return [
create_a2a_tool(
name="lookup_order",
description="Look up order status and details",
parameters={
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "Order ID to lookup"}
},
"required": ["order_id"]
},
execute_func=lookup_order
),
create_a2a_tool(
name="process_refund",
description="Process customer refund request",
parameters={
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "Order ID for refund"},
"reason": {"type": "string", "description": "Reason for refund"}
},
"required": ["order_id", "reason"]
},
execute_func=process_refund
)
]
def create_technical_support_tools():
"""Create tools for technical support agent"""
def diagnose_issue(symptoms: list) -> str:
# Mock diagnostic logic
if "slow" in ' '.join(symptoms).lower():
return "Likely performance issue. Try clearing cache and restarting application."
elif "error" in ' '.join(symptoms).lower():
return "Error detected. Please check logs and verify configuration."
else:
return "Unable to diagnose. Please provide more detailed symptoms."
def create_ticket(title: str, description: str, priority: str = "medium") -> str:
# Mock ticket creation
ticket_id = f"TECH-{hash(title) % 10000:04d}"
return f"Ticket {ticket_id} created. Priority: {priority}. We'll respond within 24 hours."
return [
create_a2a_tool(
name="diagnose_issue",
description="Diagnose technical issues based on symptoms",
parameters={
"type": "object",
"properties": {
"symptoms": {
"type": "array",
"items": {"type": "string"},
"description": "List of symptoms or issues"
}
},
"required": ["symptoms"]
},
execute_func=diagnose_issue
),
create_a2a_tool(
name="create_ticket",
description="Create technical support ticket",
parameters={
"type": "object",
"properties": {
"title": {"type": "string", "description": "Issue title"},
"description": {"type": "string", "description": "Detailed description"},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "urgent"],
"default": "medium"
}
},
"required": ["title", "description"]
},
execute_func=create_ticket
)
]
async def multi_agent_server_example():
"""Create server with multiple specialized agents"""
# Create customer service agent
customer_agent = create_a2a_agent(
name="CustomerService",
description="Customer service agent for order inquiries and returns",
instruction=(
"You are a friendly customer service representative. "
"Help customers with order status, returns, and general inquiries. "
"Use lookup_order to check order status and process_refund for returns."
),
tools=create_customer_service_tools()
)
# Create technical support agent
tech_agent = create_a2a_agent(
name="TechnicalSupport",
description="Technical support agent for troubleshooting and issue resolution",
instruction=(
"You are a technical support specialist. "
"Help users diagnose and resolve technical issues. "
"Use diagnose_issue for troubleshooting and create_ticket for complex problems."
),
tools=create_technical_support_tools()
)
# Create general assistant
general_agent = create_a2a_agent(
name="GeneralAssistant",
description="General purpose assistant for information and guidance",
instruction=(
"You are a helpful general assistant. "
"Provide information, answer questions, and guide users to appropriate specialists. "
"Route customers to CustomerService for orders and TechnicalSupport for tech issues."
),
tools=[]
)
# Create server with all agents
agents = {
"CustomerService": customer_agent,
"TechnicalSupport": tech_agent,
"GeneralAssistant": general_agent
}
server_config = create_server_config(
agents=agents,
name="Customer Support Server",
description="Multi-agent customer support system",
port=3000,
cors=True
)
print("🚀 Starting multi-agent customer support server...")
server = await start_a2a_server(server_config)
print("✅ Server running with agents:", list(agents.keys()))
return server
asyncio.run(multi_agent_server_example())
Server with Memory and Configuration¶
import asyncio
import os
from jaf.a2a import (
create_a2a_server_config, start_a2a_server,
create_a2a_agent, create_a2a_tool
)
from jaf.a2a.memory import create_a2a_in_memory_task_provider, A2AInMemoryTaskConfig
async def advanced_server_example():
"""Create server with advanced configuration"""
# Create a conversational agent
def remember_conversation(user_message: str, context_id: str) -> str:
# Mock conversation memory
return f"I remember our conversation about: {user_message[:50]}..."
memory_tool = create_a2a_tool(
name="remember_conversation",
description="Remember important parts of the conversation",
parameters={
"type": "object",
"properties": {
"user_message": {"type": "string"},
"context_id": {"type": "string"}
},
"required": ["user_message", "context_id"]
},
execute_func=remember_conversation
)
conversational_agent = create_a2a_agent(
name="ConversationalAgent",
description="Friendly conversational agent with memory",
instruction=(
"You are a friendly, conversational agent. "
"Remember important details from conversations using remember_conversation. "
"Be personable and maintain context across interactions."
),
tools=[memory_tool]
)
# Configure task memory
memory_config = A2AInMemoryTaskConfig(
max_tasks=1000,
max_tasks_per_context=50,
task_ttl_seconds=3600 # 1 hour
)
task_provider = create_a2a_in_memory_task_provider(memory_config)
# Advanced server configuration
config = create_a2a_server_config(
agents={"ConversationalAgent": conversational_agent},
server_info={
"name": "Advanced A2A Server",
"description": "Production-ready A2A server with memory and monitoring",
"version": "1.0.0",
"contact": {"email": "support@example.com"},
"capabilities": {
"streaming": True,
"taskManagement": True,
"conversationMemory": True
}
},
network_config={
"host": "0.0.0.0",
"port": int(os.getenv("A2A_PORT", "3000")),
"cors": {
"allow_origins": ["http://localhost:3000", "https://app.example.com"],
"allow_credentials": True
}
},
memory_config={
"task_provider": task_provider,
"conversation_ttl": 7200 # 2 hours
}
)
print("🏗️ Starting advanced A2A server...")
server = await start_a2a_server(config)
print("✅ Advanced server running with full configuration")
return server
asyncio.run(advanced_server_example())
Integration Examples¶
JAF Core Integration¶
import asyncio
from jaf import Agent, run, RunState, RunConfig, Message, generate_run_id, generate_trace_id
from jaf.a2a import create_a2a_client, transform_a2a_agent_to_jaf, connect_to_a2a_agent
async def hybrid_local_remote_example():
"""Use both local and remote agents in a single workflow"""
# Local JAF agent
def local_instructions(state):
return (
"You are a local data processor. Process data and hand off "
"to RemoteAnalyzer for complex analysis when needed."
)
local_agent = Agent(
name="LocalProcessor",
instructions=local_instructions,
tools=[],
handoffs=["RemoteAnalyzer"] # Can hand off to remote agent
)
# Connect to remote A2A agent
a2a_connection = await connect_to_a2a_agent("http://localhost:3000")
# Transform remote agent for local use
remote_agent = transform_a2a_agent_to_jaf(
await a2a_connection.get_agent("ResearchAgent")
)
# Create hybrid configuration
config = RunConfig(
agent_registry={
"LocalProcessor": local_agent,
"RemoteAnalyzer": remote_agent
},
model_provider=make_litellm_provider("http://localhost:4000"),
max_turns=5
)
# Run with hybrid agents
initial_state = RunState(
run_id=generate_run_id(),
trace_id=generate_trace_id(),
messages=[Message(role="user", content="Analyze this complex dataset")],
current_agent_name="LocalProcessor",
context={"dataset": "complex_data.csv"},
turn_count=0
)
result = await run(initial_state, config)
print(f"Hybrid execution result: {result.outcome}")
asyncio.run(hybrid_local_remote_example())
Load Balancing Example¶
import asyncio
import random
from jaf.a2a import create_a2a_client, send_message_to_agent
class A2ALoadBalancer:
"""Simple load balancer for A2A agents"""
def __init__(self, server_urls):
self.server_urls = server_urls
self.clients = {}
self.request_counts = {url: 0 for url in server_urls}
async def get_client(self, strategy="round_robin"):
"""Get client based on load balancing strategy"""
if strategy == "round_robin":
# Find server with minimum requests
selected_url = min(self.request_counts, key=self.request_counts.get)
elif strategy == "random":
selected_url = random.choice(self.server_urls)
else:
selected_url = self.server_urls[0] # Default to first
# Create client if not exists
if selected_url not in self.clients:
self.clients[selected_url] = create_a2a_client(selected_url)
self.request_counts[selected_url] += 1
return self.clients[selected_url], selected_url
async def send_message(self, agent_name, message, strategy="round_robin"):
"""Send message with load balancing"""
client, server_url = await self.get_client(strategy)
try:
response = await send_message_to_agent(client, agent_name, message)
print(f"✅ Request sent to {server_url}")
return response
except Exception as e:
print(f"❌ Request to {server_url} failed: {e}")
# Try next server
remaining_urls = [url for url in self.server_urls if url != server_url]
if remaining_urls:
backup_client = create_a2a_client(remaining_urls[0])
return await send_message_to_agent(backup_client, agent_name, message)
raise
async def load_balancing_example():
"""Example of load balancing across multiple A2A servers"""
# Multiple server URLs (in practice, these would be different servers)
server_urls = [
"http://localhost:3000",
"http://localhost:3001",
"http://localhost:3002"
]
# Create load balancer
balancer = A2ALoadBalancer(server_urls)
# Send multiple requests
tasks = []
for i in range(10):
task = balancer.send_message(
"MathTutor",
f"What is {i} * {i}?",
strategy="round_robin"
)
tasks.append(task)
# Execute all requests
results = await asyncio.gather(*tasks, return_exceptions=True)
# Show distribution
print("\n📊 Request Distribution:")
for url, count in balancer.request_counts.items():
print(f" {url}: {count} requests")
# Note: This example assumes multiple servers are running
# asyncio.run(load_balancing_example())
Error Handling Examples¶
Robust Client¶
import asyncio
import logging
from jaf.a2a import create_a2a_client, send_message_to_agent, A2AError
class RobustA2AClient:
"""A2A client with comprehensive error handling"""
def __init__(self, base_url, max_retries=3, timeout=30):
self.base_url = base_url
self.max_retries = max_retries
self.timeout = timeout
self.client = create_a2a_client(base_url, {"timeout": timeout})
self.logger = logging.getLogger(__name__)
async def send_message_with_retry(self, agent_name, message):
"""Send message with retry logic"""
last_error = None
for attempt in range(self.max_retries + 1):
try:
if attempt > 0:
self.logger.info(f"Retry attempt {attempt} for {agent_name}")
await asyncio.sleep(2 ** attempt) # Exponential backoff
response = await send_message_to_agent(
self.client, agent_name, message
)
self.logger.info(f"✅ Message sent successfully to {agent_name}")
return response
except A2AError as e:
last_error = e
self.logger.warning(f"A2A error on attempt {attempt + 1}: {e}")
# Don't retry certain errors
if e.code in ["AGENT_NOT_FOUND", "INVALID_REQUEST"]:
break
except asyncio.TimeoutError:
last_error = asyncio.TimeoutError("Request timed out")
self.logger.warning(f"Timeout on attempt {attempt + 1}")
except Exception as e:
last_error = e
self.logger.error(f"Unexpected error on attempt {attempt + 1}: {e}")
# All retries failed
self.logger.error(f"❌ All retry attempts failed for {agent_name}")
raise last_error
async def health_check(self):
"""Check if the A2A server is healthy"""
try:
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/a2a/health",
timeout=self.timeout
)
if response.status_code == 200:
health_data = response.json()
return health_data.get("healthy", False)
else:
return False
except Exception as e:
self.logger.error(f"Health check failed: {e}")
return False
async def robust_client_example():
"""Example of robust A2A client usage"""
# Configure logging
logging.basicConfig(level=logging.INFO)
# Create robust client
client = RobustA2AClient("http://localhost:3000", max_retries=3)
# Check server health first
is_healthy = await client.health_check()
if not is_healthy:
print("❌ Server is not healthy, aborting")
return
print("✅ Server is healthy, proceeding with requests")
# Send messages with error handling
messages = [
("MathTutor", "What is 5 + 3?"),
("NonExistentAgent", "This should fail"), # Will fail
("MathTutor", "What is 10 * 7?")
]
for agent_name, message in messages:
try:
response = await client.send_message_with_retry(agent_name, message)
print(f"✅ {agent_name}: {response}")
except Exception as e:
print(f"❌ {agent_name}: Failed after retries - {e}")
asyncio.run(robust_client_example())
Testing Examples¶
Unit Tests for A2A Components¶
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
from jaf.a2a import create_a2a_agent, create_a2a_tool, create_a2a_client
class TestA2AAgent:
"""Test A2A agent functionality"""
def test_agent_creation(self):
"""Test basic agent creation"""
agent = create_a2a_agent(
name="TestAgent",
description="A test agent",
instruction="You are a test agent",
tools=[]
)
assert agent.name == "TestAgent"
assert agent.description == "A test agent"
assert agent.instruction == "You are a test agent"
assert len(agent.tools) == 0
def test_agent_with_tools(self):
"""Test agent creation with tools"""
def test_func(value: str) -> str:
return f"Processed: {value}"
tool = create_a2a_tool(
name="test_tool",
description="A test tool",
parameters={
"type": "object",
"properties": {"value": {"type": "string"}},
"required": ["value"]
},
execute_func=test_func
)
agent = create_a2a_agent(
name="ToolAgent",
description="Agent with tools",
instruction="Use tools to help users",
tools=[tool]
)
assert len(agent.tools) == 1
assert agent.tools[0].name == "test_tool"
class TestA2AClient:
"""Test A2A client functionality"""
@pytest.mark.asyncio
async def test_client_creation(self):
"""Test A2A client creation"""
client = create_a2a_client("http://localhost:3000")
assert client.base_url == "http://localhost:3000"
@pytest.mark.asyncio
async def test_mock_message_sending(self):
"""Test message sending with mocked response"""
# Mock the HTTP client
with patch('httpx.AsyncClient') as mock_client:
mock_response = AsyncMock()
mock_response.json.return_value = {
"jsonrpc": "2.0",
"id": "1",
"result": {
"message": {
"role": "assistant",
"content": "Hello from mock agent!"
}
}
}
mock_response.status_code = 200
mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
from jaf.a2a import send_message_to_agent
client = create_a2a_client("http://localhost:3000")
response = await send_message_to_agent(
client, "TestAgent", "Hello"
)
assert "Hello from mock agent!" in str(response)
class TestA2ATool:
"""Test A2A tool functionality"""
@pytest.mark.asyncio
async def test_tool_execution(self):
"""Test tool execution"""
def calculator(expression: str) -> str:
try:
result = eval(expression)
return str(result)
except:
return "Error"
tool = create_a2a_tool(
name="calculator",
description="Basic calculator",
parameters={
"type": "object",
"properties": {
"expression": {"type": "string"}
},
"required": ["expression"]
},
execute_func=calculator
)
# Test tool execution
result = await tool.execute_func("2 + 2")
assert result == "4"
result = await tool.execute_func("invalid")
assert result == "Error"
# Integration tests
@pytest.mark.integration
class TestA2AIntegration:
"""Integration tests for A2A system"""
@pytest.mark.asyncio
async def test_full_workflow(self):
"""Test complete A2A workflow"""
# This test assumes a test server is running
# In practice, you might start a test server here
from jaf.a2a import create_a2a_client, send_message_to_agent
try:
client = create_a2a_client("http://localhost:3001") # Test server
response = await send_message_to_agent(
client, "TestAgent", "Hello, test!"
)
assert response is not None
except Exception:
pytest.skip("Test server not available")
# Run tests with: python -m pytest test_a2a_examples.py -v
Load Testing¶
import asyncio
import time
import statistics
from jaf.a2a import create_a2a_client, send_message_to_agent
async def load_test_a2a_server():
"""Load test an A2A server"""
client = create_a2a_client("http://localhost:3000")
# Test configuration
num_concurrent = 10
num_requests_per_client = 20
async def client_worker(worker_id):
"""Individual client worker"""
response_times = []
errors = 0
for i in range(num_requests_per_client):
start_time = time.time()
try:
response = await send_message_to_agent(
client,
"MathTutor",
f"What is {i} + {worker_id}?"
)
end_time = time.time()
response_times.append(end_time - start_time)
except Exception as e:
errors += 1
print(f"Worker {worker_id}, Request {i}: Error - {e}")
return {
"worker_id": worker_id,
"response_times": response_times,
"errors": errors,
"success_rate": (num_requests_per_client - errors) / num_requests_per_client
}
# Run load test
print(f"🚀 Starting load test: {num_concurrent} clients, {num_requests_per_client} requests each")
start_time = time.time()
# Create concurrent workers
workers = [client_worker(i) for i in range(num_concurrent)]
results = await asyncio.gather(*workers)
end_time = time.time()
total_duration = end_time - start_time
# Aggregate results
all_response_times = []
total_errors = 0
total_requests = 0
for result in results:
all_response_times.extend(result["response_times"])
total_errors += result["errors"]
total_requests += num_requests_per_client
# Calculate statistics
if all_response_times:
avg_response_time = statistics.mean(all_response_times)
median_response_time = statistics.median(all_response_times)
p95_response_time = sorted(all_response_times)[int(len(all_response_times) * 0.95)]
requests_per_second = len(all_response_times) / total_duration
else:
avg_response_time = median_response_time = p95_response_time = 0
requests_per_second = 0
# Print results
print(f"\n📊 Load Test Results:")
print(f"Total Duration: {total_duration:.2f}s")
print(f"Total Requests: {total_requests}")
print(f"Successful Requests: {total_requests - total_errors}")
print(f"Failed Requests: {total_errors}")
print(f"Success Rate: {(total_requests - total_errors) / total_requests * 100:.1f}%")
print(f"Requests/Second: {requests_per_second:.2f}")
print(f"Average Response Time: {avg_response_time * 1000:.2f}ms")
print(f"Median Response Time: {median_response_time * 1000:.2f}ms")
print(f"95th Percentile: {p95_response_time * 1000:.2f}ms")
# Run load test
# asyncio.run(load_test_a2a_server())
Production Deployment Examples¶
Docker Deployment¶
# Dockerfile for A2A server
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port
EXPOSE 3000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
CMD curl -f http://localhost:3000/a2a/health || exit 1
# Run application
CMD ["python", "-m", "jaf.a2a.examples.production_server"]
# docker-compose.yml
version: '3.8'
services:
a2a-server:
build: .
ports:
- "3000:3000"
environment:
- A2A_HOST=0.0.0.0
- A2A_PORT=3000
- A2A_LOG_LEVEL=INFO
- A2A_CORS_ORIGINS=https://app.example.com
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:alpine
ports:
- "6379:6379"
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- a2a-server
restart: unless-stopped
Production Server Configuration¶
import os
import logging
import asyncio
from jaf.a2a import (
create_a2a_server_config, start_a2a_server,
create_a2a_agent, create_a2a_tool
)
# Configure logging
logging.basicConfig(
level=getattr(logging, os.getenv("A2A_LOG_LEVEL", "INFO")),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
def create_production_agents():
"""Create production-ready agents"""
# Create robust tools with error handling
def safe_calculator(expression: str) -> str:
try:
# Validate expression for security
allowed_chars = set('0123456789+-*/(). ')
if not all(c in allowed_chars for c in expression):
return "Error: Invalid characters in expression"
# Limit expression length
if len(expression) > 100:
return "Error: Expression too long"
result = eval(expression)
return f"{expression} = {result}"
except Exception as e:
logging.error(f"Calculator error: {e}")
return f"Error: {str(e)}"
calc_tool = create_a2a_tool(
name="calculate",
description="Perform safe mathematical calculations",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"minLength": 1,
"maxLength": 100,
"pattern": r"^[0-9+\-*/().\\s]+$"
}
},
"required": ["expression"]
},
execute_func=safe_calculator
)
# Production math agent
math_agent = create_a2a_agent(
name="MathTutor",
description="Production math tutor with safety features",
instruction=(
"You are a professional math tutor. Use the calculate tool for "
"mathematical computations. Always validate inputs and provide "
"clear explanations. Handle errors gracefully."
),
tools=[calc_tool]
)
return {"MathTutor": math_agent}
async def main():
"""Production server main function"""
# Environment configuration
host = os.getenv("A2A_HOST", "0.0.0.0")
port = int(os.getenv("A2A_PORT", "3000"))
cors_origins = os.getenv("A2A_CORS_ORIGINS", "").split(",")
# Create agents
agents = create_production_agents()
# Production server configuration
config = create_a2a_server_config(
agents=agents,
server_info={
"name": "Production A2A Server",
"description": "Production-ready A2A agent server",
"version": "1.0.0",
"contact": {"email": "support@example.com"},
"capabilities": {
"streaming": True,
"taskManagement": True,
"healthChecks": True
}
},
network_config={
"host": host,
"port": port,
"cors": {
"allow_origins": cors_origins if cors_origins != [''] else ["*"],
"allow_credentials": True,
"allow_methods": ["GET", "POST", "OPTIONS"],
"allow_headers": ["*"]
}
}
)
# Start server with graceful shutdown
logging.info(f"Starting A2A server on {host}:{port}")
server = await start_a2a_server(config)
try:
# Keep server running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logging.info("Shutting down A2A server...")
finally:
if hasattr(server, 'shutdown'):
await server.shutdown()
if __name__ == "__main__":
asyncio.run(main())
These examples provide comprehensive coverage of A2A protocol usage, from simple client-server interactions to complex production deployments. Use them as starting points for building your own distributed agent systems.