Pattern Catalog
12 patterns for every workflow.
A quick-reference catalog of battle-tested Ranvier patterns. Each card shows a minimal code example and links to the full example in the repository.
1. Basic Pipeline
Beginner
Chain transitions linearly to build a processing pipeline.

    let axon = Axon::simple::<String>("pipeline")
        .then(ValidateInput)
        .then(ProcessData)
        .then(FormatOutput);
    let result = axon.execute(input, &(), &mut bus).await;

Examples: hello-world, typed-state-tree
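To make the chaining idea concrete without depending on Ranvier itself, here is a minimal std-only sketch. The `Outcome` enum, its `then` method, and the three step functions are simplified stand-ins invented for illustration; they are assumptions, not Ranvier's actual types or signatures.

```rust
// Simplified stand-in for a pipeline outcome; NOT Ranvier's real `Outcome`.
#[derive(Debug, PartialEq)]
enum Outcome<T, E> {
    Next(T),
    Fault(E),
}

impl<T, E> Outcome<T, E> {
    // Run the next step only if the previous one produced a value;
    // a fault short-circuits the rest of the chain.
    fn then<U>(self, step: impl FnOnce(T) -> Outcome<U, E>) -> Outcome<U, E> {
        match self {
            Outcome::Next(value) => step(value),
            Outcome::Fault(err) => Outcome::Fault(err),
        }
    }
}

// Hypothetical steps mirroring ValidateInput, ProcessData, FormatOutput.
fn validate_input(raw: String) -> Outcome<String, String> {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        Outcome::Fault("empty input".to_string())
    } else {
        Outcome::Next(trimmed.to_string())
    }
}

fn process_data(clean: String) -> Outcome<usize, String> {
    Outcome::Next(clean.chars().count())
}

fn format_output(len: usize) -> Outcome<String, String> {
    Outcome::Next(format!("{} chars", len))
}

fn run_pipeline(input: &str) -> Outcome<String, String> {
    Outcome::Next(input.to_string())
        .then(validate_input)
        .then(process_data)
        .then(format_output)
}
```

Each step may change the value type (here `String` to `usize` to `String`), while the fault type stays fixed along the chain.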
2. Resource Injection
Beginner
Inject shared dependencies (DB pools, config) via Resources.

    impl ResourceRequirement for DbPool {}

    impl Transition<UserId, User> for FetchUser {
        type Resources = DbPool;

        async fn run(&self, id: UserId, pool: &DbPool, _bus: &mut Bus)
            -> Outcome<User, String> {
            Outcome::Next(pool.query_user(&id).await)
        }
    }

Examples: db-example, multitenancy-demo
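The associated-type approach to injection can be sketched in plain Rust. This version is synchronous and omits the Bus; the trait shape, the `DbPool` fields, and the `u32` user ID are assumptions made for illustration rather than Ranvier's real definitions.

```rust
use std::collections::HashMap;

// Simplified, synchronous stand-in for a transition trait with an
// associated resource type. Ranvier's real trait is async and takes a Bus.
trait Transition<In, Out> {
    type Resources;
    fn run(&self, input: In, resources: &Self::Resources) -> Result<Out, String>;
}

// Hypothetical shared dependency; a real pool would wrap DB connections.
struct DbPool {
    users: HashMap<u32, String>,
}

impl DbPool {
    fn query_user(&self, id: u32) -> Option<String> {
        self.users.get(&id).cloned()
    }
}

struct FetchUser;

impl Transition<u32, String> for FetchUser {
    type Resources = DbPool;

    // The pool is borrowed per call, so one pool can serve many transitions.
    fn run(&self, id: u32, pool: &DbPool) -> Result<String, String> {
        pool.query_user(id)
            .ok_or_else(|| format!("user {} not found", id))
    }
}
```

Because the transition only borrows its resources, a test can hand it a small in-memory `DbPool` instead of a live database.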
3. Bus Capability
Beginner
Pass request-scoped context through the pipeline via the type-indexed Bus.

    bus.insert(RequestId("req-abc".into()));
    bus.insert(TenantId::new("tenant-1"));

    // Inside a transition
    let tenant = bus.read::<TenantId>();
    let req_id = bus.read::<RequestId>();

Examples: bus-capability-demo, multitenancy-demo
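"Type-indexed" means the value's type is the lookup key. A minimal sketch of that idea, using only the standard library, is shown below. The `TypeMap` name and its internals are assumptions for illustration; Ranvier's real `Bus` may be implemented differently.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Illustrative type-indexed container; Ranvier's real `Bus` may differ.
#[derive(Default)]
struct TypeMap {
    slots: HashMap<TypeId, Box<dyn Any>>,
}

impl TypeMap {
    // Store one value per concrete type, replacing any previous value.
    fn insert<T: Any>(&mut self, value: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(value));
    }

    // Look up a value by its type; `None` if nothing of that type was inserted.
    fn read<T: Any>(&self) -> Option<&T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

// Newtypes keep two `String`-backed values from colliding in the map.
#[derive(Debug, PartialEq)]
struct RequestId(String);

#[derive(Debug, PartialEq)]
struct TenantId(String);
```

Wrapping each piece of context in its own newtype is what makes this safe: two bare `String` values would share a slot, while `RequestId` and `TenantId` do not.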
4. Custom Errors
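Intermediate
Use thiserror + serde for structured, matchable domain errors.

    use serde::{Deserialize, Serialize};
    use thiserror::Error;

    // `Debug` is required: thiserror's derive implements std::error::Error,
    // which needs both Debug and Display.
    #[derive(Debug, Error, Serialize, Deserialize)]
    enum OrderError {
        #[error("item not found: {0}")]
        ItemNotFound(String),
        #[error("payment declined: {0}")]
        PaymentDeclined(String),
    }

Examples: custom-error-types, order-processing-demo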
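For readers who want to see what "matchable" buys, here is a dependency-free sketch that writes the `Display` impl by hand (roughly the part that the `#[error(...)]` attributes generate) and then matches on the variants. The `http_status` mapping is a hypothetical example, not something taken from the linked demos.

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
enum OrderError {
    ItemNotFound(String),
    PaymentDeclined(String),
}

// Hand-written equivalent of the messages in the `#[error(...)]` attributes.
impl fmt::Display for OrderError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            OrderError::ItemNotFound(item) => write!(f, "item not found: {}", item),
            OrderError::PaymentDeclined(why) => write!(f, "payment declined: {}", why),
        }
    }
}

impl std::error::Error for OrderError {}

// Hypothetical mapping: callers branch on the variant, not on message text.
fn http_status(err: &OrderError) -> u16 {
    match err {
        OrderError::ItemNotFound(_) => 404,
        OrderError::PaymentDeclined(_) => 402,
    }
}
```

Matching on variants keeps error handling stable even if the human-readable messages are later reworded.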
5. Branching
Intermediate
Use Outcome::Branch for conditional flow divergence.

    if order.total > 10_000 {
        return Outcome::Branch(
            "high_value".into(),
            Some(serde_json::to_value(&order).unwrap()),
        );
    }
    Outcome::Next(order)

Example: order-processing-demo
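The snippet above shows only the side that emits a branch. A sketch of both sides, emitting and routing, might look like the following. It assumes a simplified `Outcome` with a plain `String` payload instead of `serde_json::Value`, and the `route` dispatcher and its handler names are hypothetical.

```rust
// Simplified stand-in; the payload is a plain String here rather than
// the serde_json::Value used in the card above.
#[derive(Debug, PartialEq)]
enum Outcome<T> {
    Next(T),
    Branch(String, Option<String>),
}

#[derive(Debug, PartialEq)]
struct Order {
    id: u32,
    total: u64,
}

// Hypothetical transition: divert high-value orders to a named branch.
fn check_order(order: Order) -> Outcome<Order> {
    if order.total > 10_000 {
        return Outcome::Branch(
            "high_value".to_string(),
            Some(format!("order {} total {}", order.id, order.total)),
        );
    }
    Outcome::Next(order)
}

// Hypothetical dispatcher: decide which handler a given outcome reaches.
fn route(outcome: &Outcome<Order>) -> &'static str {
    match outcome {
        Outcome::Next(_) => "standard_flow",
        Outcome::Branch(name, _) if name.as_str() == "high_value" => "manual_review",
        Outcome::Branch(_, _) => "unknown_branch",
    }
}
```

Keeping a fallback arm for unrecognized branch names avoids silently dropping work when a new branch is introduced upstream.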
6. Retry + DLQ
Intermediate
Automatic retry with exponential backoff and dead-letter queue fallback.

    axon.with_dlq_policy(DlqPolicy::RetryThenDlq {
            max_attempts: 3,
            backoff_ms: 100,
        })
        .with_dlq_sink(InMemoryDlqSink::new());

Example: retry-dlq-demo
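The retry-then-DLQ behaviour can be sketched as a small blocking loop. This is an illustration under stated assumptions: the delay doubles on each retry (the card says "exponential" but does not give the factor), and `DlqSink`, `backoff_delay`, and `retry_then_dlq` are hypothetical names, not Ranvier APIs.

```rust
use std::thread::sleep;
use std::time::Duration;

// Hypothetical in-memory dead-letter sink; collects inputs that never succeeded.
#[derive(Default)]
struct DlqSink {
    dead_letters: Vec<(String, String)>, // (input, last error)
}

// Delay before retry number `retry` (0-based): base * 2^retry (assumed factor).
fn backoff_delay(base_ms: u64, retry: u32) -> Duration {
    Duration::from_millis(base_ms.saturating_mul(2u64.saturating_pow(retry)))
}

// Try `op` up to `max_attempts` times, then hand the input to the DLQ.
fn retry_then_dlq<T>(
    input: &str,
    max_attempts: u32,
    base_ms: u64,
    sink: &mut DlqSink,
    mut op: impl FnMut(&str) -> Result<T, String>,
) -> Option<T> {
    let mut last_err = String::from("no attempts made");
    for attempt in 0..max_attempts {
        match op(input) {
            Ok(value) => return Some(value),
            Err(err) => last_err = err,
        }
        // Sleep between attempts, but not after the final one.
        if attempt + 1 < max_attempts {
            sleep(backoff_delay(base_ms, attempt));
        }
    }
    sink.dead_letters.push((input.to_string(), last_err));
    None
}
```

With `max_attempts: 3` and a 100 ms base, this schedule waits 100 ms and then 200 ms before giving up and recording the input together with its last error.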
7. Persistence
Advanced
Persist workflow state for fault recovery and checkpoint/resume.

    let mut bus = ranvier_bus!(
        handle.clone(),
        PersistenceTraceId::new("trace-001"),
        PersistenceAutoComplete(false),
    );
    let result = axon.execute(input, &(), &mut bus).await;

Example: state-persistence-demo
8. Compensation
Advanced
Register compensation hooks for automatic rollback on faults (saga pattern).

    impl CompensationHook for RefundPayment {
        async fn compensate(&self, ctx: CompensationContext)
            -> anyhow::Result<()> {
            println!("Refunding trace={}", ctx.trace_id);
            Ok(())
        }
    }

Example: state-persistence-demo (phase 3)
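The saga idea behind compensation hooks is that each completed step has an undo, and a fault triggers those undos in reverse order. A minimal synchronous sketch follows; the `Step` struct, `run_saga`, and the example actions are all hypothetical and do not reflect how Ranvier schedules its hooks.

```rust
// Hypothetical saga runner: each step pairs an action with its undo.
struct Step {
    name: &'static str,
    action: fn() -> Result<(), String>,
    compensate: fn(&mut Vec<String>),
}

// Run steps in order; on the first failure, undo completed steps in reverse.
fn run_saga(steps: &[Step], log: &mut Vec<String>) -> Result<(), String> {
    let mut completed: Vec<&Step> = Vec::new();
    for step in steps {
        match (step.action)() {
            Ok(()) => {
                log.push(format!("done {}", step.name));
                completed.push(step);
            }
            Err(err) => {
                // The failed step never completed, so it is not compensated.
                for done in completed.iter().rev() {
                    (done.compensate)(log);
                }
                return Err(format!("{} failed: {}", step.name, err));
            }
        }
    }
    Ok(())
}

fn ok_action() -> Result<(), String> {
    Ok(())
}

fn failing_ship() -> Result<(), String> {
    Err("carrier unavailable".to_string())
}

fn release_stock(log: &mut Vec<String>) {
    log.push("release stock".to_string());
}

fn refund_payment(log: &mut Vec<String>) {
    log.push("refund payment".to_string());
}

fn cancel_shipment(log: &mut Vec<String>) {
    log.push("cancel shipment".to_string());
}

fn example_steps() -> Vec<Step> {
    vec![
        Step { name: "reserve", action: ok_action, compensate: release_stock },
        Step { name: "charge", action: ok_action, compensate: refund_payment },
        Step { name: "ship", action: failing_ship, compensate: cancel_shipment },
    ]
}
```

Running `example_steps()` fails at "ship", so the payment is refunded first and the stock released second, the reverse of the order in which they were completed.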
9. Tenant Isolation
Intermediate
Per-tenant data boundaries via TenantId from the multitenancy extension.

    bus.insert(TenantId::new("tenant-a"));

    // Inside a transition
    let tenant = bus.read::<TenantId>()
        .ok_or_else(|| "missing tenant".to_string())?;
    let data = store.list(tenant.as_str()).await;

Example: multitenancy-demo
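One way to picture the isolation boundary is a store whose every read is keyed by tenant, plus a lookup that refuses to run without a tenant. The sketch below is synchronous and self-contained; `TenantStore`, `put`, and `list_for` are hypothetical names, and this `TenantId` is a local stand-in for the one in the multitenancy extension.

```rust
use std::collections::HashMap;

// Local stand-in for the extension's TenantId.
#[derive(Debug)]
struct TenantId(String);

impl TenantId {
    fn new(id: &str) -> Self {
        TenantId(id.to_string())
    }

    fn as_str(&self) -> &str {
        &self.0
    }
}

// Hypothetical store: rows are partitioned by tenant key.
#[derive(Default)]
struct TenantStore {
    rows: HashMap<String, Vec<String>>,
}

impl TenantStore {
    fn put(&mut self, tenant: &TenantId, item: &str) {
        self.rows
            .entry(tenant.as_str().to_string())
            .or_default()
            .push(item.to_string());
    }

    // Reads can only ever see the partition for the given tenant.
    fn list(&self, tenant: &str) -> &[String] {
        match self.rows.get(tenant) {
            Some(items) => items.as_slice(),
            None => &[],
        }
    }
}

// Fail fast when no tenant is present instead of falling back to a default.
fn list_for(store: &TenantStore, tenant: Option<&TenantId>) -> Result<Vec<String>, String> {
    let tenant = tenant.ok_or_else(|| "missing tenant".to_string())?;
    Ok(store.list(tenant.as_str()).to_vec())
}
```

Note that the `?` operator here works because `list_for` returns a `Result`; an unknown tenant yields an empty list rather than another tenant's data.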
10. Unit Testing
Beginner
Transitions are async functions, so you can test them directly or test the full pipeline.

    #[tokio::test]
    async fn test_pipeline() {
        // Assumes `axon`, `input`, and `bus` are built as in the Basic Pipeline card.
        let result = axon.execute(input, &(), &mut bus).await;
        assert!(result.is_next());
    }

Example: testing-patterns (7 tests)
11. Circuit Breaker
Advanced
Application-level circuit breaker using shared atomic state.

    use std::sync::atomic::Ordering;

    // Reject the call up front once the failure count reaches the threshold.
    let failures = self.failure_count.load(Ordering::SeqCst);
    if failures >= self.threshold {
        return Outcome::Fault("circuit open".to_string());
    }

Example: retry-dlq-demo (demo 5)
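The card shows only the "is the circuit open?" check. A fuller sketch that also records failures and resets on success might look like this. It is deliberately minimal: there is no half-open state or reset timer, and the `CircuitBreaker` struct and its `call` method are assumptions for illustration rather than code from the demo.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Minimal count-based breaker; no half-open state or reset timer.
struct CircuitBreaker {
    failure_count: AtomicUsize,
    threshold: usize,
}

impl CircuitBreaker {
    fn new(threshold: usize) -> Self {
        CircuitBreaker {
            failure_count: AtomicUsize::new(0),
            threshold,
        }
    }

    // Run `op` unless the breaker is open; track consecutive failures.
    fn call<T>(&self, op: impl FnOnce() -> Result<T, String>) -> Result<T, String> {
        if self.failure_count.load(Ordering::SeqCst) >= self.threshold {
            return Err("circuit open".to_string());
        }
        match op() {
            Ok(value) => {
                // A success resets the consecutive-failure count.
                self.failure_count.store(0, Ordering::SeqCst);
                Ok(value)
            }
            Err(err) => {
                self.failure_count.fetch_add(1, Ordering::SeqCst);
                Err(err)
            }
        }
    }
}
```

Because `call` takes `&self` and the counter is atomic, one breaker can be shared across tasks behind an `Arc`. A production breaker would usually add a cool-down after which a trial call is allowed through.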
12. Timeout
Intermediate
Wrap slow operations with tokio::time::timeout.

    use std::time::Duration;
    use tokio::time::timeout;

    // The outer Result reports the timeout; the inner one is slow_op's own result.
    match timeout(Duration::from_millis(500), slow_op(&input)).await {
        Ok(Ok(result)) => Outcome::Next(result),
        Ok(Err(e)) => Outcome::Fault(e.to_string()),
        Err(_) => Outcome::Fault("timed out".to_string()),
    }

Example: retry-dlq-demo (demo 3-4)
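The same three-way mapping (success, operation error, timeout) can be tried without an async runtime. The sketch below is a blocking analogue built on a worker thread and `std::sync::mpsc::Receiver::recv_timeout`; it is not how `tokio::time::timeout` works internally, and the `Outcome` enum and `run_with_timeout` are stand-ins invented for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Simplified stand-in for the outcome type used in the card above.
#[derive(Debug, PartialEq)]
enum Outcome<T> {
    Next(T),
    Fault(String),
}

// Run `op` on a worker thread and wait at most `limit` for its result.
fn run_with_timeout<T, F>(limit: Duration, op: F) -> Outcome<T>
where
    T: Send + 'static,
    F: FnOnce() -> Result<T, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may be gone if we already timed out; ignore that error.
        let _ = tx.send(op());
    });
    match rx.recv_timeout(limit) {
        Ok(Ok(value)) => Outcome::Next(value),
        Ok(Err(err)) => Outcome::Fault(err),
        // Covers both an elapsed timeout and a worker that exited without sending.
        Err(_) => Outcome::Fault("timed out".to_string()),
    }
}
```

One difference worth knowing: when the tokio timeout elapses, the wrapped future is dropped and stops making progress, whereas the worker thread in this sketch keeps running until `op` returns.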