Ranvier Pattern Catalog
Version: 0.17 Updated: 2026-03-04
A reference of 12 core patterns for building Ranvier workflows. Each pattern includes a minimal code example and a link to the full example.
1. Basic Pipeline
Chain transitions linearly to build a processing pipeline.
let axon = Axon::<Input, Input, String>::new("pipeline")
.then(ValidateInput)
.then(ProcessData)
.then(FormatOutput);
let mut bus = Bus::new();
let result = axon.execute(input, &(), &mut bus).await;When to use: Sequential data transformation where each step depends on the previous.
Example: hello-world, typed-state-tree
2. Resource Injection
Inject shared dependencies (DB pools, config, external clients) via Resources.
use ranvier_core::transition::ResourceRequirement;
#[derive(Clone)]
struct DbPool { /* ... */ }
impl ResourceRequirement for DbPool {}
#[async_trait]
impl Transition<UserId, User> for FetchUser {
type Error = String;
type Resources = DbPool;
async fn run(&self, id: UserId, pool: &DbPool, _bus: &mut Bus) -> Outcome<User, String> {
let user = pool.query_user(&id).await;
Outcome::Next(user)
}
}
let pool = DbPool::new();
let result = axon.execute(input, &pool, &mut bus).await;When to use: When transitions share a common dependency like a database connection or API client.
Example: db-example, multitenancy-demo
3. Bus Capability Injection
Pass request-scoped context through the pipeline via the Bus.
// Before execution
let mut bus = Bus::new();
bus.insert(RequestId("req-abc".into()));
bus.insert(TenantId::new("tenant-1"));
// Inside transition
async fn run(&self, input: In, _res: &(), bus: &mut Bus) -> Outcome<Out, E> {
let tenant = bus.read::<TenantId>();
let req_id = bus.read::<RequestId>();
// ... use context
Outcome::Next(output)
}When to use: Per-request context (auth tokens, tenant IDs, correlation IDs, feature flags).
Example: bus-capability-demo, multitenancy-demo
4. Custom Error Types
Use thiserror + serde for structured, matchable domain errors.
#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
enum OrderError {
#[error("item not found: {0}")]
ItemNotFound(String),
#[error("insufficient stock: need {need}, have {have}")]
InsufficientStock { need: u32, have: u32 },
#[error("payment declined: {0}")]
PaymentDeclined(String),
}
// Match on specific error variants
match result {
Outcome::Fault(OrderError::PaymentDeclined(reason)) => { /* retry or notify */ }
Outcome::Fault(OrderError::InsufficientStock { .. }) => { /* backorder */ }
_ => {}
}When to use: When you need domain-specific error handling beyond string messages.
Example: custom-error-types, order-processing-demo
5. Branching & Decision Trees
Use Outcome::Branch for conditional flow divergence.
async fn run(&self, order: Order, _res: &(), _bus: &mut Bus) -> Outcome<Order, E> {
if order.total > 10_000 {
return Outcome::Branch(
"high_value".into(),
Some(serde_json::to_value(&order).unwrap()),
);
}
Outcome::Next(order)
}When to use: Workflow decisions that route to different processing paths (approval flows, A/B testing, routing by type).
Example: order-processing-demo
6. Retry with Dead Letter Queue (DLQ)
Configure automatic retry with exponential backoff and DLQ fallback.
let axon = Axon::<Req, Req, String>::new("resilient")
.then(UnreliableStep)
.with_dlq_policy(DlqPolicy::RetryThenDlq {
max_attempts: 3,
backoff_ms: 100,
})
.with_dlq_sink(InMemoryDlqSink::new());
// After execution, check timeline for retry events
let timeline = bus.read::<Timeline>().unwrap();
for event in &timeline.events {
match event {
TimelineEvent::NodeRetry { .. } => { /* logged */ }
TimelineEvent::DlqExhausted { .. } => { /* alert */ }
_ => {}
}
}When to use: External service calls, network-dependent operations, anything that can fail transiently.
Example: retry-dlq-demo
7. State Persistence & Checkpoint
Persist workflow state for fault recovery and resumption.
use ranvier_runtime::{InMemoryPersistenceStore, PersistenceHandle, PersistenceTraceId, PersistenceAutoComplete};
let store = Arc::new(InMemoryPersistenceStore::new());
let handle = PersistenceHandle::from_arc(store.clone() as Arc<dyn PersistenceStore>);
// Execute with persistence — trace stays open on fault
let mut bus = ranvier_core::ranvier_bus!(
handle.clone(),
PersistenceTraceId::new("trace-001"),
PersistenceAutoComplete(false),
);
let result = axon.execute(input, &(), &mut bus).await;
// On fault: resume from last checkpoint
let trace = store.load("trace-001").await?.unwrap();
let cursor = store.resume("trace-001", trace.events.last().unwrap().step).await?;When to use: Long-running business workflows, payment processing, multi-step approval chains.
Example: state-persistence-demo
8. Compensation & Rollback
Register compensation hooks for automatic rollback on irrecoverable faults.
use ranvier_runtime::{CompensationHook, CompensationContext, CompensationHandle, CompensationRetryPolicy};
#[derive(Clone)]
struct RefundPayment;
#[async_trait]
impl CompensationHook for RefundPayment {
async fn compensate(&self, ctx: CompensationContext) -> anyhow::Result<()> {
println!("Refunding trace={} at step={}", ctx.trace_id, ctx.fault_step);
Ok(())
}
}
let mut bus = ranvier_core::ranvier_bus!(
handle,
PersistenceTraceId::new("order-123"),
CompensationHandle::from_hook(RefundPayment),
CompensationRetryPolicy { max_attempts: 2, backoff_ms: 100 },
);When to use: Saga patterns, distributed transactions, any workflow where partial completion requires cleanup.
Example: state-persistence-demo (phase 3)
9. Tenant Isolation
Use TenantId from ranvier_core::tenant for per-tenant data isolation.
use ranvier_core::tenant::TenantId;
// Inject tenant context via Bus
let mut bus = Bus::new();
bus.insert(TenantId::new("tenant-a"));
// Read in transition
async fn run(&self, input: In, store: &TenantStore, bus: &mut Bus) -> Outcome<Out, E> {
let tenant = bus.read::<TenantId>()
.ok_or_else(|| "missing tenant".to_string())?;
let data = store.list(tenant.as_str()).await;
Outcome::Next(data)
}When to use: SaaS multi-tenant applications with per-tenant data boundaries.
Example: multitenancy-demo
10. Unit Testing Transitions
Test transitions directly — they're just async functions.
#[tokio::test]
async fn test_validate_rejects_negative() {
let t = ValidateOrder;
let mut bus = Bus::new();
let input = OrderRequest { amount: -1, item: "Widget".into() };
let result = t.run(input, &(), &mut bus).await;
assert!(result.is_fault());
}
#[tokio::test]
async fn test_full_pipeline() {
let axon = Axon::<OrderRequest, OrderRequest, String>::new("test")
.then(ValidateOrder)
.then(ProcessOrder);
let mut bus = Bus::new();
let result = axon.execute(valid_input(), &(), &mut bus).await;
assert!(result.is_next());
}When to use: Every transition should have unit tests. Integration tests verify the full pipeline.
Example: testing-patterns (7 tests)
11. Application-Level Circuit Breaker
Implement circuit breaker logic at the application level using shared state.
use std::sync::atomic::{AtomicU32, Ordering};
#[derive(Clone)]
enum CircuitState { Closed, Open }
#[derive(Clone)]
struct CircuitBreakerGateway {
failure_count: Arc<AtomicU32>,
threshold: u32,
}
#[async_trait]
impl Transition<Request, Response> for CircuitBreakerGateway {
type Error = String;
type Resources = ();
async fn run(&self, req: Request, _: &(), _: &mut Bus) -> Outcome<Response, String> {
let failures = self.failure_count.load(Ordering::SeqCst);
if failures >= self.threshold {
return Outcome::Fault("circuit open".to_string());
}
match external_call(&req).await {
Ok(res) => {
self.failure_count.store(0, Ordering::SeqCst);
Outcome::Next(res)
}
Err(e) => {
self.failure_count.fetch_add(1, Ordering::SeqCst);
Outcome::Fault(e.to_string())
}
}
}
}When to use: Protecting against cascading failures from unreliable external services.
Example: retry-dlq-demo (demo 5)
12. Timeout Wrapping
Use tokio::time::timeout to wrap slow operations.
use tokio::time::{timeout, Duration};
#[async_trait]
impl Transition<Input, Output> for TimedStep {
type Error = String;
type Resources = ();
async fn run(&self, input: Input, _: &(), _: &mut Bus) -> Outcome<Output, String> {
match timeout(Duration::from_millis(500), slow_operation(&input)).await {
Ok(Ok(result)) => Outcome::Next(result),
Ok(Err(e)) => Outcome::Fault(e.to_string()),
Err(_) => Outcome::Fault("operation timed out".to_string()),
}
}
}When to use: Any operation with unpredictable latency (network calls, file I/O, database queries).
Example: retry-dlq-demo (demo 3-4)
Quick Reference
| # | Pattern | Key API | Difficulty |
|---|---|---|---|
| 1 | Basic Pipeline | Axon::new().then() |
Beginner |
| 2 | Resource Injection | ResourceRequirement, type Resources |
Beginner |
| 3 | Bus Capability | bus.insert(), bus.read() |
Beginner |
| 4 | Custom Errors | thiserror, Serialize + Deserialize |
Intermediate |
| 5 | Branching | Outcome::Branch |
Intermediate |
| 6 | Retry + DLQ | DlqPolicy::RetryThenDlq, DlqSink |
Intermediate |
| 7 | Persistence | PersistenceHandle, PersistenceTraceId |
Advanced |
| 8 | Compensation | CompensationHook, CompensationHandle |
Advanced |
| 9 | Tenant Isolation | TenantId, TenantStore |
Intermediate |
| 10 | Unit Testing | Transition::run(), Axon::execute() |
Beginner |
| 11 | Circuit Breaker | AtomicU32, application-level state |
Advanced |
| 12 | Timeout | tokio::time::timeout |
Intermediate |