# Error Handling

> **Production Resilience**
>
> JAF's error handling framework provides comprehensive resilience patterns, including circuit breakers, exponential-backoff retries, and graceful degradation, to keep production systems stable under failure conditions.
## Overview

The JAF error handling system implements enterprise-grade resilience patterns:

- 🔄 **Circuit Breaker Pattern**: Prevents cascade failures
- **Retry Logic**: Exponential backoff with jitter
- **Graceful Degradation**: Fallback mechanisms
- **Error Monitoring**: Comprehensive error tracking
- **Context-Aware Recovery**: Smart error classification
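These patterns are typically composed by stacking decorators. A minimal sketch, assuming the decorator-style helpers introduced later on this page (treat the exact signatures as assumptions, not the canonical API):

```python
# Minimal composition sketch. Helper names come from the examples
# later on this page; exact signatures are assumptions.
from adk.errors import create_circuit_breaker, create_retry_handler, with_fallback

retry = create_retry_handler(max_attempts=3, base_delay=1.0, jitter=True)
breaker = create_circuit_breaker(name="llm-service", failure_threshold=3, recovery_timeout=60)

@with_fallback(fallback_function=lambda: "Service temporarily unavailable")
@breaker  # Fail fast while the service is known to be down
@retry    # Retry transient failures with exponential backoff
async def resilient_call() -> str:
    return await llm_service.complete("Hello")  # Placeholder service object
```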
## Error Hierarchy

### ADK Error Types

```python
from adk.errors import (
    AdkError,           # Base error class
    AdkLLMError,        # LLM service errors
    AdkSessionError,    # Session management errors
    AdkSecurityError,   # Security-related errors
    AdkConfigError,     # Configuration errors
    AdkTimeoutError,    # Timeout errors
    AdkRateLimitError,  # Rate limiting errors
)
```
```text
# Error hierarchy visualization
AdkError
├── AdkLLMError
│   ├── LLMTimeoutError
│   ├── LLMRateLimitError
│   └── LLMAuthenticationError
├── AdkSessionError
│   ├── SessionNotFoundError
│   └── SessionExpiredError
├── AdkSecurityError
│   ├── AuthenticationError
│   └── AuthorizationError
└── AdkConfigError
    ├── InvalidConfigError
    └── MissingConfigError
```
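Because the types form a hierarchy, handlers can catch errors at whatever granularity fits. A brief sketch (the called helpers are illustrative placeholders):

```python
from adk.errors import AdkError, AdkLLMError

try:
    await call_model()  # Any ADK-backed operation (illustrative)
except AdkLLMError as e:
    schedule_retry(e)   # Specific handling: only LLM-service failures land here (hypothetical helper)
except AdkError as e:
    log_and_alert(e)    # Catch-all for every other ADK error type (hypothetical helper)
```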
### Error Context and Metadata

```python
from adk.errors import create_adk_error

# Rich error context
error = create_adk_error(
    error_type=AdkLLMError,
    message="LLM service timeout",
    context={
        "service": "openai",
        "model": "gpt-4",
        "request_id": "req_123",
        "timeout_seconds": 30,
        "retry_count": 2,
    },
    recoverable=True,
    retry_after_seconds=60,
)

print(f"Error: {error.message}")
print(f"Context: {error.context}")
print(f"Recoverable: {error.recoverable}")
print(f"Retry after: {error.retry_after_seconds}s")
```
## 🔄 Circuit Breaker Pattern

### Basic Circuit Breaker

```python
from adk.errors import create_circuit_breaker, CircuitBreakerError

# Create circuit breaker for the LLM service
llm_circuit_breaker = create_circuit_breaker(
    name="llm-service",
    failure_threshold=3,  # Open after 3 failures
    recovery_timeout=60,  # Stay open for 60 seconds
    expected_exception=AdkLLMError,
)

@llm_circuit_breaker
async def call_llm_service(prompt: str) -> str:
    """LLM service call protected by circuit breaker."""
    # This function is automatically protected
    response = await llm_service.complete(prompt)
    return response.content

# Usage
try:
    result = await call_llm_service("Hello, world!")
    print(f"Success: {result}")
except CircuitBreakerError:
    print("Circuit breaker is open - service unavailable")
except AdkLLMError as e:
    print(f"LLM error: {e}")
```
### Circuit Breaker States

```mermaid
stateDiagram-v2
    [*] --> Closed
    Closed --> Open: failure_threshold reached
    Open --> HalfOpen: recovery_timeout expired
    HalfOpen --> Closed: success
    HalfOpen --> Open: failure

    note right of Closed: Normal operation
    note right of Open: Rejecting requests
    note right of HalfOpen: Testing service
```
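For intuition, here is a framework-free sketch of the state machine above; JAF's `create_circuit_breaker` implements this (plus timeouts and fallbacks) for you:

```python
import time

# Minimal illustration of the Closed -> Open -> HalfOpen transitions;
# not the ADK implementation.
class MiniCircuitBreaker:
    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow(self) -> bool:
        """Decide whether a request may proceed."""
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "half_open"  # Let one probe through
                return True
            return False  # Still rejecting requests
        return True  # Closed or half-open: let it through

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"  # Probe succeeded, resume normal operation

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"  # Start rejecting requests
            self.opened_at = time.monotonic()
```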
### Advanced Circuit Breaker Configuration

```python
from adk.errors import CircuitBreakerConfig, create_circuit_breaker

async def llm_fallback_response(prompt: str) -> str:
    """Fallback response when the service is unavailable."""
    return "I'm currently experiencing technical difficulties. Please try again later."

# Advanced configuration (the fallback must be defined before it is referenced)
config = CircuitBreakerConfig(
    failure_threshold=5,   # More tolerant
    recovery_timeout=120,  # Longer recovery time
    success_threshold=2,   # Require 2 successes to close
    timeout=30,            # Per-call timeout
    expected_exception=(AdkLLMError, AdkTimeoutError),
    fallback_function=llm_fallback_response,
)

circuit_breaker = create_circuit_breaker("advanced-llm", config)
```
## Retry Logic

### Exponential Backoff Retry

```python
import random

from adk.errors import create_retry_handler

# Create retry handler with exponential backoff
retry_handler = create_retry_handler(
    max_attempts=3,
    base_delay=1.0,        # Start with 1 second
    exponential_base=2.0,  # Double each time
    max_delay=30.0,        # Cap at 30 seconds
    jitter=True,           # Add randomness
)

@retry_handler
async def unreliable_operation():
    """Operation that might fail and should be retried."""
    if random.random() < 0.7:  # Simulated 70% failure rate
        raise AdkLLMError("Temporary service error")
    return "Success!"

# Usage
try:
    result = await unreliable_operation()
    print(f"Operation succeeded: {result}")
except AdkLLMError as e:
    print(f"Operation failed after all retries: {e}")
```
### Conditional Retry Logic

```python
from adk.errors import RetryConfig, create_retry_handler

def custom_retry_condition(exception: Exception, attempt: int) -> bool:
    """Custom logic for deciding when to retry."""
    # Don't retry authentication errors
    if isinstance(exception, AdkSecurityError):
        return False
    # Allow extra attempts for rate-limit errors
    if isinstance(exception, AdkRateLimitError):
        return attempt <= 5
    # Retry other errors up to 3 times
    return attempt <= 3

retry_config = RetryConfig(
    max_attempts=5,
    retry_condition=custom_retry_condition,
    delay_calculator=lambda attempt: min(2 ** attempt, 60),  # Exponential with a 60s cap
)

retry_handler = create_retry_handler(retry_config)
```
### Retry with Context

```python
from datetime import datetime

@retry_handler
async def context_aware_operation(context: dict):
    """Operation with retry context tracking."""
    try:
        return await external_service_call(context)
    except Exception as e:
        # Enrich the error with context for debugging, preserving the cause
        enriched_error = create_adk_error(
            error_type=type(e),
            message=str(e),
            context={
                **context,
                "operation": "external_service_call",
                "timestamp": datetime.now().isoformat(),
            },
        )
        raise enriched_error from e
```
## Graceful Degradation

### Fallback Mechanisms

```python
from adk.errors import with_fallback

@with_fallback(fallback_function=lambda: "Fallback response")
async def primary_operation():
    """Primary operation with fallback."""
    # Try the primary service first
    return await primary_service.call()

# If primary_operation fails, fallback_function is invoked automatically
result = await primary_operation()  # Returns either the primary result or the fallback
```
### Multi-Level Fallbacks

```python
from adk.errors import FallbackChain

# Create a fallback chain, ordered from most to least preferred
fallback_chain = FallbackChain([
    ("primary", primary_llm_service),
    ("secondary", secondary_llm_service),
    ("cache", cached_response_service),
    ("static", lambda prompt: "I'm currently unavailable. Please try again later."),
])

async def resilient_llm_call(prompt: str) -> str:
    """LLM call with multiple fallbacks."""
    return await fallback_chain.execute(prompt)
```
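Conceptually, a fallback chain is just ordered iteration with error swallowing. A framework-free sketch of the behavior (not the `FallbackChain` implementation itself):

```python
import inspect

class MiniFallbackChain:
    """Sketch of fallback-chain behavior: try each service in order."""

    def __init__(self, services):
        self.services = services  # List of (name, sync-or-async callable) pairs

    async def execute(self, *args):
        last_error = None
        for name, service in self.services:
            try:
                result = service(*args)
                if inspect.isawaitable(result):
                    result = await result
                return result  # First success wins
            except Exception as e:
                last_error = e  # Fall through to the next level
        raise last_error  # Every level failed
```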
### Service Health Checking

```python
from adk.errors import HealthChecker, ServiceStatus

class LLMHealthChecker(HealthChecker):
    """Health checker for the LLM service."""

    async def check_health(self) -> ServiceStatus:
        try:
            # Quick health-check call
            response = await self.service.health_check()
            return ServiceStatus.HEALTHY if response.ok else ServiceStatus.DEGRADED
        except Exception:
            return ServiceStatus.UNHEALTHY

health_checker = LLMHealthChecker(llm_service)

# Use the health status to choose a code path
if await health_checker.is_healthy():
    result = await normal_operation()
else:
    result = await fallback_operation()
```
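To avoid awaiting a health check on every request, the status can be refreshed in the background and read as a flag. One possible wiring (the polling loop below is illustrative, not part of the ADK):

```python
import asyncio

healthy_flag = asyncio.Event()  # Request handlers read this flag

async def poll_health(checker: LLMHealthChecker, interval_seconds: float = 10.0):
    """Background task: refresh the cached health status periodically."""
    while True:
        if await checker.check_health() == ServiceStatus.HEALTHY:
            healthy_flag.set()
        else:
            healthy_flag.clear()
        await asyncio.sleep(interval_seconds)
```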
## Error Monitoring and Observability

### Error Metrics Collection

```python
from adk.errors import ErrorMetrics, create_error_handler

# Initialize error metrics
error_metrics = ErrorMetrics()

error_handler = create_error_handler(
    metrics_collector=error_metrics,
    enable_tracing=True,
    log_level="INFO",
)

@error_handler
async def monitored_operation():
    """Operation with comprehensive error monitoring."""
    try:
        return await risky_operation()
    except AdkLLMError:
        # Automatically tracked in metrics
        raise

# View error statistics
stats = error_metrics.get_stats()
print(f"Total errors: {stats['total_errors']}")
print(f"Error rate: {stats['error_rate']:.2%}")
print(f"Most common error: {stats['most_common_error']}")
```
### Structured Error Logging

```python
import structlog

from adk.errors import ErrorLogger

# Configure structured logging
error_logger = ErrorLogger(
    logger=structlog.get_logger(),
    include_stack_trace=True,
    include_context=True,
    sensitive_fields=["api_key", "password", "token"],  # Redacted from log output
)

async def logged_operation():
    """Operation with structured error logging."""
    try:
        return await operation_that_might_fail()
    except Exception as e:
        await error_logger.log_error(
            error=e,
            context={
                "user_id": "user-123",
                "operation": "llm_call",
                "request_id": "req-456",
            },
            severity="high",
        )
        raise
```
### Error Alerting

```python
from adk.errors import ErrorAlerter, AlertConfig

# Configure error alerting
alert_config = AlertConfig(
    error_threshold=10,     # Alert after 10 errors
    time_window_minutes=5,  # Within a 5-minute window
    cooldown_minutes=15,    # Wait 15 minutes between alerts
    alert_channels=["email", "slack"],
)

error_alerter = ErrorAlerter(alert_config)

@error_alerter.monitor
async def critical_operation():
    """Critical operation with automatic alerting."""
    return await important_service_call()
```
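The thresholding behind such an alerter amounts to counting errors in a rolling window with a cooldown. A framework-free sketch of that logic (not the `ErrorAlerter` internals):

```python
import time
from collections import deque

class MiniAlerter:
    """Sketch: N errors inside a rolling window fire an alert, then a cooldown."""

    def __init__(self, threshold: int = 10, window_s: float = 300.0, cooldown_s: float = 900.0):
        self.threshold, self.window_s, self.cooldown_s = threshold, window_s, cooldown_s
        self.errors = deque()
        self.last_alert = 0.0

    def record_error(self) -> bool:
        """Record one error; return True when an alert should fire."""
        now = time.monotonic()
        self.errors.append(now)
        while self.errors and now - self.errors[0] > self.window_s:
            self.errors.popleft()  # Drop errors that fell out of the window
        if len(self.errors) >= self.threshold and now - self.last_alert >= self.cooldown_s:
            self.last_alert = now
            return True
        return False
```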
## Error Recovery Strategies

### Automatic Recovery

```python
from adk.errors import AutoRecoveryHandler

recovery_handler = AutoRecoveryHandler({
    AdkLLMError: "retry_with_backoff",
    AdkRateLimitError: "wait_and_retry",
    AdkTimeoutError: "increase_timeout_and_retry",
    AdkSessionError: "refresh_session_and_retry",
})

@recovery_handler
async def self_healing_operation():
    """Operation that attempts automatic recovery."""
    return await operation_with_auto_recovery()
```
### Manual Recovery Triggers

```python
import asyncio

from adk.errors import RecoveryManager

recovery_manager = RecoveryManager()

# Register recovery procedures
@recovery_manager.register_recovery(AdkLLMError)
async def recover_from_llm_error(error: AdkLLMError, context: dict):
    """Manual recovery from LLM errors."""
    if "rate_limit" in error.message.lower():
        await asyncio.sleep(error.retry_after_seconds)
        return await retry_operation(context)
    elif "timeout" in error.message.lower():
        return await retry_with_longer_timeout(context)
    else:
        return await use_fallback_service(context)

# Trigger recovery
try:
    result = await risky_operation()
except AdkLLMError as e:
    result = await recovery_manager.recover(e, context)
```
## Testing Error Handling

### Error Injection for Testing

```python
from adk.errors.testing import ErrorInjector, ErrorScenario

# Create error scenarios for testing
error_injector = ErrorInjector([
    ErrorScenario(
        name="llm_timeout",
        error_type=AdkTimeoutError,
        probability=0.1,  # 10% chance
        condition=lambda context: context.get("model") == "gpt-4",
    ),
    ErrorScenario(
        name="rate_limit",
        error_type=AdkRateLimitError,
        probability=0.05,  # 5% chance
        retry_after=60,
    ),
])

# Inject errors in the test environment
@error_injector.inject_errors
async def test_operation():
    """Operation with error injection for testing."""
    return await llm_service.call()

# Run tests
async def test_error_handling():
    for _ in range(100):
        try:
            await test_operation()
        except AdkError as e:
            print(f"Handled error: {type(e).__name__}")
```
### Chaos Engineering

```python
from adk.errors.chaos import ChaosMonkey

# Configure chaos testing
chaos_monkey = ChaosMonkey({
    "network_delay": {"probability": 0.1, "delay_ms": 1000},
    "service_unavailable": {"probability": 0.05, "duration_seconds": 30},
    "partial_failure": {"probability": 0.15, "success_rate": 0.7},
})

@chaos_monkey.apply_chaos
async def chaos_tested_operation():
    """Operation tested with chaos engineering."""
    return await production_operation()
```
## Best Practices

### 1. Error Classification

```python
def classify_error(error: Exception) -> str:
    """Classify errors for appropriate handling."""
    if isinstance(error, AdkSecurityError):
        return "security"    # Don't retry; alert immediately
    elif isinstance(error, AdkRateLimitError):
        return "rate_limit"  # Retry after a delay
    elif isinstance(error, AdkTimeoutError):
        return "timeout"     # Retry with an increased timeout
    elif isinstance(error, AdkConfigError):
        return "config"      # Fix the configuration; don't retry
    else:
        return "unknown"     # Default handling
```
### 2. Error Budgets

```python
from adk.errors import ErrorBudget

# Define the error budget for a service
error_budget = ErrorBudget(
    budget_percentage=0.1,       # 0.1% error rate allowed
    time_window_hours=24,        # Over a rolling 24 hours
    action_on_exceeded="alert",  # Alert when exceeded
)

@error_budget.track
async def budget_tracked_operation():
    """Operation tracked against the error budget."""
    return await service_call()
```
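The budget arithmetic itself is simple: allowed failures scale with request volume. A quick illustration of the numbers behind a 0.1% budget:

```python
def remaining_error_budget(total_requests: int, failed_requests: int,
                           budget_percentage: float = 0.1) -> float:
    """How many more failures fit in the budget for this window."""
    allowed = total_requests * budget_percentage / 100.0
    return allowed - failed_requests

# 1,000,000 requests at 0.1% -> 1,000 allowed failures
print(remaining_error_budget(1_000_000, 850))  # 150.0 failures left before alerting
```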
### 3. Graceful Shutdown

```python
from adk.errors import GracefulShutdownHandler

shutdown_handler = GracefulShutdownHandler(
    max_shutdown_time=30,  # 30 seconds to shut down gracefully
    save_state=True,       # Save state before shutdown
    notify_clients=True,   # Notify clients of the shutdown
)

@shutdown_handler.on_shutdown
async def cleanup_resources():
    """Clean up resources during graceful shutdown."""
    await close_database_connections()
    await save_pending_operations()
    await notify_monitoring_systems()
```
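In a typical asyncio service, this handler is driven by a termination signal. A possible wiring, assuming a Unix event loop (illustrative, not ADK code):

```python
import asyncio
import signal

async def main():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    loop.add_signal_handler(signal.SIGTERM, stop.set)  # Unix-only API

    await stop.wait()  # Run the service until SIGTERM arrives
    # Give cleanup the same deadline the handler above is configured with
    await asyncio.wait_for(cleanup_resources(), timeout=30)

asyncio.run(main())
```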
## 🔗 Integration with JAF Core

### Session Error Handling

```python
from adk.types import ImmutableAdkSession
from adk.errors import SessionErrorHandler

session_error_handler = SessionErrorHandler()

@session_error_handler
async def safe_session_operation(session: ImmutableAdkSession):
    """Session operation with error handling."""
    try:
        return await process_session(session)
    except AdkSessionError:
        # Automatically handled by the decorator
        raise
```
### Tool Error Handling

```python
from adk.errors import ToolErrorHandler

tool_error_handler = ToolErrorHandler(
    timeout_seconds=30,
    retry_attempts=2,
    fallback_response="Tool temporarily unavailable",
)

@tool_error_handler
async def safe_tool_execution(tool_call):
    """Tool execution with comprehensive error handling."""
    return await execute_tool(tool_call)
```
## 🔗 Related Documentation
- ADK Overview - Complete ADK framework introduction
- Security Framework - Security and error prevention
- Session Management - Error-safe session handling
- Validation Suite - Testing error handling code
> **Production Resilience**
>
> JAF's error handling framework provides enterprise-grade resilience with circuit breakers, intelligent retries, and comprehensive monitoring. The system handles failures gracefully and maintains service availability under adverse conditions.