Skip to content

Sagaパターン完全ガイド

分散システムにおけるトランザクション管理のためのSagaパターンを詳しく解説します。

🎯 なぜSagaパターンが必要なのか

Section titled “🎯 なぜSagaパターンが必要なのか”

❌ 分散トランザクション(2PC)の問題点

Section titled “❌ 分散トランザクション(2PC)の問題点”

❌ 問題のある実装:

// ❌ 悪い例: 2フェーズコミット(2PC)を使用
async function createOrderWithPayment(orderData: OrderData) {
const coordinator = new TransactionCoordinator();
try {
// フェーズ1: 準備
await coordinator.prepare('order-service', () => createOrder(orderData));
await coordinator.prepare('payment-service', () => chargePayment(orderData));
await coordinator.prepare('inventory-service', () => reduceInventory(orderData));
// フェーズ2: コミット
await coordinator.commit('order-service');
await coordinator.commit('payment-service');
await coordinator.commit('inventory-service');
} catch (error) {
// ロールバック
await coordinator.rollback('order-service');
await coordinator.rollback('payment-service');
await coordinator.rollback('inventory-service');
throw error;
}
}

問題点:

  1. パフォーマンス: すべてのサービスが準備完了するまで待機する必要がある
  2. 可用性: 1つのサービスが失敗すると、全体がロールバックされる
  3. ロック時間: 長時間ロックが保持される
  4. スケーラビリティ: マイクロサービス間の緊密な結合が必要

影響:

  • パフォーマンスの低下
  • 可用性の低下
  • スケーラビリティの制限
  • 障害の伝播

改善された実装:

// ✅ 良い例: Sagaパターンを使用
async function createOrderWithPayment(orderData: OrderData) {
const saga = new OrderSaga();
try {
// ステップ1: 注文を作成
const order = await saga.step1_createOrder(orderData);
// ステップ2: 決済を処理
await saga.step2_processPayment(order.id, orderData.amount);
// ステップ3: 在庫を減らす
await saga.step3_reduceInventory(order.id, orderData.items);
return order;
} catch (error) {
// 補償トランザクションを実行
await saga.compensate();
throw error;
}
}

メリット:

  • パフォーマンスの向上: 各ステップを独立して実行
  • 可用性の向上: 1つのサービスが失敗しても、他のサービスに影響しない
  • スケーラビリティ: サービス間の疎結合
  • 障害の分離: 補償トランザクションで整合性を保つ

1. Orchestration(オーケストレーション)

Section titled “1. Orchestration(オーケストレーション)”

定義: 中央のオーケストレーターが各サービスの呼び出しを制御するパターンです。

実装例:

class OrderSagaOrchestrator {
private compensations: Array<() => Promise<void>> = [];
async execute(orderData: OrderData): Promise<Order> {
try {
// ステップ1: 注文を作成
const order = await this.orderService.createOrder(orderData);
this.compensations.push(() => this.orderService.cancelOrder(order.id));
// ステップ2: 決済を処理
await this.paymentService.chargePayment(order.id, orderData.amount);
this.compensations.push(() => this.paymentService.refundPayment(order.id));
// ステップ3: 在庫を減らす
await this.inventoryService.reduceInventory(order.id, orderData.items);
this.compensations.push(() => this.inventoryService.restoreInventory(order.id, orderData.items));
return order;
} catch (error) {
// 補償トランザクションを逆順で実行
for (const compensate of this.compensations.reverse()) {
try {
await compensate();
} catch (compError) {
// 補償トランザクションの失敗をログに記録
console.error('Compensation failed:', compError);
}
}
throw error;
}
}
}

メリット:

  • 制御が明確: 中央で制御されるため、フローが明確
  • エラーハンドリングが容易: 中央でエラーを処理できる
  • テストが容易: 中央のオーケストレーターをテストすればよい

デメリット:

  • 中央のオーケストレーターがボトルネックになる可能性
  • オーケストレーターの可用性が重要

2. Choreography(コレオグラフィー)

Section titled “2. Choreography(コレオグラフィー)”

定義: 各サービスがイベントを発行し、他のサービスがそれに反応するパターンです。

実装例:

// Order Service
class OrderService {
async createOrder(orderData: OrderData): Promise<Order> {
const order = await this.orderRepository.save({
...orderData,
status: 'PENDING',
});
// イベントを発行
await this.eventBus.publish('order.created', {
orderId: order.id,
userId: order.userId,
amount: order.amount,
items: order.items,
});
return order;
}
async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
await this.orderRepository.update(event.orderId, {
status: 'PAID',
});
// 次のイベントを発行
await this.eventBus.publish('order.paid', {
orderId: event.orderId,
items: event.items,
});
}
async handleInventoryReserved(event: InventoryReservedEvent): Promise<void> {
await this.orderRepository.update(event.orderId, {
status: 'CONFIRMED',
});
}
async handlePaymentFailed(event: PaymentFailedEvent): Promise<void> {
await this.orderRepository.update(event.orderId, {
status: 'CANCELLED',
});
}
}
// Payment Service
class PaymentService {
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
try {
await this.chargePayment(event.orderId, event.amount);
// 成功イベントを発行
await this.eventBus.publish('payment.completed', {
orderId: event.orderId,
amount: event.amount,
items: event.items,
});
} catch (error) {
// 失敗イベントを発行
await this.eventBus.publish('payment.failed', {
orderId: event.orderId,
error: error.message,
});
}
}
async handleInventoryReservationFailed(event: InventoryReservationFailedEvent): Promise<void> {
// 補償トランザクション: 返金
await this.refundPayment(event.orderId);
}
}
// Inventory Service
class InventoryService {
async handleOrderPaid(event: OrderPaidEvent): Promise<void> {
try {
await this.reserveInventory(event.orderId, event.items);
// 成功イベントを発行
await this.eventBus.publish('inventory.reserved', {
orderId: event.orderId,
});
} catch (error) {
// 失敗イベントを発行
await this.eventBus.publish('inventory.reservation.failed', {
orderId: event.orderId,
error: error.message,
});
}
}
}

メリット:

  • 疎結合: サービス間の直接的な依存がない
  • スケーラビリティ: 各サービスが独立してスケールできる
  • 柔軟性: 新しいサービスを追加しやすい

デメリット:

  • フローが複雑: イベントの流れを追跡するのが困難
  • デバッグが困難: 分散したログを追跡する必要がある
  • エラーハンドリングが複雑: 補償トランザクションの管理が困難

Orchestration vs Choreography の選択基準

Section titled “Orchestration vs Choreography の選択基準”

条件:

  • 複雑なビジネスロジックがある
  • エラーハンドリングが重要
  • フローを明確に制御したい
  • チームが小規模

実践例:

// 複雑なビジネスロジックがある場合
class ComplexOrderSagaOrchestrator {
async execute(orderData: OrderData): Promise<Order> {
// 複雑な条件分岐
if (orderData.isPremium) {
await this.processPremiumOrder(orderData);
} else {
await this.processStandardOrder(orderData);
}
// 複雑なエラーハンドリング
try {
// ...
} catch (error) {
// 詳細なエラーハンドリング
if (error.type === 'PAYMENT_FAILED') {
await this.handlePaymentFailure(orderData);
} else if (error.type === 'INVENTORY_SHORTAGE') {
await this.handleInventoryShortage(orderData);
}
}
}
}

条件:

  • シンプルなフロー
  • サービス間の疎結合が重要
  • スケーラビリティが重要
  • チームが大規模

実践例:

// シンプルなフロー
// Order Service
await eventBus.publish('order.created', orderData);
// Payment Service
eventBus.subscribe('order.created', async (event) => {
await chargePayment(event);
await eventBus.publish('payment.completed', event);
});
// Inventory Service
eventBus.subscribe('payment.completed', async (event) => {
await reserveInventory(event);
await eventBus.publish('inventory.reserved', event);
});

補償トランザクションの重要性

Section titled “補償トランザクションの重要性”

問題:

// ❌ 悪い例: 補償トランザクションがない
async function createOrder(orderData: OrderData) {
const order = await orderService.createOrder(orderData);
await paymentService.chargePayment(order.id, orderData.amount);
// 在庫が不足している場合、注文と決済は完了しているが、在庫は減っていない
// → データの不整合
await inventoryService.reduceInventory(order.id, orderData.items);
}

解決策:

// ✅ 良い例: 補償トランザクションを実装
class OrderSaga {
private compensations: Array<{ action: string; compensate: () => Promise<void> }> = [];
async execute(orderData: OrderData): Promise<Order> {
try {
// ステップ1: 注文を作成
const order = await this.orderService.createOrder(orderData);
this.compensations.push({
action: 'createOrder',
compensate: () => this.orderService.cancelOrder(order.id),
});
// ステップ2: 決済を処理
await this.paymentService.chargePayment(order.id, orderData.amount);
this.compensations.push({
action: 'chargePayment',
compensate: () => this.paymentService.refundPayment(order.id),
});
// ステップ3: 在庫を減らす
await this.inventoryService.reduceInventory(order.id, orderData.items);
this.compensations.push({
action: 'reduceInventory',
compensate: () => this.inventoryService.restoreInventory(order.id, orderData.items),
});
return order;
} catch (error) {
// 補償トランザクションを逆順で実行
await this.compensate();
throw error;
}
}
private async compensate(): Promise<void> {
for (const comp of this.compensations.reverse()) {
try {
await comp.compensate();
} catch (error) {
// 補償トランザクションの失敗をログに記録
// 場合によっては、手動での対応が必要
console.error(`Compensation failed for ${comp.action}:`, error);
await this.alertManualIntervention(comp.action, error);
}
}
}
}
// Sagaの状態管理
interface SagaState {
sagaId: string;
currentStep: number;
status: 'PENDING' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'COMPENSATING';
compensations: Array<{
step: number;
action: string;
compensate: () => Promise<void>;
}>;
data: any;
}
class SagaManager {
private sagas: Map<string, SagaState> = new Map();
async executeSaga<T>(
sagaId: string,
steps: Array<{ execute: () => Promise<any>; compensate: () => Promise<void> }>
): Promise<T> {
const saga: SagaState = {
sagaId,
currentStep: 0,
status: 'IN_PROGRESS',
compensations: [],
data: {},
};
this.sagas.set(sagaId, saga);
try {
for (let i = 0; i < steps.length; i++) {
saga.currentStep = i;
const result = await steps[i].execute();
saga.data[`step${i}`] = result;
saga.compensations.push({
step: i,
action: `step${i}`,
compensate: steps[i].compensate,
});
}
saga.status = 'COMPLETED';
return saga.data as T;
} catch (error) {
saga.status = 'FAILED';
await this.compensate(saga);
throw error;
}
}
private async compensate(saga: SagaState): Promise<void> {
saga.status = 'COMPENSATING';
for (const comp of saga.compensations.reverse()) {
try {
await comp.compensate();
} catch (error) {
console.error(`Compensation failed for step ${comp.step}:`, error);
// 補償トランザクションの失敗を記録
await this.recordCompensationFailure(saga.sagaId, comp.step, error);
}
}
}
private async recordCompensationFailure(
sagaId: string,
step: number,
error: Error
): Promise<void> {
// 補償トランザクションの失敗をデータベースに記録
// 手動での対応が必要な場合がある
await db.sagaFailures.create({
data: {
sagaId,
step,
error: error.message,
timestamp: new Date(),
},
});
}
}
// 使用例
const sagaManager = new SagaManager();
const order = await sagaManager.executeSaga<Order>(
`order-${orderId}`,
[
{
execute: () => orderService.createOrder(orderData),
compensate: () => orderService.cancelOrder(orderId),
},
{
execute: () => paymentService.chargePayment(orderId, amount),
compensate: () => paymentService.refundPayment(orderId),
},
{
execute: () => inventoryService.reduceInventory(orderId, items),
compensate: () => inventoryService.restoreInventory(orderId, items),
},
]
);

Sagaパターン完全ガイドのポイント:

  • なぜ必要か: 分散トランザクション(2PC)の問題点を解決
  • Orchestration: 中央のオーケストレーターが制御、複雑なロジックに適している
  • Choreography: イベント駆動、サービス間の疎結合に適している
  • 補償トランザクション: データの整合性を保つために重要
  • 実践的な実装: Sagaの状態管理、エラーハンドリング、補償トランザクションの実装

適切なSagaパターンの実装により、分散システムにおけるトランザクション管理が可能になります。