Skip to content

Event Busパターン

イベントバスパターンを詳しく解説します。

直接的なサービス間通信の問題

Section titled “直接的なサービス間通信の問題”

問題のある実装:

// ❌ 悪い例: サービス間の直接的な依存
class OrderService {
async createOrder(orderData: OrderData): Promise<Order> {
const order = await this.orderRepository.save(orderData);
// 直接他のサービスを呼び出し
await this.paymentService.chargePayment(order.id, orderData.amount);
await this.inventoryService.reserveInventory(order.id, orderData.items);
await this.notificationService.sendConfirmation(orderData.userId);
return order;
}
}

問題点:

  • サービス間の緊密な結合
  • 障害の伝播
  • スケーラビリティの制限

改善された実装:

// ✅ 良い例: Event Busを使用
class EventBus {
private subscribers: Map<string, Array<(event: any) => Promise<void>>> = new Map();
subscribe(eventType: string, handler: (event: any) => Promise<void>): void {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, []);
}
this.subscribers.get(eventType)!.push(handler);
}
async publish(eventType: string, eventData: any): Promise<void> {
const handlers = this.subscribers.get(eventType) || [];
// すべてのハンドラーを並列で実行
await Promise.all(handlers.map(handler => handler(eventData)));
}
}
class OrderService {
private eventBus: EventBus;
async createOrder(orderData: OrderData): Promise<Order> {
const order = await this.orderRepository.save(orderData);
// イベントを発行(サービス間の直接的な依存がない)
await this.eventBus.publish('order.created', {
orderId: order.id,
userId: orderData.userId,
items: orderData.items,
amount: orderData.amount,
});
return order;
}
}
// Payment Service
class PaymentService {
constructor(eventBus: EventBus) {
eventBus.subscribe('order.created', async (event) => {
await this.chargePayment(event.orderId, event.amount);
await eventBus.publish('payment.completed', {
orderId: event.orderId,
});
});
}
}
// Inventory Service
class InventoryService {
constructor(eventBus: EventBus) {
eventBus.subscribe('payment.completed', async (event) => {
await this.reserveInventory(event.orderId, event.items);
});
}
}

メリット:

  • サービス間の疎結合
  • 障害の分離
  • スケーラビリティ
class InMemoryEventBus implements EventBus {
private subscribers: Map<string, Array<(event: any) => Promise<void>>> = new Map();
subscribe(eventType: string, handler: (event: any) => Promise<void>): void {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, []);
}
this.subscribers.get(eventType)!.push(handler);
}
async publish(eventType: string, eventData: any): Promise<void> {
const handlers = this.subscribers.get(eventType) || [];
await Promise.all(handlers.map(handler => handler(eventData)));
}
}
// Redis Pub/Subを使用
import { Redis } from 'ioredis';
class DistributedEventBus implements EventBus {
private redis: Redis;
private subscribers: Map<string, Array<(event: any) => Promise<void>>> = new Map();
constructor(redis: Redis) {
this.redis = redis;
}
subscribe(eventType: string, handler: (event: any) => Promise<void>): void {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, []);
this.redis.subscribe(eventType);
}
this.subscribers.get(eventType)!.push(handler);
// Redisからのメッセージを受信
this.redis.on('message', async (channel, message) => {
if (channel === eventType) {
const event = JSON.parse(message);
await handler(event);
}
});
}
async publish(eventType: string, eventData: any): Promise<void> {
// Redisに発行
await this.redis.publish(eventType, JSON.stringify(eventData));
}
}

Event Busパターンのポイント:

  • なぜ必要か: サービス間の疎結合、障害の分離
  • インメモリEvent Bus: シンプル、単一プロセス
  • 分散Event Bus: 複数プロセス、Redis Pub/Sub

適切なEvent Busパターンの実装により、疎結合でスケーラブルなシステムを構築できます。