#Ranvier 패턴 카탈로그
버전: 0.33.0 최종 업데이트: 2026-03-15 적용 대상: ranvier-core, ranvier-runtime, ranvier-std 카테고리: Operations
Ranvier 워크플로우를 구축하기 위한 12가지 핵심 패턴 레퍼런스입니다. 각 패턴에는 최소 코드 예제와 전체 예제 링크가 포함되어 있습니다.
#1. 기본 파이프라인
Transition을 선형으로 연결하여 처리 파이프라인을 구축합니다.
let axon = Axon::<Input, Input, String>::new("pipeline")
.then(ValidateInput)
.then(ProcessData)
.then(FormatOutput);
let mut bus = Bus::new();
let result = axon.execute(input, &(), &mut bus).await;사용 시점: 각 단계가 이전 단계에 의존하는 순차적 데이터 변환.
예제: hello-world, typed-state-tree
#2. 리소스 주입
Resources를 통해 공유 종속성(DB 풀, 설정, 외부 클라이언트)을 주입합니다.
use ranvier_core::transition::ResourceRequirement;
#[derive(Clone)]
struct DbPool { /* ... */ }
impl ResourceRequirement for DbPool {}
#[async_trait]
impl Transition<UserId, User> for FetchUser {
type Error = String;
type Resources = DbPool;
async fn run(&self, id: UserId, pool: &DbPool, _bus: &mut Bus) -> Outcome<User, String> {
let user = pool.query_user(&id).await;
Outcome::Next(user)
}
}
let pool = DbPool::new();
let result = axon.execute(input, &pool, &mut bus).await;사용 시점: Transition들이 데이터베이스 연결이나 API 클라이언트와 같은 공통 종속성을 공유할 때.
예제: db-example, multitenancy-demo
#3. Bus 기능 주입
Bus를 통해 요청 범위의 컨텍스트를 파이프라인에 전달합니다.
// 실행 전
let mut bus = Bus::new();
bus.insert(RequestId("req-abc".into()));
bus.insert(TenantId::new("tenant-1"));
// Transition 내부
async fn run(&self, input: In, _res: &(), bus: &mut Bus) -> Outcome<Out, E> {
let tenant = bus.read::<TenantId>();
let req_id = bus.read::<RequestId>();
// ... 컨텍스트 사용
Outcome::Next(output)
}사용 시점: 요청별 컨텍스트 (인증 토큰, 테넌트 ID, 상관관계 ID, 기능 플래그).
예제: bus-capability-demo, multitenancy-demo
#4. 커스텀 에러 타입
구조화되고 매칭 가능한 도메인 에러를 위해 thiserror + serde를 사용합니다.
#[derive(Debug, Clone, thiserror::Error, Serialize, Deserialize)]
enum OrderError {
#[error("item not found: {0}")]
ItemNotFound(String),
#[error("insufficient stock: need {need}, have {have}")]
InsufficientStock { need: u32, have: u32 },
#[error("payment declined: {0}")]
PaymentDeclined(String),
}
// 특정 에러 변형에 대한 매칭
match result {
Outcome::Fault(OrderError::PaymentDeclined(reason)) => { /* 재시도 또는 알림 */ }
Outcome::Fault(OrderError::InsufficientStock { .. }) => { /* 백오더 */ }
_ => {}
}사용 시점: 문자열 메시지를 넘어선 도메인별 에러 처리가 필요할 때.
예제: custom-error-types, order-processing-demo
#5. 분기 및 의사결정 트리
조건부 흐름 분기를 위해 Outcome::Branch를 사용합니다.
async fn run(&self, order: Order, _res: &(), _bus: &mut Bus) -> Outcome<Order, E> {
if order.total > 10_000 {
return Outcome::Branch(
"high_value".into(),
Some(serde_json::to_value(&order).unwrap()),
);
}
Outcome::Next(order)
}사용 시점: 다른 처리 경로로 라우팅하는 워크플로우 결정 (승인 흐름, A/B 테스트, 유형별 라우팅).
예제: order-processing-demo
#6. 재시도와 Dead Letter Queue (DLQ)
지수 백오프와 DLQ 폴백을 포함한 자동 재시도를 구성합니다.
let axon = Axon::<Req, Req, String>::new("resilient")
.then(UnreliableStep)
.with_dlq_policy(DlqPolicy::RetryThenDlq {
max_attempts: 3,
backoff_ms: 100,
})
.with_dlq_sink(InMemoryDlqSink::new());
// 실행 후, 타임라인에서 재시도 이벤트 확인
let timeline = bus.read::<Timeline>().unwrap();
for event in &timeline.events {
match event {
TimelineEvent::NodeRetry { .. } => { /* 기록됨 */ }
TimelineEvent::DlqExhausted { .. } => { /* 알림 */ }
_ => {}
}
}사용 시점: 외부 서비스 호출, 네트워크 종속 작업, 일시적으로 실패할 수 있는 모든 것.
예제: retry-dlq-demo
#7. 상태 영속화 및 체크포인트
장애 복구 및 재개를 위해 워크플로우 상태를 영속화합니다.
use ranvier_runtime::{InMemoryPersistenceStore, PersistenceHandle, PersistenceTraceId, PersistenceAutoComplete};
let store = Arc::new(InMemoryPersistenceStore::new());
let handle = PersistenceHandle::from_arc(store.clone() as Arc<dyn PersistenceStore>);
// 영속화와 함께 실행 — 장애 시 트레이스가 열린 상태로 유지
let mut bus = ranvier_core::ranvier_bus!(
handle.clone(),
PersistenceTraceId::new("trace-001"),
PersistenceAutoComplete(false),
);
let result = axon.execute(input, &(), &mut bus).await;
// 장애 시: 마지막 체크포인트에서 재개
let trace = store.load("trace-001").await?.unwrap();
let cursor = store.resume("trace-001", trace.events.last().unwrap().step).await?;사용 시점: 장기 실행 비즈니스 워크플로우, 결제 처리, 다단계 승인 체인.
예제: state-persistence-demo
#8. 보상 및 롤백
복구 불가능한 장애에 대한 자동 롤백을 위해 보상 훅을 등록합니다.
use ranvier_runtime::{CompensationHook, CompensationContext, CompensationHandle, CompensationRetryPolicy};
#[derive(Clone)]
struct RefundPayment;
#[async_trait]
impl CompensationHook for RefundPayment {
async fn compensate(&self, ctx: CompensationContext) -> anyhow::Result<()> {
println!("Refunding trace={} at step={}", ctx.trace_id, ctx.fault_step);
Ok(())
}
}
let mut bus = ranvier_core::ranvier_bus!(
handle,
PersistenceTraceId::new("order-123"),
CompensationHandle::from_hook(RefundPayment),
CompensationRetryPolicy { max_attempts: 2, backoff_ms: 100 },
);사용 시점: Saga 패턴, 분산 트랜잭션, 부분 완료 시 정리가 필요한 모든 워크플로우.
예제: state-persistence-demo (3단계)
#9. 테넌트 격리
테넌트별 데이터 격리를 위해 ranvier_core::tenant의 TenantId를 사용합니다.
use ranvier_core::tenant::TenantId;
// Bus를 통해 테넌트 컨텍스트 주입
let mut bus = Bus::new();
bus.insert(TenantId::new("tenant-a"));
// Transition에서 읽기
async fn run(&self, input: In, store: &TenantStore, bus: &mut Bus) -> Outcome<Out, E> {
let tenant = bus.read::<TenantId>()
.ok_or_else(|| "missing tenant".to_string())?;
let data = store.list(tenant.as_str()).await;
Outcome::Next(data)
}사용 시점: 테넌트별 데이터 경계가 있는 SaaS 멀티테넌트 애플리케이션.
예제: multitenancy-demo
#10. Transition 단위 테스트
Transition을 직접 테스트하세요 — 단순한 비동기 함수입니다.
#[tokio::test]
async fn test_validate_rejects_negative() {
let t = ValidateOrder;
let mut bus = Bus::new();
let input = OrderRequest { amount: -1, item: "Widget".into() };
let result = t.run(input, &(), &mut bus).await;
assert!(result.is_fault());
}
#[tokio::test]
async fn test_full_pipeline() {
let axon = Axon::<OrderRequest, OrderRequest, String>::new("test")
.then(ValidateOrder)
.then(ProcessOrder);
let mut bus = Bus::new();
let result = axon.execute(valid_input(), &(), &mut bus).await;
assert!(result.is_next());
}사용 시점: 모든 Transition에 단위 테스트가 있어야 합니다. 통합 테스트는 전체 파이프라인을 검증합니다.
예제: testing-patterns (7개 테스트)
#11. 애플리케이션 수준 서킷 브레이커
공유 상태를 사용하여 애플리케이션 수준에서 서킷 브레이커 로직을 구현합니다.
use std::sync::atomic::{AtomicU32, Ordering};
#[derive(Clone)]
enum CircuitState { Closed, Open }
#[derive(Clone)]
struct CircuitBreakerGateway {
failure_count: Arc<AtomicU32>,
threshold: u32,
}
#[async_trait]
impl Transition<Request, Response> for CircuitBreakerGateway {
type Error = String;
type Resources = ();
async fn run(&self, req: Request, _: &(), _: &mut Bus) -> Outcome<Response, String> {
let failures = self.failure_count.load(Ordering::SeqCst);
if failures >= self.threshold {
return Outcome::Fault("circuit open".to_string());
}
match external_call(&req).await {
Ok(res) => {
self.failure_count.store(0, Ordering::SeqCst);
Outcome::Next(res)
}
Err(e) => {
self.failure_count.fetch_add(1, Ordering::SeqCst);
Outcome::Fault(e.to_string())
}
}
}
}사용 시점: 신뢰할 수 없는 외부 서비스로 인한 연쇄 장애를 방지할 때.
예제: retry-dlq-demo (데모 5)
#12. 타임아웃 래핑
느린 작업을 래핑하기 위해 tokio::time::timeout을 사용합니다.
use tokio::time::{timeout, Duration};
#[async_trait]
impl Transition<Input, Output> for TimedStep {
type Error = String;
type Resources = ();
async fn run(&self, input: Input, _: &(), _: &mut Bus) -> Outcome<Output, String> {
match timeout(Duration::from_millis(500), slow_operation(&input)).await {
Ok(Ok(result)) => Outcome::Next(result),
Ok(Err(e)) => Outcome::Fault(e.to_string()),
Err(_) => Outcome::Fault("operation timed out".to_string()),
}
}
}사용 시점: 예측할 수 없는 지연시간을 가진 모든 작업 (네트워크 호출, 파일 I/O, 데이터베이스 쿼리).
예제: retry-dlq-demo (데모 3-4)
#빠른 참조
| # | 패턴 | 주요 API | 난이도 |
|---|---|---|---|
| 1 | 기본 파이프라인 | Axon::new().then() |
초급 |
| 2 | 리소스 주입 | ResourceRequirement, type Resources |
초급 |
| 3 | Bus 기능 | bus.insert(), bus.read() |
초급 |
| 4 | 커스텀 에러 | thiserror, Serialize + Deserialize |
중급 |
| 5 | 분기 | Outcome::Branch |
중급 |
| 6 | 재시도 + DLQ | DlqPolicy::RetryThenDlq, DlqSink |
중급 |
| 7 | 영속화 | PersistenceHandle, PersistenceTraceId |
고급 |
| 8 | 보상 | CompensationHook, CompensationHandle |
고급 |
| 9 | 테넌트 격리 | TenantId, TenantStore |
중급 |
| 10 | 단위 테스트 | Transition::run(), Axon::execute() |
초급 |
| 11 | 서킷 브레이커 | AtomicU32, 애플리케이션 수준 상태 |
고급 |
| 12 | 타임아웃 | tokio::time::timeout |
중급 |