Bulkheadパターン
Bulkheadパターン
Section titled “Bulkheadパターン”バルクヘッドパターンを詳しく解説します。
なぜBulkheadが必要なのか
Section titled “なぜBulkheadが必要なのか”障害の伝播問題
Section titled “障害の伝播問題”問題のある実装:
// ❌ 悪い例: すべてのリクエストが同じリソースプールを使用class OrderService { private threadPool: ThreadPool; // すべてのリクエストが同じプールを使用
async processOrder(orderData: OrderData): Promise<Order> { // 決済処理が遅い場合、他のリクエストも影響を受ける await this.threadPool.execute(async () => { await paymentService.chargePayment(orderData.orderId, orderData.amount); await inventoryService.reserveInventory(orderData.orderId, orderData.items); await notificationService.sendConfirmation(orderData.userId); }); }}問題点:
- 障害の伝播: 1つのサービスが遅いと、他のサービスも影響を受ける
- リソースの枯渇: 1つのサービスがリソースを占有すると、他のサービスが使用できない
- 可用性の低下: 全体の可用性が低下する
影響:
- 障害の伝播
- リソースの枯渇
- 可用性の低下
Bulkheadによる解決
Section titled “Bulkheadによる解決”改善された実装:
// ✅ 良い例: Bulkheadパターンを使用class BulkheadPool { private pools: Map<string, ThreadPool> = new Map();
constructor() { // サービスごとに独立したスレッドプールを作成 this.pools.set('payment', new ThreadPool({ maxSize: 10 })); this.pools.set('inventory', new ThreadPool({ maxSize: 20 })); this.pools.set('notification', new ThreadPool({ maxSize: 30 })); }
async execute<T>(serviceName: string, fn: () => Promise<T>): Promise<T> { const pool = this.pools.get(serviceName); if (!pool) { throw new Error(`Pool not found for service: ${serviceName}`); }
return await pool.execute(fn); }}
class OrderService { private bulkheadPool: BulkheadPool;
constructor() { this.bulkheadPool = new BulkheadPool(); }
async processOrder(orderData: OrderData): Promise<Order> { // 各サービスが独立したプールを使用 await Promise.all([ this.bulkheadPool.execute('payment', async () => { await paymentService.chargePayment(orderData.orderId, orderData.amount); }), this.bulkheadPool.execute('inventory', async () => { await inventoryService.reserveInventory(orderData.orderId, orderData.items); }), this.bulkheadPool.execute('notification', async () => { await notificationService.sendConfirmation(orderData.userId); }), ]); }}メリット:
- 障害の分離: 1つのサービスの障害が他のサービスに影響しない
- リソースの分離: サービスごとに独立したリソースプール
- 可用性の向上: 全体の可用性が向上
Bulkheadの実装パターン
Section titled “Bulkheadの実装パターン”パターン1: スレッドプールの分離
Section titled “パターン1: スレッドプールの分離”実装例:
class ThreadPool { private maxSize: number; private queue: Array<() => Promise<any>> = []; private active: number = 0;
constructor(maxSize: number) { this.maxSize = maxSize; }
async execute<T>(fn: () => Promise<T>): Promise<T> { return new Promise((resolve, reject) => { this.queue.push(async () => { try { const result = await fn(); resolve(result); } catch (error) { reject(error); } finally { this.active--; this.processQueue(); } });
this.processQueue(); }); }
private processQueue(): void { if (this.active >= this.maxSize || this.queue.length === 0) { return; }
const task = this.queue.shift(); if (task) { this.active++; task(); } }}
class BulkheadPool { private pools: Map<string, ThreadPool> = new Map();
constructor(config: Map<string, number>) { for (const [serviceName, maxSize] of config) { this.pools.set(serviceName, new ThreadPool(maxSize)); } }
async execute<T>(serviceName: string, fn: () => Promise<T>): Promise<T> { const pool = this.pools.get(serviceName); if (!pool) { throw new Error(`Pool not found for service: ${serviceName}`); }
return await pool.execute(fn); }}
// 使用例const bulkheadPool = new BulkheadPool( new Map([ ['payment', 10], ['inventory', 20], ['notification', 30], ]));
await bulkheadPool.execute('payment', async () => { await paymentService.chargePayment(orderId, amount);});パターン2: 接続プールの分離
Section titled “パターン2: 接続プールの分離”実装例:
class ConnectionPool { private maxSize: number; private connections: Array<Connection> = []; private available: Array<Connection> = [];
constructor(maxSize: number) { this.maxSize = maxSize; }
async acquire(): Promise<Connection> { if (this.available.length > 0) { return this.available.pop()!; }
if (this.connections.length < this.maxSize) { const connection = await this.createConnection(); this.connections.push(connection); return connection; }
// 接続が利用可能になるまで待機 return await this.waitForConnection(); }
release(connection: Connection): void { this.available.push(connection); }
private async createConnection(): Promise<Connection> { // 接続を作成 return new Connection(); }
private async waitForConnection(): Promise<Connection> { // 接続が利用可能になるまで待機 return new Promise((resolve) => { const checkAvailable = () => { if (this.available.length > 0) { resolve(this.available.pop()!); } else { setTimeout(checkAvailable, 100); } }; checkAvailable(); }); }}
class BulkheadConnectionPool { private pools: Map<string, ConnectionPool> = new Map();
constructor(config: Map<string, number>) { for (const [serviceName, maxSize] of config) { this.pools.set(serviceName, new ConnectionPool(maxSize)); } }
async execute<T>(serviceName: string, fn: (connection: Connection) => Promise<T>): Promise<T> { const pool = this.pools.get(serviceName); if (!pool) { throw new Error(`Pool not found for service: ${serviceName}`); }
const connection = await pool.acquire(); try { return await fn(connection); } finally { pool.release(connection); } }}実践的な実装例
Section titled “実践的な実装例”Resilience4jを使用した実装
Section titled “Resilience4jを使用した実装”import { Bulkhead, BulkheadConfig } from 'resilience4j';
const bulkheadConfig: BulkheadConfig = { maxConcurrentCalls: 10, // 最大10並行呼び出し maxWaitDuration: 1000, // 最大1秒待機};
const bulkhead = Bulkhead.of('payment-service', bulkheadConfig);
class PaymentService { async chargePayment(orderId: string, amount: number): Promise<PaymentResult> { return await bulkhead.executeSupplier(async () => { const response = await fetch('https://payment-api.example.com/charge', { method: 'POST', body: JSON.stringify({ orderId, amount }), });
if (!response.ok) { throw new Error('Payment failed'); }
return await response.json(); }); }}Bulkheadの設定
Section titled “Bulkheadの設定”サービスごとの設定
Section titled “サービスごとの設定”// 決済サービス: 厳しい設定(重要なのでリソースを制限)const paymentBulkheadConfig: BulkheadConfig = { maxConcurrentCalls: 10, maxWaitDuration: 1000,};
// 通知サービス: 緩い設定(重要ではないので多くのリソースを使用可能)const notificationBulkheadConfig: BulkheadConfig = { maxConcurrentCalls: 50, maxWaitDuration: 5000,};Bulkheadパターンのポイント:
- なぜ必要か: 障害の伝播を防ぎ、リソースを分離
- スレッドプールの分離: サービスごとに独立したスレッドプール
- 接続プールの分離: サービスごとに独立した接続プール
- 実装: Resilience4jなどのライブラリを使用
- 設定: サービスごとに適切な設定値を選択
適切なBulkheadパターンの実装により、障害に強いシステムを構築できます。