Ranvier Pattern Catalog

Version: 0.17 Updated: 2026-03-04

A reference of 12 core patterns for building Ranvier workflows. Each pattern includes a minimal code example and a link to the full example.


1. Basic Pipeline

Chain transitions linearly to build a processing pipeline.

let axon = Axon::<Input, Input, String>::new("pipeline")
    .then(ValidateInput)
    .then(ProcessData)
    .then(FormatOutput);

let mut bus = Bus::new();
let result = axon.execute(input, &(), &mut bus).await;

When to use: Sequential data transformation where each step depends on the previous.

Example: hello-world, typed-state-tree


2. Resource Injection

Inject shared dependencies (DB pools, config, external clients) via Resources.

use ranvier_core::transition::ResourceRequirement;

#[derive(Clone)]
struct DbPool { /* ... */ }
impl ResourceRequirement for DbPool {}

#[async_trait]
impl Transition<UserId, User> for FetchUser {
    type Error = String;
    type Resources = DbPool;

    async fn run(&self, id: UserId, pool: &DbPool, _bus: &mut Bus) -> Outcome<User, String> {
        let user = pool.query_user(&id).await;
        Outcome::Next(user)
    }
}

let pool = DbPool::new();
let result = axon.execute(input, &pool, &mut bus).await;

When to use: When transitions share a common dependency like a database connection or API client.

Example: db-example, multitenancy-demo


3. Bus Capability Injection

Pass request-scoped context through the pipeline via the Bus.

// Before execution
let mut bus = Bus::new();
bus.insert(RequestId("req-abc".into()));
bus.insert(TenantId::new("tenant-1"));

// Inside transition
async fn run(&self, input: In, _res: &(), bus: &mut Bus) -> Outcome<Out, E> {
    let tenant = bus.read::<TenantId>();
    let req_id = bus.read::<RequestId>();
    // ... use context
    Outcome::Next(output)
}

When to use: Per-request context (auth tokens, tenant IDs, correlation IDs, feature flags).

Example: bus-capability-demo, multitenancy-demo


4. Custom Error Types

Use thiserror + serde for structured, matchable domain errors.

#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
enum OrderError {
    #[error("item not found: {0}")]
    ItemNotFound(String),
    #[error("insufficient stock: need {need}, have {have}")]
    InsufficientStock { need: u32, have: u32 },
    #[error("payment declined: {0}")]
    PaymentDeclined(String),
}

// Match on specific error variants
match result {
    Outcome::Fault(OrderError::PaymentDeclined(reason)) => { /* retry or notify */ }
    Outcome::Fault(OrderError::InsufficientStock { .. }) => { /* backorder */ }
    _ => {}
}

When to use: When you need domain-specific error handling beyond string messages.

Example: custom-error-types, order-processing-demo


5. Branching & Decision Trees

Use Outcome::Branch for conditional flow divergence.

async fn run(&self, order: Order, _res: &(), _bus: &mut Bus) -> Outcome<Order, E> {
    if order.total > 10_000 {
        return Outcome::Branch(
            "high_value".into(),
            Some(serde_json::to_value(&order).unwrap()),
        );
    }
    Outcome::Next(order)
}

When to use: Workflow decisions that route to different processing paths (approval flows, A/B testing, routing by type).

Example: order-processing-demo


6. Retry with Dead Letter Queue (DLQ)

Configure automatic retry with exponential backoff and DLQ fallback.

let axon = Axon::<Req, Req, String>::new("resilient")
    .then(UnreliableStep)
    .with_dlq_policy(DlqPolicy::RetryThenDlq {
        max_attempts: 3,
        backoff_ms: 100,
    })
    .with_dlq_sink(InMemoryDlqSink::new());

// After execution, check timeline for retry events
let timeline = bus.read::<Timeline>().unwrap();
for event in &timeline.events {
    match event {
        TimelineEvent::NodeRetry { .. } => { /* logged */ }
        TimelineEvent::DlqExhausted { .. } => { /* alert */ }
        _ => {}
    }
}

When to use: External service calls, network-dependent operations, anything that can fail transiently.

Example: retry-dlq-demo


7. State Persistence & Checkpoint

Persist workflow state for fault recovery and resumption.

use ranvier_runtime::{InMemoryPersistenceStore, PersistenceHandle, PersistenceTraceId, PersistenceAutoComplete};

let store = Arc::new(InMemoryPersistenceStore::new());
let handle = PersistenceHandle::from_arc(store.clone() as Arc<dyn PersistenceStore>);

// Execute with persistence — trace stays open on fault
let mut bus = ranvier_core::ranvier_bus!(
    handle.clone(),
    PersistenceTraceId::new("trace-001"),
    PersistenceAutoComplete(false),
);
let result = axon.execute(input, &(), &mut bus).await;

// On fault: resume from last checkpoint
let trace = store.load("trace-001").await?.unwrap();
let cursor = store.resume("trace-001", trace.events.last().unwrap().step).await?;

When to use: Long-running business workflows, payment processing, multi-step approval chains.

Example: state-persistence-demo


8. Compensation & Rollback

Register compensation hooks for automatic rollback on irrecoverable faults.

use ranvier_runtime::{CompensationHook, CompensationContext, CompensationHandle, CompensationRetryPolicy};

#[derive(Clone)]
struct RefundPayment;

#[async_trait]
impl CompensationHook for RefundPayment {
    async fn compensate(&self, ctx: CompensationContext) -> anyhow::Result<()> {
        println!("Refunding trace={} at step={}", ctx.trace_id, ctx.fault_step);
        Ok(())
    }
}

let mut bus = ranvier_core::ranvier_bus!(
    handle,
    PersistenceTraceId::new("order-123"),
    CompensationHandle::from_hook(RefundPayment),
    CompensationRetryPolicy { max_attempts: 2, backoff_ms: 100 },
);

When to use: Saga patterns, distributed transactions, any workflow where partial completion requires cleanup.

Example: state-persistence-demo (phase 3)


9. Tenant Isolation

Use TenantId from ranvier_core::tenant for per-tenant data isolation.

use ranvier_core::tenant::TenantId;

// Inject tenant context via Bus
let mut bus = Bus::new();
bus.insert(TenantId::new("tenant-a"));

// Read in transition
async fn run(&self, input: In, store: &TenantStore, bus: &mut Bus) -> Outcome<Out, E> {
    let tenant = bus.read::<TenantId>()
        .ok_or_else(|| "missing tenant".to_string())?;
    let data = store.list(tenant.as_str()).await;
    Outcome::Next(data)
}

When to use: SaaS multi-tenant applications with per-tenant data boundaries.

Example: multitenancy-demo


10. Unit Testing Transitions

Test transitions directly — they're just async functions.

#[tokio::test]
async fn test_validate_rejects_negative() {
    let t = ValidateOrder;
    let mut bus = Bus::new();

    let input = OrderRequest { amount: -1, item: "Widget".into() };
    let result = t.run(input, &(), &mut bus).await;

    assert!(result.is_fault());
}

#[tokio::test]
async fn test_full_pipeline() {
    let axon = Axon::<OrderRequest, OrderRequest, String>::new("test")
        .then(ValidateOrder)
        .then(ProcessOrder);

    let mut bus = Bus::new();
    let result = axon.execute(valid_input(), &(), &mut bus).await;
    assert!(result.is_next());
}

When to use: Every transition should have unit tests. Integration tests verify the full pipeline.

Example: testing-patterns (7 tests)


11. Application-Level Circuit Breaker

Implement circuit breaker logic at the application level using shared state.

use std::sync::atomic::{AtomicU32, Ordering};

#[derive(Clone)]
enum CircuitState { Closed, Open }

#[derive(Clone)]
struct CircuitBreakerGateway {
    failure_count: Arc<AtomicU32>,
    threshold: u32,
}

#[async_trait]
impl Transition<Request, Response> for CircuitBreakerGateway {
    type Error = String;
    type Resources = ();

    async fn run(&self, req: Request, _: &(), _: &mut Bus) -> Outcome<Response, String> {
        let failures = self.failure_count.load(Ordering::SeqCst);
        if failures >= self.threshold {
            return Outcome::Fault("circuit open".to_string());
        }
        match external_call(&req).await {
            Ok(res) => {
                self.failure_count.store(0, Ordering::SeqCst);
                Outcome::Next(res)
            }
            Err(e) => {
                self.failure_count.fetch_add(1, Ordering::SeqCst);
                Outcome::Fault(e.to_string())
            }
        }
    }
}

When to use: Protecting against cascading failures from unreliable external services.

Example: retry-dlq-demo (demo 5)


12. Timeout Wrapping

Use tokio::time::timeout to wrap slow operations.

use tokio::time::{timeout, Duration};

#[async_trait]
impl Transition<Input, Output> for TimedStep {
    type Error = String;
    type Resources = ();

    async fn run(&self, input: Input, _: &(), _: &mut Bus) -> Outcome<Output, String> {
        match timeout(Duration::from_millis(500), slow_operation(&input)).await {
            Ok(Ok(result)) => Outcome::Next(result),
            Ok(Err(e)) => Outcome::Fault(e.to_string()),
            Err(_) => Outcome::Fault("operation timed out".to_string()),
        }
    }
}

When to use: Any operation with unpredictable latency (network calls, file I/O, database queries).

Example: retry-dlq-demo (demo 3-4)


Quick Reference

# Pattern Key API Difficulty
1 Basic Pipeline Axon::new().then() Beginner
2 Resource Injection ResourceRequirement, type Resources Beginner
3 Bus Capability bus.insert(), bus.read() Beginner
4 Custom Errors thiserror, Serialize + Deserialize Intermediate
5 Branching Outcome::Branch Intermediate
6 Retry + DLQ DlqPolicy::RetryThenDlq, DlqSink Intermediate
7 Persistence PersistenceHandle, PersistenceTraceId Advanced
8 Compensation CompensationHook, CompensationHandle Advanced
9 Tenant Isolation TenantId, TenantStore Intermediate
10 Unit Testing Transition::run(), Axon::execute() Beginner
11 Circuit Breaker AtomicU32, application-level state Advanced
12 Timeout tokio::time::timeout Intermediate