Skip to content

Event Streaming完全ガイド

イベントストリーミングパターンを詳しく解説します。

従来のメッセージキューとの違い

Section titled “従来のメッセージキューとの違い”

メッセージキュー:

// メッセージキュー: 1つのメッセージを1つのコンシューマーが処理
await queue.send('order.created', orderData);
// 1つのワーカーが処理
worker.on('order.created', async (data) => {
await processOrder(data);
});

イベントストリーミング:

// イベントストリーミング: 1つのイベントを複数のコンシューマーが処理可能
await eventStream.publish('order.created', orderData);
// 複数のコンシューマーが処理
consumer1.subscribe('order.created', async (event) => {
await sendEmail(event.userId);
});
consumer2.subscribe('order.created', async (event) => {
await updateAnalytics(event.orderId);
});
consumer3.subscribe('order.created', async (event) => {
await notifyWarehouse(event.orderId);
});

メリット:

  • 複数のコンシューマーが同じイベントを処理可能
  • イベントの履歴を保持
  • タイムトラベルが可能
  • スケーラビリティが高い
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
// Producer
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'order.created',
messages: [
{
key: orderId,
value: JSON.stringify({
orderId,
userId,
items,
amount,
}),
},
],
});
// Consumer
const consumer = kafka.consumer({ groupId: 'order-processors' });
await consumer.connect();
await consumer.subscribe({ topic: 'order.created' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
await processOrder(event);
},
});
// Consumer Group 1: メール送信
const emailConsumer = kafka.consumer({ groupId: 'email-service' });
await emailConsumer.subscribe({ topic: 'order.created' });
await emailConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
await sendEmail(event.userId);
},
});
// Consumer Group 2: 分析
const analyticsConsumer = kafka.consumer({ groupId: 'analytics-service' });
await analyticsConsumer.subscribe({ topic: 'order.created' });
await analyticsConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
await updateAnalytics(event.orderId);
},
});
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
const eventBridge = new EventBridgeClient({ region: 'us-east-1' });
// イベントを発行
await eventBridge.send(new PutEventsCommand({
Entries: [
{
Source: 'order-service',
DetailType: 'Order Created',
Detail: JSON.stringify({
orderId,
userId,
items,
amount,
}),
},
],
}));
// イベントルールで処理
// - Lambda関数をトリガー
// - SQSキューに送信
// - SNSトピックに送信

Event Streaming完全ガイドのポイント:

  • なぜ必要か: 複数のコンシューマー、イベント履歴、タイムトラベル
  • Kafka: 高スループット、耐久性、スケーラビリティ
  • Event Bridge: AWS統合、サーバーレス、イベントルーティング

適切なEvent Streamingの実装により、スケーラブルで柔軟なイベント駆動システムを構築できます。