Examples and Tutorials¶
This guide provides comprehensive walkthroughs of JAF example applications, demonstrating real-world usage patterns and best practices for building AI agent systems.
Overview¶
JAF includes several example applications that showcase different aspects of the framework:
- Server Demo - Multi-agent HTTP server with tools and memory
- Handoff System - Language support with triage and specialist routing
- RAG Example - Retrieval-Augmented Generation with knowledge base
- Iterative Search Agent - Advanced callback system showcase with ReAct patterns
- Custom Tools - Advanced tool implementation patterns
- Memory Integration - Persistent conversation examples
Server Demo Walkthrough¶
The server demo (examples/server_demo.py) demonstrates a complete, production-ready JAF server with multiple specialized agents, custom tools, and memory persistence.
Architecture Overview¶
# Three specialized agents
MathTutor # Mathematical calculations and explanations
ChatBot # Friendly conversation and greetings
Assistant # General-purpose with all tools
# Two custom tools
Calculator # Safe mathematical expression evaluation
Greeting # Personalized greeting generation
# Memory support
InMemory # Development
Redis # Production caching
PostgreSQL # Production persistence
Key Components¶
1. Context Definition¶
The context provides user information and permissions to agents and tools, enabling security and personalization.
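The demo passes a small, immutable context object into every run. A minimal sketch of what such a context might look like is shown below; the exact field names in server_demo.py may differ, but they mirror the "userId" and "permissions" values sent by the HTTP clients later in this walkthrough.
from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class MyContext:
    user_id: str            # maps to the "userId" field sent by clients
    permissions: List[str]  # e.g. ["user"] or ["admin"], checked inside tools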
2. Tool Implementation¶
Calculator Tool with Security:
@function_tool
async def calculator(expression: str, context=None) -> str:
"""Safely evaluate mathematical expressions with input sanitization.
Args:
expression: Mathematical expression to evaluate (e.g., "2 + 3", "15 * 7")
"""
# Input sanitization - only allow safe characters
sanitized = ''.join(c for c in expression if c in '0123456789+-*/(). ')
if sanitized != expression:
return f"Error: Invalid characters in expression. Only numbers, +, -, *, /, and () are allowed."
try:
expression_for_eval = sanitized.replace(' ', '')
result = eval(expression_for_eval) # Safe due to sanitization
return f"{expression} = {result}"
except Exception as e:
return f"Error: Failed to evaluate expression: {str(e)}"
Greeting Tool with Validation:
@function_tool
async def greeting(name: str, context=None) -> str:
"""Generate a personalized greeting with input validation.
Args:
name: Name of the person to greet
"""
# Input validation
if not name or name.strip() == "":
return "Error: Name cannot be empty"
# Length validation
if len(name) > 100:
return f"Error: Name is too long (max 100 characters, got {len(name)})"
greeting = f"Hello, {name.strip()}! Nice to meet you. I'm a helpful AI assistant running on the JAF framework."
return greeting
3. Agent Specialization¶
Math Tutor Agent:
def create_math_agent() -> Agent[MyContext, str]:
def instructions(state: RunState[MyContext]) -> str:
return 'You are a helpful math tutor. Use the calculator tool to perform calculations and explain math concepts clearly.'
return Agent(
name='MathTutor',
instructions=instructions,
tools=[calculator] # Only calculator access
)
Chat Bot Agent:
def create_chat_agent() -> Agent[MyContext, str]:
def instructions(state: RunState[MyContext]) -> str:
return 'You are a friendly chatbot. Use the greeting tool when meeting new people, and engage in helpful conversation.'
return Agent(
name='ChatBot',
instructions=instructions,
tools=[greeting] # Only greeting access
)
General Assistant Agent:
def create_assistant_agent() -> Agent[MyContext, str]:
def instructions(state: RunState[MyContext]) -> str:
return 'You are a general-purpose assistant. You can help with math calculations and provide greetings.'
return Agent(
name='Assistant',
instructions=instructions,
tools=[calculator, greeting] # Access to all tools
)
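When the server is assembled, these constructors are gathered into an agent registry keyed by agent name. A minimal sketch, assuming the plain-dict registry style used by the RunConfig examples later in this guide:
agent_registry = {
    'MathTutor': create_math_agent(),
    'ChatBot': create_chat_agent(),
    'Assistant': create_assistant_agent(),
}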
4. Memory Integration¶
# Environment-based memory configuration
memory_type = os.getenv("JAF_MEMORY_TYPE", "memory").lower()
if memory_type == "redis":
# Redis configuration
redis_client = redis.Redis(
host=os.getenv("JAF_REDIS_HOST", "localhost"),
port=int(os.getenv("JAF_REDIS_PORT", "6379")),
password=os.getenv("JAF_REDIS_PASSWORD"),
db=int(os.getenv("JAF_REDIS_DB", "0"))
)
external_clients["redis"] = redis_client
elif memory_type == "postgres":
# PostgreSQL configuration
postgres_client = await asyncpg.connect(
host=os.getenv("JAF_POSTGRES_HOST", "localhost"),
port=int(os.getenv("JAF_POSTGRES_PORT", "5432")),
database=os.getenv("JAF_POSTGRES_DATABASE", "jaf_memory"),
user=os.getenv("JAF_POSTGRES_USERNAME", "postgres"),
password=os.getenv("JAF_POSTGRES_PASSWORD")
)
external_clients["postgres"] = postgres_client
# Create memory provider
memory_provider = await create_memory_provider_from_env(external_clients)
memory_config = MemoryConfig(
provider=memory_provider,
auto_store=True,
max_messages=1000
)
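The resulting memory_config is then attached to the run configuration. A brief sketch following the pattern used in the memory integration examples later in this guide (server_demo.py may wire this up slightly differently); agent_registry and model_provider are assumed to be constructed as shown elsewhere in this walkthrough:
run_config = RunConfig(
    agent_registry=agent_registry,
    model_provider=model_provider,
    memory=memory_config
)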
Running the Server Demo¶
1. Basic Setup¶
# Install dependencies
pip install jaf-py litellm redis asyncpg
# Set environment variables
export LITELLM_URL=http://localhost:4000
export LITELLM_API_KEY=your-api-key
export LITELLM_MODEL=gemini-2.5-pro
export PORT=3000
# Optional: Memory configuration
export JAF_MEMORY_TYPE=memory # or redis, postgres
2. Run the Server¶
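The exact entry point may vary with your checkout; assuming the demo is launched directly from the repository root:
python examples/server_demo.py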
Expected Output:
Starting JAF Development Server...
📡 LiteLLM URL: http://localhost:4000
🔑 API Key: Set
⚠️ Note: Chat endpoints will fail without a running LiteLLM server
Memory Type: memory
Memory provider created: InMemoryMemoryProvider
Creating server...
Try these example requests:
1. Health Check:
curl http://localhost:3000/health
2. List Agents:
curl http://localhost:3000/agents
3. Chat with Math Tutor:
curl -X POST http://localhost:3000/chat \
-H "Content-Type: application/json" \
-d '{"messages":[{"role":"user","content":"What is 15 * 7?"}],"agent_name":"MathTutor","context":{"userId":"demo","permissions":["user"]}}'
Starting server...
3. Test the Agents¶
Math Calculations:
curl -X POST http://localhost:3000/chat \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "What is 15 * 7?"}],
"agent_name": "MathTutor",
"context": {"userId": "demo", "permissions": ["user"]}
}'
Friendly Greetings:
curl -X POST http://localhost:3000/agents/ChatBot/chat \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "Hi, my name is Alice"}],
"context": {"userId": "demo", "permissions": ["user"]}
}'
Multi-Tool Assistant:
curl -X POST http://localhost:3000/chat \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "Calculate 25 + 17 and then greet me as Bob"}],
"agent_name": "Assistant",
"context": {"userId": "demo", "permissions": ["user"]}
}'
4. Persistent Conversations¶
# Start a conversation
curl -X POST http://localhost:3000/chat \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "Hello, I am starting a new conversation"}],
"agent_name": "ChatBot",
"conversation_id": "my-conversation",
"context": {"userId": "demo", "permissions": ["user"]}
}'
# Continue the conversation
curl -X POST http://localhost:3000/chat \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "Do you remember me?"}],
"agent_name": "ChatBot",
"conversation_id": "my-conversation",
"context": {"userId": "demo", "permissions": ["user"]}
}'
# Get conversation history
curl http://localhost:3000/conversations/my-conversation
Handoff System - Language Support Example¶
The handoff example (examples/handoff_example.py) demonstrates JAF's powerful agent handoff system through a language support scenario where a triage agent routes users to specialized language agents.
Architecture Overview¶
# Three specialized agents
TriageAgent # Routes to language specialists
FrenchAgent # French language support
GermanAgent # German language support
# Handoff capabilities
TriageAgent → FrenchAgent, GermanAgent
FrenchAgent → GermanAgent
GermanAgent → FrenchAgent
Key Components¶
1. Handoff Tool Import¶
from jaf.core.handoff import handoff_tool
# handoff_tool is a pre-built tool that enables agent handoffs
# Add it to any agent's tools list to enable handoff capability
2. Triage Agent with Handoffs¶
def create_triage_agent():
def instructions(state: RunState) -> str:
return """You are a language support triage agent.
Your job is to understand what language the customer needs help with and route them to the right specialist:
- If they mention "French", "français", or need French help → use handoff tool to transfer to "french_agent"
- If they mention "German", "deutsch", or need German help → use handoff tool to transfer to "german_agent"
- If they ask a simple question you can answer → respond directly
When using handoff tool:
- agent_name: "french_agent" or "german_agent"
- message: Brief summary of what the customer needs
Always be helpful and explain you're connecting them to the right language specialist."""
return Agent(
name="triage_agent",
instructions=instructions,
tools=[handoff_tool], # Add handoff capability
handoffs=["french_agent", "german_agent"] # Only these targets allowed
)
3. Specialist Agents with Tools and Handoffs¶
def create_french_agent():
def instructions(state: RunState) -> str:
return """You are a French language specialist agent.
You help customers with:
- French translations
- French language questions
- French cultural information
You have tools:
- translate: Translate text to French
Be friendly and helpful. Speak some French when appropriate!
If the customer needs help with other languages, use handoff tool to route them appropriately."""
return Agent(
name="french_agent",
instructions=instructions,
tools=[translate_tool, handoff_tool], # Own tools + handoff
handoffs=["german_agent"] # Can handoff to German specialist
)
def create_german_agent():
# Similar structure for German specialist
return Agent(
name="german_agent",
instructions=german_instructions,
tools=[translate_tool, handoff_tool],
handoffs=["french_agent"] # Can handoff to French specialist
)
4. Translation Tool¶
from pydantic import BaseModel
from jaf import create_function_tool
class TranslateArgs(BaseModel):
text: str
target_language: str
async def translate_text(args: TranslateArgs, context) -> str:
"""Mock translation tool for demonstration."""
translations = {
"french": {
"hello": "bonjour",
"goodbye": "au revoir",
"thank you": "merci",
},
"german": {
"hello": "hallo",
"goodbye": "auf wiedersehen",
"thank you": "danke",
}
}
lang_dict = translations.get(args.target_language.lower(), {})
translated = lang_dict.get(args.text.lower(), f"[Translation of '{args.text}']")
return f"Translated '{args.text}' to {args.target_language}: '{translated}'"
translate_tool = create_function_tool({
'name': 'translate',
'description': 'Translate text to specified language',
'execute': translate_text,
'parameters': TranslateArgs
})
Running the Example¶
Run Demo Scenarios¶
The example runs five demo scenarios:
1. "I need help translating 'hello' to French"
2. "Can you help me with German translations?"
3. "How do you say 'thank you' in French?"
4. "I want to learn some German phrases"
5. "What's the weather like today?" (stays with triage)
Expected Execution Flow¶
Scenario 1: French Translation Request
User: "I need help translating 'hello' to French"
1. TriageAgent receives request
2. TriageAgent identifies "French" keyword
3. TriageAgent uses handoff tool:
- agent_name: "french_agent"
- message: "Customer needs French translation help"
4. FrenchAgent takes over
5. FrenchAgent uses translate tool
6. FrenchAgent responds: "Bonjour! I can help with that. 'Hello' in French is 'bonjour'."
Scenario 2: Cross-Language Handoff
User: "I was learning French but now I need German"
1. FrenchAgent is active
2. FrenchAgent identifies German request
3. FrenchAgent uses handoff tool to "german_agent"
4. GermanAgent takes over
5. GermanAgent provides German language assistance
Complete Working Example¶
import asyncio
from jaf import Agent, RunState, RunConfig, Message, generate_run_id, generate_trace_id
from jaf.core.handoff import handoff_tool
from jaf.core.engine import run
from jaf.providers.model import make_litellm_provider
async def demo_handoff(user_message: str):
"""Demonstrate handoff with a single message."""
# Setup model provider
model_provider = make_litellm_provider(
base_url='http://localhost:4000',
api_key='your-key'
)
# Create agents
agent_registry = create_language_support_agents()
# Create initial state
initial_state = RunState(
run_id=generate_run_id(),
trace_id=generate_trace_id(),
messages=[Message(role='user', content=user_message)],
current_agent_name="triage_agent",
context={"user_id": "demo_user"},
turn_count=0
)
# Create run configuration
config = RunConfig(
agent_registry=agent_registry,
model_provider=model_provider,
max_turns=5,
on_event=lambda event: print(f"[{event.type}]")
)
# Run the conversation
result = await run(initial_state, config)
print(f"Status: {result.outcome.status}")
print(f"Final Agent: {result.final_state.current_agent_name}")
print(f"Output: {result.outcome.output}")
# Run demo
asyncio.run(demo_handoff("I need help with French translations"))
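The create_language_support_agents() helper used above is defined in the example file. A minimal sketch consistent with the agent constructors shown earlier in this section would be:
def create_language_support_agents():
    """Build the agent registry of triage and specialist agents, keyed by agent name."""
    return {
        "triage_agent": create_triage_agent(),
        "french_agent": create_french_agent(),
        "german_agent": create_german_agent(),
    }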
Key Patterns and Best Practices¶
1. Clear Handoff Instructions¶
Always provide clear instructions in the agent's system prompt about:
- When to use the handoff tool
- What parameters to provide
- Which agents are available
instructions = """Route customers to specialists:
- French requests → handoff to "french_agent"
- German requests → handoff to "german_agent"
Use handoff tool with:
- agent_name: Target agent name
- message: Brief customer context"""
2. Handoff Validation¶
The handoffs parameter enforces which agents can be handed off to:
Agent(
name="TriageAgent",
tools=[handoff_tool],
handoffs=["french_agent", "german_agent"] # Whitelist only
)
# If agent tries to handoff to "spanish_agent", it will fail
# since it's not in the handoffs list
3. Bidirectional Handoffs¶
Specialist agents can handoff between each other:
french_agent = Agent(
name="french_agent",
tools=[translate_tool, handoff_tool],
handoffs=["german_agent"] # Can route to German
)
german_agent = Agent(
name="german_agent",
tools=[translate_tool, handoff_tool],
handoffs=["french_agent"] # Can route to French
)
4. Tracing Handoffs¶
Handoffs generate trace events for monitoring:
trace_collector = ConsoleTraceCollector()
config = RunConfig(
agent_registry=agent_registry,
model_provider=model_provider,
on_event=trace_collector.collect # See all handoffs in logs
)
# Output will include:
# [handoff_initiated] from=triage_agent to=french_agent
# [handoff_completed] success=True
Common Use Cases¶
- Customer Support Routing: Triage → Technical/Billing/Sales
- Multi-Language Support: Router → Language Specialists
- Skill-Based Routing: Generalist → Domain Experts
- Escalation Patterns: L1 Support → L2/L3 Support
- Workflow Orchestration: Coordinator → Specialized Workers
File Location¶
The complete handoff example is located at examples/handoff_example.py in the JAF repository.
Running in Production¶
For production deployments with handoffs:
from jaf.core.tracing import create_composite_trace_collector
# Enhanced tracing for handoff monitoring
trace_collector = create_composite_trace_collector()
config = RunConfig(
agent_registry={
'triage': triage_agent,
'tech_support': tech_support_agent,
'billing': billing_agent,
'sales': sales_agent
},
model_provider=model_provider,
max_turns=20, # Allow for multi-hop handoffs
on_event=trace_collector.collect
)
Monitoring Handoffs with Langfuse¶
When using Langfuse tracing, each agent is automatically tagged:
# In Langfuse dashboard, filter by:
Tags: agent_name = "triage_agent"
Tags: agent_name = "french_agent"
# View handoff patterns across your system
This example demonstrates how JAF's handoff system enables building sophisticated multi-agent systems with clear separation of concerns and automatic routing between specialized agents.
RAG Example Walkthrough¶
The RAG example (examples/server_example.py) demonstrates Retrieval-Augmented Generation with a knowledge base and LiteLLM integration.
Architecture Overview¶
# RAG Components
Knowledge Base # Mock documents with metadata
Semantic Search # Keyword-based retrieval
LiteLLM Agent # Gemini-powered responses
RAG Tool # Integration layer
Key Components¶
1. Knowledge Base Structure¶
knowledge_base = [
{
"id": "doc1",
"title": "Python Programming Basics",
"content": "Python is a high-level, interpreted programming language...",
"metadata": {"category": "programming", "level": "beginner"}
},
{
"id": "doc2",
"title": "Machine Learning with Python",
"content": "Python is the leading language for machine learning...",
"metadata": {"category": "ml", "level": "intermediate"}
}
# ... more documents
]
2. RAG Tool Implementation¶
@function_tool
async def litellm_rag_search(query: str, max_results: int = 3, context=None) -> str:
"""Search the knowledge base and format retrieved information for LLM processing.
Args:
query: Search query for the knowledge base
max_results: Maximum number of documents to retrieve (default: 3)
"""
# Step 1: Retrieve relevant documents using semantic search
relevant_docs = _semantic_search(query, max_results)
if not relevant_docs:
return f"I couldn't find any relevant information in the knowledge base for your query: '{query}'"
# Step 2: Format the retrieved information
formatted_response = _format_retrieved_docs(relevant_docs, query)
# Include source information
sources = [f"[{doc['title']}] - {doc['metadata']['category']}" for doc in relevant_docs]
source_info = "\n\nSources: " + ", ".join(sources)
return formatted_response + source_info
3. Semantic Search Algorithm¶
def _semantic_search(query: str, max_results: int) -> List[Dict[str, Any]]:
"""Perform semantic search on the knowledge base using keyword matching and scoring."""
query_lower = query.lower()
scored_docs = []
for doc in knowledge_base: # Access global knowledge_base
score = 0
# Title and content matching
title_matches = sum(1 for word in query_lower.split() if word in doc["title"].lower())
content_matches = sum(1 for word in query_lower.split() if word in doc["content"].lower())
# Category-specific keywords
category_keywords = {
"programming": ["python", "code", "programming", "language", "syntax"],
"ml": ["machine learning", "ai", "model", "training", "neural"],
"web": ["web", "api", "fastapi", "server", "http"],
"ai": ["ai", "agent", "framework", "intelligent", "litellm", "gemini"]
}
category = doc["metadata"]["category"]
if category in category_keywords:
category_matches = sum(1 for keyword in category_keywords[category] if keyword in query_lower)
score += category_matches * 1.5
score += title_matches * 3 + content_matches
if score > 0:
scored_docs.append((score, doc))
# Sort by relevance score
scored_docs.sort(key=lambda x: x[0], reverse=True)
return [doc for score, doc in scored_docs[:max_results]]
def _format_retrieved_docs(docs: List[Dict[str, Any]], query: str) -> str:
"""Format retrieved documents for presentation."""
if not docs:
return "No relevant documents found."
formatted_sections = []
for i, doc in enumerate(docs, 1):
section = f"**{i}. {doc['title']}**\n{doc['content'][:300]}..."
if doc['metadata']:
section += f"\n*Category: {doc['metadata']['category']}*"
formatted_sections.append(section)
return "\n\n".join(formatted_sections)
4. RAG Agent Configuration¶
def create_litellm_rag_agent() -> Agent:
def rag_instructions(state: RunState) -> str:
return """You are a knowledgeable AI assistant with access to a specialized knowledge base through the LiteLLM proxy.
When users ask questions, you should:
1. Use the litellm_rag_search tool to search for relevant information in the knowledge base
2. Provide comprehensive answers based on the retrieved information
3. Always cite your sources when providing information from the knowledge base
4. Be specific and detailed in your responses
5. If the knowledge base doesn't contain relevant information, be honest about the limitations
You have access to information about programming, machine learning, web development, data science, AI frameworks, and LiteLLM proxy configuration."""
return Agent(
name="litellm_rag_assistant",
instructions=rag_instructions,
tools=[litellm_rag_search]
)
Running the RAG Example¶
1. Setup¶
# Install dependencies
pip install jaf-py litellm python-dotenv
# Configure environment
export LITELLM_URL=http://localhost:4000
export LITELLM_API_KEY=your-gemini-api-key
export LITELLM_MODEL=gemini-2.5-pro
2. Run the Example¶
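Assuming the example script is executed directly (adjust the path to match your checkout):
python examples/server_example.py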
3. Demo Modes¶
Automated Demo:
Choose demo mode:
1. Automated demo with sample questions
2. Interactive chat
Enter 1 or 2: 1
Demo Question 1: What is Python and why is it popular for programming?
------------------------------------------------------------
🤖 Assistant: Based on the knowledge base information, Python is a high-level, interpreted programming language that has gained popularity for several key reasons:
**What Python Is:**
Python is a high-level, interpreted programming language known for its simplicity and readability. It supports multiple programming paradigms including procedural, object-oriented, and functional programming.
**Why It's Popular:**
1. **Clean and Expressive Syntax** - Python's syntax is clean and expressive, making it accessible to both beginners and experienced developers
2. **Readability** - The language emphasizes code readability, which reduces development time and maintenance costs
3. **Versatility** - Python supports multiple programming paradigms, making it suitable for various types of projects
[Source 1] Python Programming Basics
Category: programming | Level: beginner
Interactive Mode:
Choose demo mode:
1. Automated demo with sample questions
2. Interactive chat
Enter 1 or 2: 2
🤖 Interactive JAF LiteLLM RAG Demo
Type your questions and get answers from the knowledge base!
Type 'quit' or 'exit' to stop.
👤 You: How do I use Python for machine learning?
Searching knowledge base and generating response...
🤖 Assistant: Python is the leading language for machine learning due to its rich ecosystem of specialized libraries. Here's how you can use Python for ML:
**Key Libraries:**
1. **Scikit-learn** - Provides simple and efficient tools for data mining and analysis
2. **TensorFlow and PyTorch** - Enable deep learning and neural network development
3. **NumPy and Pandas** - Handle numerical computations and data manipulation
These libraries make Python an excellent choice for machine learning projects, from basic data analysis to advanced deep learning applications.
[Source 1] Machine Learning with Python
Category: ml | Level: intermediate
Custom Tool Examples¶
Advanced Calculator Tool¶
import math
import re
import ast
import operator
# Safe mathematical functions registry
SAFE_MATH_FUNCTIONS = {
'sin': math.sin, 'cos': math.cos, 'tan': math.tan,
'sqrt': math.sqrt, 'log': math.log, 'exp': math.exp,
'abs': abs, 'round': round, 'max': max, 'min': min,
'pi': math.pi, 'e': math.e
}
@function_tool
async def advanced_calculator(expression: str, precision: int = 6, context=None) -> str:
"""Perform advanced mathematical calculations including trigonometry and scientific functions.
Args:
expression: Mathematical expression with functions like sin(x), sqrt(x), log(x)
precision: Number of decimal places for result formatting (default: 6)
"""
try:
# Input validation
if not expression or len(expression.strip()) == 0:
return "Error: Expression cannot be empty"
if len(expression) > 500:
return f"Error: Expression too long (max 500 characters, got {len(expression)})"
# Security validation - only allow safe mathematical characters, commas (for multi-argument functions like max), and function names
allowed_pattern = r'^[0-9+\-*/().,a-z_\s]+$'
if not re.match(allowed_pattern, expression.lower()):
return "Error: Expression contains invalid characters. Only numbers, operators, parentheses, and mathematical functions are allowed."
# Parse expression safely using AST
try:
tree = ast.parse(expression, mode='eval')
result = _safe_eval_advanced(tree.body, precision)
except SyntaxError:
return f"Error: Invalid mathematical syntax in expression: {expression}"
except ValueError as e:
return f"Error: {str(e)}"
# Format result with specified precision
if isinstance(result, float):
result = round(result, precision)
# Remove trailing zeros for cleaner display
if result == int(result):
result = int(result)
# Extract used functions for informational purposes
used_functions = _extract_functions_from_expression(expression)
function_info = f" (using: {', '.join(used_functions)})" if used_functions else ""
return f"Result: {expression} = {result}{function_info}"
except Exception as e:
return f"Error: Advanced calculation failed: {str(e)}"
def _safe_eval_advanced(node, precision: int):
"""Safely evaluate AST node with advanced mathematical functions."""
safe_operators = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.Pow: operator.pow,
ast.USub: operator.neg,
ast.UAdd: operator.pos,
}
if isinstance(node, ast.Constant):
return node.value
elif isinstance(node, ast.Num): # Python < 3.8 compatibility
return node.n
elif isinstance(node, ast.Name):
# Handle mathematical constants
if node.id in SAFE_MATH_FUNCTIONS:
return SAFE_MATH_FUNCTIONS[node.id]
else:
raise ValueError(f"Undefined variable: {node.id}")
elif isinstance(node, ast.BinOp):
if type(node.op) not in safe_operators:
raise ValueError(f"Unsupported operation: {type(node.op).__name__}")
left = _safe_eval_advanced(node.left, precision)
right = _safe_eval_advanced(node.right, precision)
return safe_operators[type(node.op)](left, right)
elif isinstance(node, ast.UnaryOp):
if type(node.op) not in safe_operators:
raise ValueError(f"Unsupported unary operation: {type(node.op).__name__}")
operand = _safe_eval_advanced(node.operand, precision)
return safe_operators[type(node.op)](operand)
elif isinstance(node, ast.Call):
# Handle function calls
if isinstance(node.func, ast.Name):
func_name = node.func.id
if func_name in SAFE_MATH_FUNCTIONS:
func = SAFE_MATH_FUNCTIONS[func_name]
args = [_safe_eval_advanced(arg, precision) for arg in node.args]
try:
return func(*args)
except Exception as e:
raise ValueError(f"Error in function {func_name}: {str(e)}")
else:
raise ValueError(f"Unknown function: {func_name}")
else:
raise ValueError("Complex function calls not supported")
else:
raise ValueError(f"Unsupported AST node type: {type(node).__name__}")
def _extract_functions_from_expression(expression: str) -> list:
"""Extract mathematical function names from expression."""
functions_used = []
for func_name in SAFE_MATH_FUNCTIONS:
if isinstance(SAFE_MATH_FUNCTIONS[func_name], type(math.sin)): # Callable functions only
if func_name + '(' in expression:
functions_used.append(func_name)
return functions_used
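To expose this tool to an agent, the same registration pattern used by the server demo applies. A brief sketch; the agent name and instructions here are illustrative, not taken from the examples:
def create_scientific_agent() -> Agent:
    """Create an agent restricted to the advanced calculator tool."""
    def instructions(state):
        return "You are a scientific calculator assistant. Use the advanced_calculator tool for all computations."
    return Agent(
        name="ScientificCalculator",
        instructions=instructions,
        tools=[advanced_calculator]
    )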
Database Query Tool¶
import asyncpg
from typing import Dict, List, Any, Optional
# Global configuration for database security
ALLOWED_TABLES = {'users', 'products', 'orders', 'analytics'}
ALLOWED_COLUMNS = {
'users': ['id', 'name', 'email', 'created_at'],
'products': ['id', 'name', 'price', 'category'],
'orders': ['id', 'user_id', 'product_id', 'quantity', 'total'],
'analytics': ['id', 'metric_name', 'value', 'timestamp']
}
@function_tool
async def database_query(
table: str,
columns: str = "*",
where_clause: str = "",
limit: int = 100,
context=None
) -> str:
"""Execute safe database queries with prepared statements and access control.
Args:
table: Table name to query (must be in allowed list)
columns: Comma-separated column names or "*" for all (default: "*")
where_clause: SQL WHERE conditions using safe syntax (optional)
limit: Maximum number of rows to return (default: 100, max: 1000)
"""
try:
# Permission validation
if not context or not hasattr(context, 'permissions'):
return "Error: Context with permissions required"
if 'database_read' not in context.permissions:
return "Error: Database read permission required"
# Table validation
if table not in ALLOWED_TABLES:
available_tables = ', '.join(sorted(ALLOWED_TABLES))
return f"Error: Table '{table}' not accessible. Available tables: {available_tables}"
# Limit validation
if limit > 1000:
return "Error: Limit cannot exceed 1000 rows"
if limit < 1:
return "Error: Limit must be at least 1"
# Column validation
if columns != "*":
requested_columns = [col.strip() for col in columns.split(',')]
allowed_for_table = ALLOWED_COLUMNS.get(table, [])
invalid_columns = [col for col in requested_columns if col not in allowed_for_table]
if invalid_columns:
available_cols = ', '.join(sorted(allowed_for_table))
return f"Error: Invalid columns {invalid_columns} for table '{table}'. Available: {available_cols}"
columns_sql = ', '.join(requested_columns)
else:
columns_sql = ', '.join(ALLOWED_COLUMNS.get(table, ['*']))
# Build safe query with parameterized statements
query_parts = [f"SELECT {columns_sql} FROM {table}"]
params = []
if where_clause:
# Basic WHERE clause validation (prevent SQL injection)
if _validate_where_clause(where_clause):
query_parts.append(f"WHERE {where_clause}")
else:
return "Error: Invalid WHERE clause. Use simple conditions like 'column = value' or 'column > value'"
query_parts.append(f"LIMIT ${len(params) + 1}")
params.append(limit)
final_query = ' '.join(query_parts)
# Get database connection (in real implementation, this would come from context)
connection_pool = getattr(context, 'db_pool', None)
if not connection_pool:
return "Error: Database connection not available in context"
# Execute query safely
async with connection_pool.acquire() as conn:
rows = await conn.fetch(final_query, *params)
results = [dict(row) for row in rows]
# Format response
if not results:
return f"No records found in table '{table}'"
# Create formatted response
response_lines = [
f"Database Query Results:",
f"Table: {table}",
f"Columns: {columns}",
f"Records found: {len(results)}",
f"Limit applied: {limit}",
""
]
# Add sample of results (first 5 rows)
if results:
response_lines.append("Sample Results:")
for i, row in enumerate(results[:5], 1):
row_str = ', '.join([f"{k}: {v}" for k, v in row.items()])
response_lines.append(f" {i}. {row_str}")
if len(results) > 5:
response_lines.append(f" ... and {len(results) - 5} more records")
return '\n'.join(response_lines)
except asyncpg.PostgresError as e:
return f"Error: Database error: {str(e)}"
except Exception as e:
return f"Error: Query execution failed: {str(e)}"
def _validate_where_clause(where_clause: str) -> bool:
"""Validate WHERE clause for basic SQL injection protection."""
if not where_clause:
return True
# Convert to lowercase for analysis
clause_lower = where_clause.lower()
# Block dangerous SQL keywords
dangerous_keywords = [
'drop', 'delete', 'insert', 'update', 'create', 'alter',
'exec', 'execute', 'union', 'select', 'script', '--', ';'
]
for keyword in dangerous_keywords:
if keyword in clause_lower:
return False
# Only allow basic comparison operators and logical operators
allowed_operators = ['=', '>', '<', '>=', '<=', '!=', 'and', 'or', 'like', 'in', 'between']
# Simple validation - this is basic and should be enhanced for production
# In production, use proper SQL parsing or ORM query builders
return True
# Usage example for creating database-enabled agent
def create_database_agent(db_pool) -> Agent:
"""Create an agent with database query capabilities."""
def instructions(state):
return """You are a data analyst assistant with access to a company database.
You can query the following tables:
- users: Customer information (id, name, email, created_at)
- products: Product catalog (id, name, price, category)
- orders: Order history (id, user_id, product_id, quantity, total)
- analytics: Business metrics (id, metric_name, value, timestamp)
Always use the database_query tool to retrieve information. Be specific about which columns you need and apply appropriate filters and limits."""
return Agent(
name="DatabaseAnalyst",
instructions=instructions,
tools=[database_query]
)
HTTP API Tool¶
import httpx
import time
from urllib.parse import urlparse
from collections import defaultdict
from typing import Dict, List, Optional, Any
# Global configuration for API security
ALLOWED_DOMAINS = [
'api.github.com',
'jsonplaceholder.typicode.com',
'httpbin.org',
'api.openweathermap.org'
]
# Simple rate limiter implementation
class SimpleRateLimiter:
def __init__(self, max_requests: int = 10, time_window: int = 60):
self.max_requests = max_requests
self.time_window = time_window
self.requests = defaultdict(list)
def is_allowed(self, identifier: str = "default") -> bool:
now = time.time()
# Clean old requests
cutoff = now - self.time_window
self.requests[identifier] = [req_time for req_time in self.requests[identifier] if req_time > cutoff]
# Check if under limit
if len(self.requests[identifier]) >= self.max_requests:
return False
# Record this request
self.requests[identifier].append(now)
return True
# Global rate limiter instance
_rate_limiter = SimpleRateLimiter(max_requests=10, time_window=60)
@function_tool
async def http_api_request(
url: str,
method: str = "GET",
headers: Optional[str] = None,
data: Optional[str] = None,
timeout: int = 30,
context=None
) -> str:
"""Make HTTP API requests with security controls and rate limiting.
Args:
url: Target URL (must be from allowed domains)
method: HTTP method (GET, POST, PUT, DELETE)
headers: JSON string of headers (optional)
data: JSON string of request body data (optional)
timeout: Request timeout in seconds (default: 30, max: 60)
"""
try:
# Input validation
if not url or not url.startswith(('http://', 'https://')):
return "Error: Invalid URL. Must start with http:// or https://"
# Rate limiting
user_id = getattr(context, 'user_id', 'anonymous') if context else 'anonymous'
if not _rate_limiter.is_allowed(user_id):
return "Error: Rate limit exceeded. Please wait before making another request."
# Domain validation
parsed_url = urlparse(url)
if parsed_url.hostname not in ALLOWED_DOMAINS:
allowed_list = ', '.join(ALLOWED_DOMAINS)
return f"Error: Domain '{parsed_url.hostname}' not in allowed list. Allowed domains: {allowed_list}"
# Method validation
allowed_methods = ['GET', 'POST', 'PUT', 'DELETE', 'PATCH']
method = method.upper()
if method not in allowed_methods:
return f"Error: HTTP method '{method}' not allowed. Use: {', '.join(allowed_methods)}"
# Timeout validation
if timeout > 60:
timeout = 60
if timeout < 1:
timeout = 1
# Parse headers
parsed_headers = {}
if headers:
try:
import json
parsed_headers = json.loads(headers)
if not isinstance(parsed_headers, dict):
return "Error: Headers must be a JSON object"
except json.JSONDecodeError:
return "Error: Invalid JSON format for headers"
# Parse data
parsed_data = None
if data:
try:
import json
parsed_data = json.loads(data)
except json.JSONDecodeError:
return "Error: Invalid JSON format for request data"
# Add default headers
final_headers = {
'User-Agent': 'JAF-HTTP-Tool/1.0',
**parsed_headers
}
# Make HTTP request
async with httpx.AsyncClient(timeout=timeout) as client:
request_kwargs = {
'method': method,
'url': url,
'headers': final_headers
}
if parsed_data and method in ['POST', 'PUT', 'PATCH']:
request_kwargs['json'] = parsed_data
response = await client.request(**request_kwargs)
# Process response
response_info = [
f"HTTP {method} Request to {url}",
f"Status: {response.status_code} {response.reason_phrase}",
f"Response Time: {response.elapsed.total_seconds():.2f}s" if hasattr(response, 'elapsed') else "",
""
]
# Add response headers (filtered)
important_headers = ['content-type', 'content-length', 'server', 'date']
response_info.append("Response Headers:")
for header in important_headers:
if header in response.headers:
response_info.append(f" {header}: {response.headers[header]}")
response_info.append("")
# Process response body
content_type = response.headers.get('content-type', '').lower()
if 'application/json' in content_type:
try:
json_data = response.json()
# Limit JSON response size for readability
json_str = str(json_data)
if len(json_str) > 2000:
response_info.append("JSON Response (truncated):")
response_info.append(json_str[:2000] + "...")
else:
response_info.append("JSON Response:")
response_info.append(json_str)
except Exception:
response_info.append("Response Body (invalid JSON):")
response_info.append(response.text[:1000])
elif 'text/' in content_type:
response_info.append("Text Response:")
text_content = response.text
if len(text_content) > 1500:
response_info.append(text_content[:1500] + "...")
else:
response_info.append(text_content)
else:
response_info.append(f"Binary Response ({len(response.content)} bytes)")
response_info.append("Content type: " + content_type)
# Add status assessment
if 200 <= response.status_code < 300:
response_info.insert(1, "✅ Request successful")
elif 400 <= response.status_code < 500:
response_info.insert(1, "⚠️ Client error")
elif 500 <= response.status_code < 600:
response_info.insert(1, "❌ Server error")
return '\n'.join(response_info)
except httpx.TimeoutException:
return f"Error: Request to {url} timed out after {timeout} seconds"
except httpx.ConnectError:
return f"Error: Could not connect to {url}. Check if the server is running."
except httpx.HTTPError as e:
return f"Error: HTTP error occurred: {str(e)}"
except Exception as e:
return f"Error: Request failed: {str(e)}"
# Usage example for creating API-enabled agent
def create_api_agent() -> Agent:
"""Create an agent with HTTP API capabilities."""
def instructions(state):
return f"""You are an API integration assistant that can make HTTP requests to external services.
Available domains for API calls:
{chr(10).join([f'- {domain}' for domain in ALLOWED_DOMAINS])}
You can use GET requests to fetch data and POST/PUT requests to send data.
Always explain what API you're calling and what data you're requesting or sending.
Rate limit: 10 requests per minute per user.
Timeout: Maximum 60 seconds per request.
Use the http_api_request tool for all external API calls."""
return Agent(
name="APIAssistant",
instructions=instructions,
tools=[http_api_request]
)
Memory Integration Examples¶
Redis Memory Example¶
async def setup_redis_memory():
"""Configure Redis memory provider."""
import redis.asyncio as redis
# Create Redis client
redis_client = redis.Redis(
host="localhost",
port=6379,
password="your-password",
db=0,
decode_responses=False
)
# Test connection
await redis_client.ping()
# Create memory provider
from jaf.memory import create_redis_provider, RedisConfig
config = RedisConfig(
host="localhost",
port=6379,
password="your-password",
db=0,
key_prefix="jaf:conversations:",
ttl=86400 # 24 hours
)
provider = await create_redis_provider(config, redis_client)
return MemoryConfig(
provider=provider,
auto_store=True,
max_messages=1000
)
# Usage in server
memory_config = await setup_redis_memory()
run_config = RunConfig(
agent_registry=agents,
model_provider=model_provider,
memory=memory_config
)
PostgreSQL Memory Example¶
async def setup_postgres_memory():
"""Configure PostgreSQL memory provider."""
import asyncpg
# Create connection
conn = await asyncpg.connect(
host="localhost",
port=5432,
database="jaf_memory",
user="postgres",
password="your-password"
)
# Create memory provider
from jaf.memory import create_postgres_provider, PostgresConfig
config = PostgresConfig(
host="localhost",
port=5432,
database="jaf_memory",
username="postgres",
password="your-password",
table_name="conversations"
)
provider = await create_postgres_provider(config, conn)
return MemoryConfig(
provider=provider,
auto_store=True,
max_messages=1000
)
# Database schema setup
async def setup_postgres_schema(conn):
"""Set up PostgreSQL schema for JAF memory."""
await conn.execute('''
CREATE TABLE IF NOT EXISTS conversations (
id SERIAL PRIMARY KEY,
conversation_id VARCHAR(255) UNIQUE NOT NULL,
user_id VARCHAR(255),
messages JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_conversations_user_id
ON conversations(user_id);
CREATE INDEX IF NOT EXISTS idx_conversations_created_at
ON conversations(created_at);
''')
Best Practices from Examples¶
1. Security Patterns¶
import time
from collections import defaultdict

# Input sanitization
def sanitize_expression(expr: str) -> str:
"""Remove potentially dangerous characters."""
allowed = set('0123456789+-*/(). ')
return ''.join(c for c in expr if c in allowed)
# Permission checking
def check_permissions(context, required_permissions):
"""Verify user has required permissions."""
user_permissions = set(context.permissions)
required = set(required_permissions)
return required.issubset(user_permissions)
# Rate limiting
class RateLimiter:
def __init__(self, rate: int, window: int = 60):
self.rate = rate
self.window = window
self.requests = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
now = time.time()
user_requests = self.requests[user_id]
# Remove old requests
cutoff = now - self.window
self.requests[user_id] = [req for req in user_requests if req > cutoff]
# Check rate limit
if len(self.requests[user_id]) >= self.rate:
return False
self.requests[user_id].append(now)
return True
2. Error Handling Patterns¶
# Comprehensive error handling
async def safe_tool_execution(tool, args, context):
"""Execute tool with comprehensive error handling."""
try:
# Validate inputs
if not hasattr(args, 'validate'):
raise ValueError("Invalid arguments object")
# Check permissions
if not check_permissions(context, tool.required_permissions):
return ToolResponse.error(
ToolErrorCodes.PERMISSION_DENIED,
"Insufficient permissions"
)
# Execute with timeout
result = await asyncio.wait_for(
tool.execute(args, context),
timeout=30.0
)
return result
except asyncio.TimeoutError:
return ToolResponse.error(
ToolErrorCodes.TIMEOUT,
"Tool execution timed out"
)
except ValidationError as e:
return ToolResponse.validation_error(
str(e),
{'validation_errors': e.errors()}
)
except Exception as e:
logger.exception(f"Tool execution failed: {e}")
return ToolResponse.error(
ToolErrorCodes.EXECUTION_FAILED,
"Internal tool error"
)
3. Performance Patterns¶
# Connection pooling
async def create_optimized_client():
"""Create HTTP client with connection pooling."""
return httpx.AsyncClient(
limits=httpx.Limits(
max_connections=100,
max_keepalive_connections=20,
keepalive_expiry=30.0
),
timeout=httpx.Timeout(30.0)
)
# Caching
from functools import lru_cache
@lru_cache(maxsize=1000)
def expensive_computation(input_data: str) -> str:
"""Cache expensive computations."""
result = input_data  # Expensive operation here (placeholder)
return result
# Batch processing
async def process_batch(items: List[Any], batch_size: int = 10):
"""Process items in batches to avoid overwhelming resources."""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
batch_results = await asyncio.gather(*[
process_item(item) for item in batch
])
results.extend(batch_results)
return results
Iterative Search Agent - Callback System Showcase¶
The iterative search agent (examples/iterative_search_agent.py) demonstrates the full power of JAF's advanced callback system by implementing a sophisticated ReAct-style agent that can iteratively gather information, check for synthesis completion, and provide comprehensive answers.
Key Features Demonstrated¶
This example showcases how the callback system enables complex agent behaviors:
- **Iterative Information Gathering** - Agent searches across multiple iterations
- **Synthesis Checking** - Automatically determines when enough information is gathered
- **Dynamic Query Refinement** - Refines search queries based on previous results
- **Loop Detection** - Prevents repetitive searches
- **Context Management** - Intelligently accumulates and filters information
- **Performance Monitoring** - Tracks metrics and execution statistics
Architecture Overview¶
from adk.runners import RunnerConfig, execute_agent
# Comprehensive callback implementation
class IterativeSearchCallbacks:
async def on_start(self, context, message, session_state):
"""Initialize tracking for iterative search."""
self.original_query = message.content
print(f" Starting search for: '{self.original_query}'")
async def on_check_synthesis(self, session_state, context_data):
"""Determine if enough information has been gathered."""
if len(context_data) >= self.synthesis_threshold:
confidence = self._calculate_confidence(context_data)
if confidence >= 0.75:
return {
'complete': True,
'answer': self._generate_synthesis_prompt(context_data),
'confidence': confidence
}
return None
async def on_query_rewrite(self, original_query, context_data):
"""Refine queries based on accumulated context."""
gaps = self._identify_knowledge_gaps(context_data)
if gaps:
return f"{original_query} focusing on {', '.join(gaps)}"
return None
async def on_loop_detection(self, tool_history, current_tool):
"""Prevent repetitive searches."""
recent_queries = [item['query'] for item in tool_history[-3:]]
return self._detect_similarity(recent_queries) > 0.7
# Configure agent with callbacks
config = RunnerConfig(
agent=search_agent,
callbacks=IterativeSearchCallbacks(max_iterations=5, synthesis_threshold=4),
enable_context_accumulation=True,
enable_loop_detection=True
)
# Execute with full instrumentation
result = await execute_agent(config, session_state, message, context, model_provider)
Example Execution Flow¶
When you run the iterative search agent, you'll see output like this:
ITERATIVE SEARCH AGENT DEMONSTRATION
============================================================
Starting iterative search for: 'What are the applications of machine learning?'
🔄 ITERATION 1/4
Executing search: 'machine learning applications in different industries'
Adding 3 new context items...
Total context items: 3
🔄 ITERATION 2/4
Query refined: 'machine learning applications in finance and trading'
Executing search: 'machine learning applications in finance and trading'
Adding 2 new context items...
Total context items: 5
🧮 Evaluating synthesis readiness with 5 context items...
Coverage: 0.85
Quality: 0.90
Completeness: 0.50
Overall confidence: 0.75
Synthesis complete! Confidence: 0.85
ITERATIVE SEARCH COMPLETED
Total iterations: 2
Context items gathered: 5
Searches performed: 2
Final confidence: 0.85
Execution time: 1247ms
============================================================
Key Implementation Patterns¶
1. Context Accumulation¶
async def on_context_update(self, current_context, new_items):
"""Manage context with deduplication and relevance filtering."""
# Deduplicate based on content similarity
filtered_items = self._deduplicate_and_filter(new_items)
# Merge and sort by relevance
self.context_accumulator.extend(filtered_items)
self.context_accumulator.sort(key=lambda x: x.get('relevance', 0), reverse=True)
# Keep top items within limits
return self.context_accumulator[:20]
2. Intelligent Query Refinement¶
async def on_query_rewrite(self, original_query, context_data):
"""Analyze context gaps and refine search queries."""
topics_covered = self._analyze_topic_coverage(context_data)
if 'healthcare' in topics_covered and 'finance' not in topics_covered:
return f"{original_query} applications in finance and trading"
elif len(topics_covered) >= 2:
return f"{original_query} future trends and emerging applications"
return None
3. Synthesis Quality Assessment¶
async def on_check_synthesis(self, session_state, context_data):
"""Multi-factor synthesis readiness assessment."""
coverage_score = self._analyze_coverage(context_data)
quality_score = self._analyze_quality(context_data)
completeness_score = min(len(context_data) / 10.0, 1.0)
confidence = (coverage_score + quality_score + completeness_score) / 3.0
if confidence >= 0.75:
return {
'complete': True,
'answer': self._create_comprehensive_synthesis(context_data),
'confidence': confidence
}
return None
Running the Example¶
1. Basic Execution¶
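Assuming direct execution of the example script from the repository root:
python examples/iterative_search_agent.py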
2. Custom Configuration¶
from examples.iterative_search_agent import IterativeSearchCallbacks
# Configure for different behavior
callbacks = IterativeSearchCallbacks(
max_iterations=10, # More thorough search
synthesis_threshold=8 # Require more information
)
config = RunnerConfig(
agent=search_agent,
callbacks=callbacks,
enable_context_accumulation=True,
max_context_items=50 # Larger context window
)
Advanced Patterns Demonstrated¶
ReAct (Reasoning + Acting) Pattern¶
The example implements a full ReAct pattern where the agent:
- Reasons about what information it needs
- Acts by searching for that information
- Observes the results and their relevance
- Reasons about gaps and next steps
- Repeats until synthesis is complete (see the sketch after this list)
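A minimal, framework-agnostic sketch of that loop, using hypothetical search_knowledge(), assess_synthesis(), and refine_query() helpers that stand in for the callbacks described above (none of these names come from the example itself):
import asyncio

async def search_knowledge(query: str) -> list[str]:
    # Hypothetical stand-in for the real search tool call
    return [f"result for '{query}'"]

def assess_synthesis(context: list[str], threshold: int = 4) -> bool:
    # Mirrors on_check_synthesis: enough context items means synthesis can start
    return len(context) >= threshold

def refine_query(query: str, context: list[str]) -> str:
    # Mirrors on_query_rewrite: narrow the query as context accumulates
    return f"{query} (refined after {len(context)} results)"

async def react_loop(query: str, max_iterations: int = 5) -> list[str]:
    context: list[str] = []
    for _ in range(max_iterations):
        # Act: gather information for the current query
        context.extend(await search_knowledge(query))
        # Observe and reason: stop once synthesis looks possible
        if assess_synthesis(context):
            break
        # Reason: refine the query for the next iteration
        query = refine_query(query, context)
    return context

# Example: asyncio.run(react_loop("applications of machine learning"))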
Dynamic Behavior Adaptation¶
class AdaptiveCallbacks(IterativeSearchCallbacks):
async def on_iteration_complete(self, iteration, has_tool_calls):
"""Adapt behavior based on progress."""
if not has_tool_calls:
# No tools called, likely finished
return {'should_stop': True}
if self._making_progress():
# Continue if making good progress
return {'should_continue': True}
else:
# Try different approach
return {'should_stop': True}
Performance Monitoring¶
async def on_complete(self, response):
"""Comprehensive execution analytics."""
print(f" Performance Metrics:")
print(f" Iterations: {self.iteration_count}")
print(f" Context Quality: {self.final_quality_score:.2f}")
print(f" Search Efficiency: {len(self.context_accumulator)/self.iteration_count:.1f} items/iteration")
print(f" Synthesis Confidence: {self.synthesis_confidence:.2f}")
This example demonstrates how the callback system transforms JAF from a simple agent executor into a sophisticated reasoning engine capable of complex, adaptive behaviors that would be impossible with traditional fixed execution patterns.
Tracing and Observability Examples¶
JAF provides comprehensive tracing capabilities for monitoring agent execution, debugging issues, and analyzing performance.
Basic Console Tracing¶
# examples/tracing_basic_example.py
import asyncio
from jaf import Agent, Message, RunConfig, RunState, run
from jaf.core.tracing import ConsoleTraceCollector
from jaf.core.types import ContentRole, generate_run_id, generate_trace_id
from jaf.providers.model import make_litellm_provider
async def main():
# Create console trace collector
trace_collector = ConsoleTraceCollector()
agent = Agent(
name="demo_agent",
instructions=lambda s: "You are a helpful assistant."
)
config = RunConfig(
agent_registry={"demo_agent": agent},
model_provider=make_litellm_provider("http://localhost:4000", "api-key"),
on_event=trace_collector.collect # Enable detailed tracing
)
initial_state = RunState(
run_id=generate_run_id(),
trace_id=generate_trace_id(),
messages=[Message(role=ContentRole.USER, content="Hello!")],
current_agent_name="demo_agent",
context={},
turn_count=0
)
result = await run(initial_state, config)
print(f"Result: {result.outcome}")
if __name__ == "__main__":
asyncio.run(main())
Output Example:
[2024-01-15T10:30:00.123Z] JAF:run_start Starting run run_abc123 (trace: trace_xyz789)
[2024-01-15T10:30:01.456Z] JAF:llm_call_start Calling gpt-4 for agent demo_agent
[2024-01-15T10:30:02.789Z] JAF:llm_call_end LLM responded with content
[2024-01-15T10:30:02.800Z] JAF:run_end Run completed successfully in 2.68s
OpenTelemetry Integration¶
# examples/otel_tracing_demo.py
import os
import asyncio
from jaf.core.tracing import create_composite_trace_collector, ConsoleTraceCollector
# Configure OpenTelemetry
os.environ["TRACE_COLLECTOR_URL"] = "http://localhost:4318/v1/traces"
async def main():
# Auto-configured tracing (includes OpenTelemetry + Console)
trace_collector = create_composite_trace_collector(ConsoleTraceCollector())
# Your agent configuration...
config = RunConfig(
agent_registry={"agent": agent},
model_provider=model_provider,
on_event=trace_collector.collect
)
result = await run(initial_state, config)
# Traces automatically sent to Jaeger at http://localhost:16686
Setup Jaeger:
# Start Jaeger for trace visualization
docker run -d \
--name jaeger \
-p 16686:16686 \
-p 4318:4318 \
jaegertracing/all-in-one:latest
# View traces at http://localhost:16686
Langfuse Integration¶
# examples/langfuse_tracing_demo.py
import os
import asyncio
from jaf.core.tracing import create_composite_trace_collector, ConsoleTraceCollector
# Configure Langfuse
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-your-public-key"
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-your-secret-key"
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"
async def main():
# Auto-configured tracing (includes Langfuse + Console)
trace_collector = create_composite_trace_collector(ConsoleTraceCollector())
config = RunConfig(
agent_registry={"weather_agent": weather_agent},
model_provider=model_provider,
on_event=trace_collector.collect
)
# Traces automatically sent to Langfuse with LLM-specific analytics
result = await run(initial_state, config)
Custom Metrics Collection¶
# examples/custom_metrics_example.py
class MetricsCollector:
def __init__(self):
self.metrics = {
"total_runs": 0,
"successful_runs": 0,
"total_llm_calls": 0,
"total_tool_calls": 0
}
def collect(self, event):
if event.type == "run_start":
self.metrics["total_runs"] += 1
elif event.type == "run_end":
if event.data.get("outcome", {}).get("status") == "completed":
self.metrics["successful_runs"] += 1
elif event.type == "llm_call_start":
self.metrics["total_llm_calls"] += 1
elif event.type == "tool_call_start":
self.metrics["total_tool_calls"] += 1
def get_metrics(self):
return self.metrics.copy()
# Usage
metrics = MetricsCollector()
config = RunConfig(
agent_registry=agents,
model_provider=model_provider,
on_event=metrics.collect
)
# After runs
print("Metrics:", metrics.get_metrics())
Agent-as-Tool Examples¶
JAF's agent-as-tool functionality enables hierarchical agent architectures where specialized agents become tools for orchestrator agents.
Basic Translation Orchestrator¶
# examples/agent_as_tool_example.py
import asyncio
from dataclasses import dataclass
from jaf import Agent, ModelConfig, RunConfig, RunState, Message, run
from jaf.core.types import ContentRole, generate_run_id, generate_trace_id
from jaf.providers.model import make_litellm_provider
@dataclass(frozen=True)
class TranslationContext:
user_id: str
target_languages: list[str]
# Create specialized translation agents
spanish_agent = Agent(
name="spanish_translator",
instructions=lambda state: "Translate to Spanish. Reply only with the translation.",
model_config=ModelConfig(name="gpt-4", temperature=0.3)
)
french_agent = Agent(
name="french_translator",
instructions=lambda state: "Translate to French. Reply only with the translation.",
model_config=ModelConfig(name="gpt-4", temperature=0.3)
)
# Convert agents to tools
spanish_tool = spanish_agent.as_tool(
tool_name="translate_to_spanish",
tool_description="Translate text to Spanish",
max_turns=3
)
french_tool = french_agent.as_tool(
tool_name="translate_to_french",
tool_description="Translate text to French",
max_turns=3
)
# Create orchestrator
orchestrator = Agent(
name="translation_orchestrator",
instructions=lambda state: (
"You coordinate translations using your tools. "
"Always use the appropriate translation tools."
),
tools=[spanish_tool, french_tool],
model_config=ModelConfig(name="gpt-4", temperature=0.1)
)
async def main():
config = RunConfig(
agent_registry={"translation_orchestrator": orchestrator},
model_provider=make_litellm_provider("http://localhost:4000", "api-key"),
max_turns=10
)
initial_state = RunState(
run_id=generate_run_id(),
trace_id=generate_trace_id(),
messages=[Message(
role=ContentRole.USER,
content="Translate 'Hello, how are you?' to Spanish and French"
)],
current_agent_name="translation_orchestrator",
context=TranslationContext(
user_id="user123",
target_languages=["spanish", "french"]
),
turn_count=0
)
result = await run(initial_state, config)
print(f"Translation result: {result.outcome.output}")
Conditional Tool Enabling¶
# Conditional enabling based on user permissions
def premium_user_only(context, agent):
return context.user_type == "premium"
def business_hours_only(context, agent):
from datetime import datetime
current_hour = datetime.now().hour
return 9 <= current_hour <= 17
# Create tools with conditions
premium_tool = expert_agent.as_tool(
tool_name="expert_analysis",
is_enabled=premium_user_only
)
support_tool = human_support_agent.as_tool(
tool_name="human_support",
is_enabled=business_hours_only
)
orchestrator = Agent(
name="customer_service",
instructions=lambda state: "Route customers to appropriate services",
tools=[premium_tool, support_tool]
)
Multi-Level Agent Hierarchies¶
# Level 3: Specialized processors
tokenizer_agent = Agent(name="tokenizer", instructions=tokenizer_instructions)
parser_agent = Agent(name="parser", instructions=parser_instructions)
validator_agent = Agent(name="validator", instructions=validator_instructions)
# Level 2: Processing coordinator
processor_agent = Agent(
name="processor",
instructions=processor_instructions,
tools=[
tokenizer_agent.as_tool(tool_name="tokenize_text"),
parser_agent.as_tool(tool_name="parse_structure"),
validator_agent.as_tool(tool_name="validate_output")
]
)
# Level 1: Main orchestrator
main_agent = Agent(
name="orchestrator",
instructions=orchestrator_instructions,
tools=[processor_agent.as_tool(tool_name="process_data")]
)
Session Management¶
# Ephemeral sessions (default) - independent execution
translator_tool = translator_agent.as_tool(
preserve_session=False # Fresh session each time
)
# Shared sessions - inherit conversation history
assistant_tool = personal_agent.as_tool(
preserve_session=True # Share parent's memory
)
# Custom output processing
def extract_json_response(run_result):
if run_result.outcome.status == 'completed':
output = run_result.outcome.output
# Extract JSON from output
if output.startswith('{'):
return output
return '{"error": "No valid JSON response"}'
data_tool = data_agent.as_tool(
custom_output_extractor=extract_json_response
)
Production Agent Registry¶
# examples/production_agent_registry.py
class AgentToolRegistry:
def __init__(self):
self.agents = {}
self.tool_configs = {}
def register_agent(self, agent, tool_config=None):
self.agents[agent.name] = agent
if tool_config:
self.tool_configs[agent.name] = tool_config
def create_orchestrator(self, name, instructions, enabled_tools):
tools = []
for tool_name in enabled_tools:
agent = self.agents[tool_name]
config = self.tool_configs.get(tool_name, {})
tools.append(agent.as_tool(**config))
return Agent(name=name, instructions=instructions, tools=tools)
# Usage
registry = AgentToolRegistry()
# Register specialized agents
registry.register_agent(
spanish_translator,
{"tool_name": "translate_spanish", "max_turns": 3}
)
registry.register_agent(
french_translator,
{"tool_name": "translate_french", "max_turns": 3}
)
registry.register_agent(
data_analyzer,
{"tool_name": "analyze_data", "timeout": 60.0}
)
# Create orchestrators dynamically
translation_agent = registry.create_orchestrator(
"translator",
translation_instructions,
["spanish_translator", "french_translator"]
)
Next Steps¶
- Agent-as-Tool Guide - Comprehensive hierarchical agent orchestration documentation
- Tracing Guide - Complete observability and monitoring documentation
- Learn about Deployment for production setup
- Review Troubleshooting for common issues
- Explore API Reference for complete documentation
- Check Tools Guide for advanced tool patterns
- Callback System - Deep dive into advanced agent instrumentation