Workflow Orchestration¶
JAF's workflow orchestration system enables the creation of complex, multi-step automation processes that can coordinate multiple agents, tools, and conditional logic. This system provides sophisticated workflow capabilities for enterprise scenarios.
Overview¶
The workflow system provides:
- Step-based Execution: Define workflows as sequences of executable steps
- Conditional Logic: Branch workflow execution based on runtime conditions
- Parallel Processing: Execute multiple steps simultaneously for improved performance
- Error Handling: Robust error recovery and retry mechanisms
- State Management: Maintain workflow state and context across execution steps
- Agent Integration: Seamless integration with JAF agents and tools
Core Components¶
Workflow Definition¶
Workflows are created using the Workflow class and WorkflowBuilder:
from jaf.core.workflows import Workflow, WorkflowBuilder, WorkflowContext
from jaf.core.workflows import AgentStep, ToolStep, ConditionalStep, ParallelStep
from jaf import Agent, Tool
# Create a simple workflow
workflow = Workflow(
workflow_id="customer_onboarding",
name="Customer Onboarding Process",
description="Complete customer onboarding workflow"
)
# Add steps to workflow
agent_step = AgentStep("welcome_step", welcome_agent, "Welcome new customer")
workflow.add_step(agent_step)
# Or use the builder pattern
workflow = WorkflowBuilder("customer_onboarding", "Customer Onboarding") \
.add_agent_step("welcome", welcome_agent, "Welcome new customer") \
.add_tool_step("create_account", account_tool, {"type": "standard"}) \
.build()
Workflow Execution¶
Execute workflows with comprehensive monitoring and control:
from jaf.core.workflows import Workflow, WorkflowContext
# Create execution context
context = WorkflowContext(
workflow_id="onboarding_001",
user_context={"customer_id": "cust_12345"},
variables={
"customer_email": "john@example.com",
"verification_status": "pending"
},
metadata={
"started_by": "system",
"priority": "high"
}
)
# Execute workflow
result = await workflow.execute(context)
print(f"Workflow Status: {result.status}")
print(f"Execution Time: {result.total_execution_time_ms}ms")
print(f"Steps Completed: {len(result.steps)}")
print(f"Success Rate: {result.success_rate}%")
Step Types¶
AgentStep¶
Execute JAF agents within workflows:
from jaf.core.workflows import AgentStep
from jaf import Agent
# Create an agent
def instructions(state):
return "You are a helpful customer service agent."
customer_agent = Agent(
name="CustomerServiceAgent",
instructions=instructions,
tools=[]
)
# Create agent step
agent_step = AgentStep(
step_id="customer_service",
agent=customer_agent,
message="Handle customer inquiry about billing"
)
# Configure step options
agent_step.with_timeout(60).with_retry(3)
# Add execution conditions
agent_step.add_condition(lambda context: context.variables.get("priority") == "high")
ToolStep¶
Execute tools with parameter mapping:
from jaf.core.workflows import ToolStep
from jaf import Tool
# Create a tool
email_tool = Tool(
name="send_email",
description="Send email to customer",
# ... tool implementation
)
# Create tool step
tool_step = ToolStep(
step_id="send_welcome_email",
tool=email_tool,
args={
"to": "customer@example.com",
"subject": "Welcome to our service",
"template": "welcome_email"
}
)
# Configure step options
tool_step.with_timeout(30).with_retry(2)
ConditionalStep¶
Branch execution based on runtime conditions:
from jaf.core.workflows import ConditionalStep, AgentStep, ToolStep
# Create conditional step
conditional_step = ConditionalStep(
step_id="payment_check",
condition=lambda context: context.variables.get("payment_amount", 0) > 1000,
true_step=AgentStep("approval", high_value_agent, "Review high-value transaction"),
false_step=ToolStep("auto_approve", auto_approve_tool, {"auto_approve": True})
)
ParallelStep¶
Execute multiple steps simultaneously:
from jaf.core.workflows import ParallelStep, ToolStep, AgentStep
# Create steps for parallel execution
create_record_step = ToolStep("create_record", database_tool, {"table": "customers"})
send_email_step = ToolStep("send_email", email_tool, {"template": "welcome"})
assign_manager_step = AgentStep("assign_manager", manager_agent, "Assign account manager")
# Create parallel step
parallel_step = ParallelStep(
step_id="customer_setup",
steps=[create_record_step, send_email_step, assign_manager_step],
wait_for_all=True # Wait for all steps to complete
)
# Configure timeout
parallel_step.with_timeout(120)
LoopStep¶
Iterate over data or repeat until conditions are met:
from jaf.core.workflows import LoopStep, ToolStep
# Create step to execute in loop
process_step = ToolStep("process_item", processing_tool, {})
# Create loop step with condition
loop_step = LoopStep(
step_id="process_orders",
step=process_step,
condition=lambda context, iteration: iteration < len(context.variables.get("orders", [])),
max_iterations=10
)
# Configure timeout
loop_step.with_timeout(300)
Advanced Features¶
Error Handling and Recovery¶
Workflows support built-in error handling and retry mechanisms:
from jaf.core.workflows import Workflow, AgentStep, ToolStep
# Create workflow with error handling
workflow = Workflow("payment_processing", "Payment Processing")
# Add error handler for specific step
def payment_error_handler(step_result, context):
if "timeout" in step_result.error:
# Return a retry step
return ToolStep("retry_payment", payment_tool, {"retry": True})
else:
# Return a fallback step
return AgentStep("manual_review", review_agent, "Manual payment review required")
workflow.add_error_handler("process_payment", payment_error_handler)
# Configure step-level retry
payment_step = ToolStep("process_payment", payment_tool, {"amount": 100})
payment_step.with_retry(max_retries=3).with_timeout(30)
workflow.add_step(payment_step)
State Management¶
Workflow context maintains state across execution steps:
from jaf.core.workflows import WorkflowContext
# Create context with initial state
context = WorkflowContext(
workflow_id="customer_onboarding",
user_context={"customer_id": "12345"},
variables={
"customer_tier": "premium",
"verification_status": "pending"
}
)
# Context is automatically updated with step results
# Access updated context after execution
result = await workflow.execute(context)
final_context = result.context
print(f"Final variables: {final_context.variables}")
print(f"Step results: {[step.output for step in result.steps]}")
Streaming Execution¶
Monitor workflow execution in real-time:
from jaf.core.workflows import execute_workflow_stream
# Stream workflow execution
async for step_result in execute_workflow_stream(workflow, context):
print(f"Step {step_result.step_id}: {step_result.status}")
if step_result.is_success:
print(f" Output: {step_result.output}")
elif step_result.is_failure:
print(f" Error: {step_result.error}")
print(f" Execution time: {step_result.execution_time_ms}ms")
Best Practices¶
1. Design for Idempotency¶
Ensure workflow steps can be safely retried:
# Good: Idempotent step
idempotent_step = ToolStep("create_user_account") \
.with_params_function(
lambda state: {
"user_id": state["user_id"],
"email": state["email"],
"upsert": True # Create or update
}
)
# Good: Check before action
safe_step = ConditionalStep("create_if_not_exists") \
.condition(lambda state: not user_exists(state["user_id"])) \
.if_true(ToolStep("create_user_account"))
2. Handle Partial Failures¶
Design workflows to handle partial failures gracefully:
# Parallel step with failure handling
robust_parallel = ParallelStep("multi_system_update") \
.add_branch(ToolStep("update_crm").with_retry(max_attempts=3)) \
.add_branch(ToolStep("update_billing").with_retry(max_attempts=3)) \
.add_branch(ToolStep("update_analytics").with_retry(max_attempts=2)) \
.with_failure_policy("continue_on_partial_failure") \
.with_minimum_success_count(2) # Require at least 2 successes
3. Use Timeouts Appropriately¶
Set realistic timeouts for all steps:
# Different timeouts for different step types
workflow = WorkflowBuilder("data_processing") \
.add_step(
ToolStep("quick_validation")
.with_timeout(10) # Fast operation
) \
.add_step(
ToolStep("heavy_computation")
.with_timeout(300) # Allow 5 minutes
) \
.add_step(
AgentStep("human_review")
.with_timeout(3600) # Allow 1 hour
) \
.build()
4. Implement Proper Logging¶
Add comprehensive logging for debugging:
from jaf.core.workflows import WorkflowLogger
# Custom logger
class DetailedWorkflowLogger(WorkflowLogger):
def log_step_start(self, step_name, state):
logger.info(f"Starting step: {step_name}", extra={
"workflow_id": state.workflow_id,
"step_name": step_name,
"state_keys": list(state.data.keys())
})
def log_step_complete(self, step_name, result, duration_ms):
logger.info(f"Completed step: {step_name} in {duration_ms}ms", extra={
"step_name": step_name,
"duration_ms": duration_ms,
"success": result.success
})
# Use custom logger
workflow = WorkflowBuilder("logged_workflow") \
.with_logger(DetailedWorkflowLogger()) \
.build()
Example: Complete E-commerce Order Processing¶
Here's a comprehensive example showing a complete e-commerce order processing workflow:
import asyncio
from jaf.core.workflows import WorkflowBuilder, WorkflowEngine, WorkflowContext
from jaf.core.workflows import AgentStep, ToolStep, ConditionalStep, ParallelStep
async def create_order_processing_workflow():
"""Create a comprehensive order processing workflow."""
return WorkflowBuilder("ecommerce_order_processing") \
.description("Complete e-commerce order processing pipeline") \
.add_step(
# Step 1: Validate order
ToolStep("validate_order")
.with_params_function(
lambda state: {"order_id": state["order_id"]}
)
.with_timeout(30)
.with_retry(max_attempts=3)
) \
.add_step(
# Step 2: Check inventory
ConditionalStep("inventory_check")
.condition(lambda state: state.get("order_valid", False))
.if_true(
ToolStep("check_inventory")
.with_params_function(
lambda state: {"items": state["order_items"]}
)
)
.if_false(
AgentStep("order_validation_agent")
.with_input("Handle invalid order")
)
) \
.add_step(
# Step 3: Process payment
ConditionalStep("payment_processing")
.condition(lambda state: state.get("inventory_available", False))
.if_true(
ToolStep("process_payment")
.with_params_function(
lambda state: {
"amount": state["order_total"],
"payment_method": state["payment_method"]
}
)
.with_timeout(60)
.with_retry(max_attempts=2)
)
.if_false(
AgentStep("inventory_agent")
.with_input("Handle out of stock items")
)
) \
.add_step(
# Step 4: Parallel fulfillment
ConditionalStep("fulfillment_check")
.condition(lambda state: state.get("payment_successful", False))
.if_true(
ParallelStep("order_fulfillment")
.add_branch(
# Update inventory
ToolStep("update_inventory")
.with_params_function(
lambda state: {"items": state["order_items"]}
)
)
.add_branch(
# Generate shipping label
ToolStep("create_shipping_label")
.with_params_function(
lambda state: {
"address": state["shipping_address"],
"items": state["order_items"]
}
)
)
.add_branch(
# Send confirmation email
ToolStep("send_confirmation_email")
.with_params_function(
lambda state: {
"customer_email": state["customer_email"],
"order_id": state["order_id"]
}
)
)
.add_branch(
# Update CRM
ToolStep("update_crm")
.with_params_function(
lambda state: {
"customer_id": state["customer_id"],
"order_value": state["order_total"]
}
)
)
.with_timeout(120)
.with_failure_policy("continue_on_partial_failure")
.with_minimum_success_count(3)
)
) \
.add_step(
# Step 5: Final notification
AgentStep("fulfillment_agent")
.with_input_function(
lambda state: f"Complete order fulfillment for order {state['order_id']}"
)
.with_context_function(
lambda state: {
"order_status": state.get("fulfillment_status", "unknown"),
"customer_tier": state.get("customer_tier", "standard")
}
)
) \
.with_error_handler(
lambda error, step, state: {
"action": "retry" if "network" in str(error).lower() else "fail",
"escalate": True,
"notify_team": True
}
) \
.build()
async def main():
"""Demonstrate the complete order processing workflow."""
# Create workflow
workflow = await create_order_processing_workflow()
# Create engine
engine = WorkflowEngine()
# Create execution context
context = WorkflowContext(
workflow_id="order_12345",
initial_data={
"order_id": "ORD-12345",
"customer_id": "CUST-67890",
"customer_email": "customer@example.com",
"order_items": [
{"sku": "ITEM-001", "quantity": 2, "price": 29.99},
{"sku": "ITEM-002", "quantity": 1, "price": 49.99}
],
"order_total": 109.97,
"payment_method": "credit_card",
"shipping_address": {
"street": "123 Main St",
"city": "Anytown",
"state": "CA",
"zip": "12345"
},
"customer_tier": "premium"
},
metadata={
"started_by": "order_service",
"priority": "normal",
"source": "web"
}
)
# Execute workflow
print("🚀 Starting order processing workflow...")
result = await engine.execute_workflow(workflow, context)
# Display results
print(f"✅ Workflow completed with status: {result.status}")
print(f"⏱️ Total execution time: {result.execution_time_ms}ms")
print(f"📊 Steps completed: {result.steps_completed}/{result.total_steps}")
if result.status == "failed":
print(f"❌ Error: {result.error}")
else:
print(f"🎉 Order {context.initial_data['order_id']} processed successfully!")
if __name__ == "__main__":
asyncio.run(main())
The workflow orchestration system provides the foundation for building sophisticated, enterprise-grade automation that can handle complex business processes with reliability and observability.
Next Steps¶
- Learn about Analytics System for workflow monitoring
- Explore Performance Monitoring for optimization
- Check Streaming Responses for real-time updates
- Review Plugin System for extensibility