Skip to content

最終的整合性の設計

分散システムにおける最終的整合性の設計パターンを詳しく解説します。

なぜ最終的整合性が必要なのか

Section titled “なぜ最終的整合性が必要なのか”

強い整合性(Strong Consistency)の問題点

Section titled “強い整合性(Strong Consistency)の問題点”

問題のある実装:

// ❌ 悪い例: 強い整合性を要求
async function createOrderWithInventory(orderData: OrderData): Promise<Order> {
// すべてのサービスが同期して動作する必要がある
return await db.$transaction(async (tx) => {
// 1. 注文を作成
const order = await tx.order.create({ data: orderData });
// 2. 在庫サービスを呼び出し(同期)
const inventoryResult = await inventoryService.reserveInventory(order.id, orderData.items);
if (!inventoryResult.success) {
throw new Error('Inventory reservation failed');
}
// 3. 決済サービスを呼び出し(同期)
const paymentResult = await paymentService.chargePayment(order.id, orderData.amount);
if (!paymentResult.success) {
// 在庫を戻す必要がある
await inventoryService.releaseInventory(order.id, orderData.items);
throw new Error('Payment failed');
}
return order;
});
}

問題点:

  1. パフォーマンス: すべてのサービスが応答するまで待機する必要がある
  2. 可用性: 1つのサービスが失敗すると、全体が失敗する
  3. スケーラビリティ: サービス間の緊密な結合が必要
  4. 障害の伝播: 1つのサービスの障害が全体に影響する

影響:

  • パフォーマンスの低下
  • 可用性の低下
  • スケーラビリティの制限
  • 障害の伝播

改善された実装:

// ✅ 良い例: 最終的整合性を使用
async function createOrderWithInventory(orderData: OrderData): Promise<Order> {
// 1. 注文を作成(即座に完了)
const order = await db.order.create({
data: {
...orderData,
status: 'PENDING',
},
});
// 2. イベントを発行(非同期)
await eventBus.publish('order.created', {
orderId: order.id,
userId: orderData.userId,
items: orderData.items,
amount: orderData.amount,
});
// 3. 即座にレスポンスを返す
return order;
}
// イベントハンドラー: 非同期で処理
class OrderEventHandler {
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
try {
// 1. 在庫を予約
await inventoryService.reserveInventory(event.orderId, event.items);
// 2. 次のイベントを発行
await eventBus.publish('inventory.reserved', {
orderId: event.orderId,
amount: event.amount,
});
} catch (error) {
// 在庫予約に失敗した場合、注文をキャンセル
await eventBus.publish('inventory.reservation.failed', {
orderId: event.orderId,
error: error.message,
});
}
}
async handleInventoryReserved(event: InventoryReservedEvent): Promise<void> {
try {
// 決済を処理
await paymentService.chargePayment(event.orderId, event.amount);
// 注文を確定
await db.order.update({
where: { id: event.orderId },
data: { status: 'CONFIRMED' },
});
} catch (error) {
// 決済に失敗した場合、在庫を戻す
await inventoryService.releaseInventory(event.orderId);
await eventBus.publish('payment.failed', {
orderId: event.orderId,
error: error.message,
});
}
}
}

メリット:

  • パフォーマンスの向上: 即座にレスポンスを返せる
  • 可用性の向上: 1つのサービスの障害が全体に影響しない
  • スケーラビリティ: サービス間の疎結合
  • 障害の分離: 各サービスが独立して動作

パターン1: イベント駆動アーキテクチャ

Section titled “パターン1: イベント駆動アーキテクチャ”

実装例:

// イベントバス
class EventBus {
private subscribers: Map<string, Array<(event: any) => Promise<void>>> = new Map();
subscribe(eventType: string, handler: (event: any) => Promise<void>): void {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, []);
}
this.subscribers.get(eventType)!.push(handler);
}
async publish(eventType: string, eventData: any): Promise<void> {
const handlers = this.subscribers.get(eventType) || [];
// すべてのハンドラーを並列で実行
await Promise.all(handlers.map(handler => handler(eventData)));
}
}
// 使用例
const eventBus = new EventBus();
// イベントハンドラーを登録
eventBus.subscribe('order.created', async (event) => {
await inventoryService.reserveInventory(event.orderId, event.items);
});
eventBus.subscribe('inventory.reserved', async (event) => {
await paymentService.chargePayment(event.orderId, event.amount);
});
// イベントを発行
await eventBus.publish('order.created', {
orderId: 'order-1',
items: [{ productId: 'product-1', quantity: 2 }],
});

パターン2: 補償トランザクション(Saga)

Section titled “パターン2: 補償トランザクション(Saga)”

実装例:

class OrderSaga {
async execute(orderData: OrderData): Promise<Order> {
const compensations: Array<() => Promise<void>> = [];
try {
// ステップ1: 注文を作成
const order = await orderService.createOrder(orderData);
compensations.push(() => orderService.cancelOrder(order.id));
// ステップ2: 在庫を予約
await inventoryService.reserveInventory(order.id, orderData.items);
compensations.push(() => inventoryService.releaseInventory(order.id, orderData.items));
// ステップ3: 決済を処理
await paymentService.chargePayment(order.id, orderData.amount);
compensations.push(() => paymentService.refundPayment(order.id));
return order;
} catch (error) {
// 補償トランザクションを実行
for (const compensate of compensations.reverse()) {
try {
await compensate();
} catch (compError) {
console.error('Compensation failed:', compError);
}
}
throw error;
}
}
}

実装例:

class IdempotentService {
async processOrder(orderId: string, orderData: OrderData): Promise<void> {
// 冪等キーで重複実行を防止
const idempotencyKey = `order-${orderId}`;
// 既に処理済みか確認
const existing = await db.processedOrders.findUnique({
where: { idempotencyKey },
});
if (existing) {
// 既に処理済みの場合は、結果を返す
return existing.result;
}
try {
// 処理を実行
const result = await this.executeOrder(orderData);
// 結果を保存
await db.processedOrders.create({
data: {
idempotencyKey,
orderId,
result: JSON.stringify(result),
processedAt: new Date(),
},
});
return result;
} catch (error) {
// エラーも記録(リトライ時の重複実行を防止)
await db.processedOrders.create({
data: {
idempotencyKey,
orderId,
result: JSON.stringify({ error: error.message }),
processedAt: new Date(),
},
});
throw error;
}
}
}

条件:

  • 金融取引
  • 在庫管理(リアルタイム)
  • 認証・認可

実践例:

// 金融取引: 強い整合性が必要
async function transferMoney(fromAccountId: string, toAccountId: string, amount: number): Promise<void> {
return await db.$transaction(async (tx) => {
// 送金元の残高を確認
const fromAccount = await tx.account.findUnique({
where: { id: fromAccountId },
});
if (fromAccount.balance < amount) {
throw new Error('Insufficient balance');
}
// 送金元の残高を減らす
await tx.account.update({
where: { id: fromAccountId },
data: { balance: { decrement: amount } },
});
// 送金先の残高を増やす
await tx.account.update({
where: { id: toAccountId },
data: { balance: { increment: amount } },
});
});
}

条件:

  • ユーザープロフィールの更新
  • 商品情報の更新
  • ログ・分析データ

実践例:

// ユーザープロフィールの更新: 最終的整合性で十分
async function updateUserProfile(userId: string, profileData: ProfileData): Promise<void> {
// 即座に更新
await db.user.update({
where: { id: userId },
data: profileData,
});
// イベントを発行(非同期で他のサービスに反映)
await eventBus.publish('user.profile.updated', {
userId,
profileData,
});
}
// イベントハンドラー: 非同期で他のサービスに反映
class UserProfileEventHandler {
async handleProfileUpdated(event: UserProfileUpdatedEvent): Promise<void> {
// 検索サービスに反映
await searchService.updateUserProfile(event.userId, event.profileData);
// 分析サービスに反映
await analyticsService.trackProfileUpdate(event.userId, event.profileData);
}
}

実装例:

class ConsistencyChecker {
async checkOrderConsistency(orderId: string): Promise<ConsistencyResult> {
const order = await db.order.findUnique({ where: { id: orderId } });
const inventory = await inventoryService.getReservation(orderId);
const payment = await paymentService.getPayment(orderId);
const inconsistencies: string[] = [];
// チェック1: 注文が確定しているのに、在庫が予約されていない
if (order.status === 'CONFIRMED' && !inventory) {
inconsistencies.push('Order is confirmed but inventory is not reserved');
}
// チェック2: 注文が確定しているのに、決済が完了していない
if (order.status === 'CONFIRMED' && payment.status !== 'COMPLETED') {
inconsistencies.push('Order is confirmed but payment is not completed');
}
// チェック3: 決済が完了しているのに、注文が確定していない
if (payment.status === 'COMPLETED' && order.status !== 'CONFIRMED') {
inconsistencies.push('Payment is completed but order is not confirmed');
}
return {
isConsistent: inconsistencies.length === 0,
inconsistencies,
};
}
async repairInconsistencies(orderId: string): Promise<void> {
const result = await this.checkOrderConsistency(orderId);
if (!result.isConsistent) {
// 整合性を修復
for (const inconsistency of result.inconsistencies) {
await this.repairInconsistency(orderId, inconsistency);
}
}
}
private async repairInconsistency(orderId: string, inconsistency: string): Promise<void> {
// 整合性を修復するロジック
if (inconsistency.includes('inventory is not reserved')) {
const order = await db.order.findUnique({ where: { id: orderId } });
await inventoryService.reserveInventory(orderId, order.items);
}
if (inconsistency.includes('payment is not completed')) {
const order = await db.order.findUnique({ where: { id: orderId } });
await paymentService.chargePayment(orderId, order.amount);
}
}
}

最終的整合性の設計のポイント:

  • なぜ必要か: パフォーマンス、可用性、スケーラビリティの向上
  • イベント駆動アーキテクチャ: 非同期で処理、サービス間の疎結合
  • 補償トランザクション: Sagaパターンで整合性を保つ
  • 冪等性の保証: 重複実行を防止
  • 整合性レベルの選択: 強い整合性が必要な場合と最終的整合性で十分な場合
  • 整合性の確認と修復: 定期的な整合性チェックと修復

適切な最終的整合性の設計により、スケーラブルで可用性の高い分散システムを構築できます。