Skip to content

Bulkheadパターン

バルクヘッドパターンを詳しく解説します。

問題のある実装:

// ❌ 悪い例: すべてのリクエストが同じリソースプールを使用
class OrderService {
private threadPool: ThreadPool; // すべてのリクエストが同じプールを使用
async processOrder(orderData: OrderData): Promise<Order> {
// 決済処理が遅い場合、他のリクエストも影響を受ける
await this.threadPool.execute(async () => {
await paymentService.chargePayment(orderData.orderId, orderData.amount);
await inventoryService.reserveInventory(orderData.orderId, orderData.items);
await notificationService.sendConfirmation(orderData.userId);
});
}
}

問題点:

  1. 障害の伝播: 1つのサービスが遅いと、他のサービスも影響を受ける
  2. リソースの枯渇: 1つのサービスがリソースを占有すると、他のサービスが使用できない
  3. 可用性の低下: 全体の可用性が低下する

影響:

  • 障害の伝播
  • リソースの枯渇
  • 可用性の低下

改善された実装:

// ✅ 良い例: Bulkheadパターンを使用
class BulkheadPool {
private pools: Map<string, ThreadPool> = new Map();
constructor() {
// サービスごとに独立したスレッドプールを作成
this.pools.set('payment', new ThreadPool({ maxSize: 10 }));
this.pools.set('inventory', new ThreadPool({ maxSize: 20 }));
this.pools.set('notification', new ThreadPool({ maxSize: 30 }));
}
async execute<T>(serviceName: string, fn: () => Promise<T>): Promise<T> {
const pool = this.pools.get(serviceName);
if (!pool) {
throw new Error(`Pool not found for service: ${serviceName}`);
}
return await pool.execute(fn);
}
}
class OrderService {
private bulkheadPool: BulkheadPool;
constructor() {
this.bulkheadPool = new BulkheadPool();
}
async processOrder(orderData: OrderData): Promise<Order> {
// 各サービスが独立したプールを使用
await Promise.all([
this.bulkheadPool.execute('payment', async () => {
await paymentService.chargePayment(orderData.orderId, orderData.amount);
}),
this.bulkheadPool.execute('inventory', async () => {
await inventoryService.reserveInventory(orderData.orderId, orderData.items);
}),
this.bulkheadPool.execute('notification', async () => {
await notificationService.sendConfirmation(orderData.userId);
}),
]);
}
}

メリット:

  • 障害の分離: 1つのサービスの障害が他のサービスに影響しない
  • リソースの分離: サービスごとに独立したリソースプール
  • 可用性の向上: 全体の可用性が向上

パターン1: スレッドプールの分離

Section titled “パターン1: スレッドプールの分離”

実装例:

class ThreadPool {
private maxSize: number;
private queue: Array<() => Promise<any>> = [];
private active: number = 0;
constructor(maxSize: number) {
this.maxSize = maxSize;
}
async execute<T>(fn: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
this.queue.push(async () => {
try {
const result = await fn();
resolve(result);
} catch (error) {
reject(error);
} finally {
this.active--;
this.processQueue();
}
});
this.processQueue();
});
}
private processQueue(): void {
if (this.active >= this.maxSize || this.queue.length === 0) {
return;
}
const task = this.queue.shift();
if (task) {
this.active++;
task();
}
}
}
class BulkheadPool {
private pools: Map<string, ThreadPool> = new Map();
constructor(config: Map<string, number>) {
for (const [serviceName, maxSize] of config) {
this.pools.set(serviceName, new ThreadPool(maxSize));
}
}
async execute<T>(serviceName: string, fn: () => Promise<T>): Promise<T> {
const pool = this.pools.get(serviceName);
if (!pool) {
throw new Error(`Pool not found for service: ${serviceName}`);
}
return await pool.execute(fn);
}
}
// 使用例
const bulkheadPool = new BulkheadPool(
new Map([
['payment', 10],
['inventory', 20],
['notification', 30],
])
);
await bulkheadPool.execute('payment', async () => {
await paymentService.chargePayment(orderId, amount);
});

実装例:

class ConnectionPool {
private maxSize: number;
private connections: Array<Connection> = [];
private available: Array<Connection> = [];
constructor(maxSize: number) {
this.maxSize = maxSize;
}
async acquire(): Promise<Connection> {
if (this.available.length > 0) {
return this.available.pop()!;
}
if (this.connections.length < this.maxSize) {
const connection = await this.createConnection();
this.connections.push(connection);
return connection;
}
// 接続が利用可能になるまで待機
return await this.waitForConnection();
}
release(connection: Connection): void {
this.available.push(connection);
}
private async createConnection(): Promise<Connection> {
// 接続を作成
return new Connection();
}
private async waitForConnection(): Promise<Connection> {
// 接続が利用可能になるまで待機
return new Promise((resolve) => {
const checkAvailable = () => {
if (this.available.length > 0) {
resolve(this.available.pop()!);
} else {
setTimeout(checkAvailable, 100);
}
};
checkAvailable();
});
}
}
class BulkheadConnectionPool {
private pools: Map<string, ConnectionPool> = new Map();
constructor(config: Map<string, number>) {
for (const [serviceName, maxSize] of config) {
this.pools.set(serviceName, new ConnectionPool(maxSize));
}
}
async execute<T>(serviceName: string, fn: (connection: Connection) => Promise<T>): Promise<T> {
const pool = this.pools.get(serviceName);
if (!pool) {
throw new Error(`Pool not found for service: ${serviceName}`);
}
const connection = await pool.acquire();
try {
return await fn(connection);
} finally {
pool.release(connection);
}
}
}
import { Bulkhead, BulkheadConfig } from 'resilience4j';
const bulkheadConfig: BulkheadConfig = {
maxConcurrentCalls: 10, // 最大10並行呼び出し
maxWaitDuration: 1000, // 最大1秒待機
};
const bulkhead = Bulkhead.of('payment-service', bulkheadConfig);
class PaymentService {
async chargePayment(orderId: string, amount: number): Promise<PaymentResult> {
return await bulkhead.executeSupplier(async () => {
const response = await fetch('https://payment-api.example.com/charge', {
method: 'POST',
body: JSON.stringify({ orderId, amount }),
});
if (!response.ok) {
throw new Error('Payment failed');
}
return await response.json();
});
}
}
// 決済サービス: 厳しい設定(重要なのでリソースを制限)
const paymentBulkheadConfig: BulkheadConfig = {
maxConcurrentCalls: 10,
maxWaitDuration: 1000,
};
// 通知サービス: 緩い設定(重要ではないので多くのリソースを使用可能)
const notificationBulkheadConfig: BulkheadConfig = {
maxConcurrentCalls: 50,
maxWaitDuration: 5000,
};

Bulkheadパターンのポイント:

  • なぜ必要か: 障害の伝播を防ぎ、リソースを分離
  • スレッドプールの分離: サービスごとに独立したスレッドプール
  • 接続プールの分離: サービスごとに独立した接続プール
  • 実装: Resilience4jなどのライブラリを使用
  • 設定: サービスごとに適切な設定値を選択

適切なBulkheadパターンの実装により、障害に強いシステムを構築できます。