Event Streaming完全ガイド
Event Streaming完全ガイド
Section titled “Event Streaming完全ガイド”イベントストリーミングパターンを詳しく解説します。
なぜEvent Streamingが必要なのか
Section titled “なぜEvent Streamingが必要なのか”従来のメッセージキューとの違い
Section titled “従来のメッセージキューとの違い”メッセージキュー:
// メッセージキュー: 1つのメッセージを1つのコンシューマーが処理await queue.send('order.created', orderData);
// 1つのワーカーが処理worker.on('order.created', async (data) => { await processOrder(data);});イベントストリーミング:
// イベントストリーミング: 1つのイベントを複数のコンシューマーが処理可能await eventStream.publish('order.created', orderData);
// 複数のコンシューマーが処理consumer1.subscribe('order.created', async (event) => { await sendEmail(event.userId);});
consumer2.subscribe('order.created', async (event) => { await updateAnalytics(event.orderId);});
consumer3.subscribe('order.created', async (event) => { await notifyWarehouse(event.orderId);});メリット:
- 複数のコンシューマーが同じイベントを処理可能
- イベントの履歴を保持
- タイムトラベルが可能
- スケーラビリティが高い
Kafkaを使用した実装
Section titled “Kafkaを使用した実装”基本的な実装
Section titled “基本的な実装”import { Kafka } from 'kafkajs';
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'],});
// Producerconst producer = kafka.producer();
await producer.connect();
await producer.send({ topic: 'order.created', messages: [ { key: orderId, value: JSON.stringify({ orderId, userId, items, amount, }), }, ],});
// Consumerconst consumer = kafka.consumer({ groupId: 'order-processors' });
await consumer.connect();await consumer.subscribe({ topic: 'order.created' });
await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const event = JSON.parse(message.value.toString()); await processOrder(event); },});複数のConsumer Group
Section titled “複数のConsumer Group”// Consumer Group 1: メール送信const emailConsumer = kafka.consumer({ groupId: 'email-service' });await emailConsumer.subscribe({ topic: 'order.created' });await emailConsumer.run({ eachMessage: async ({ topic, partition, message }) => { const event = JSON.parse(message.value.toString()); await sendEmail(event.userId); },});
// Consumer Group 2: 分析const analyticsConsumer = kafka.consumer({ groupId: 'analytics-service' });await analyticsConsumer.subscribe({ topic: 'order.created' });await analyticsConsumer.run({ eachMessage: async ({ topic, partition, message }) => { const event = JSON.parse(message.value.toString()); await updateAnalytics(event.orderId); },});Event Bridgeを使用した実装
Section titled “Event Bridgeを使用した実装”import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
const eventBridge = new EventBridgeClient({ region: 'us-east-1' });
// イベントを発行await eventBridge.send(new PutEventsCommand({ Entries: [ { Source: 'order-service', DetailType: 'Order Created', Detail: JSON.stringify({ orderId, userId, items, amount, }), }, ],}));
// イベントルールで処理// - Lambda関数をトリガー// - SQSキューに送信// - SNSトピックに送信Event Streaming完全ガイドのポイント:
- なぜ必要か: 複数のコンシューマー、イベント履歴、タイムトラベル
- Kafka: 高スループット、耐久性、スケーラビリティ
- Event Bridge: AWS統合、サーバーレス、イベントルーティング
適切なEvent Streamingの実装により、スケーラブルで柔軟なイベント駆動システムを構築できます。