Event Sourcing完全ガイド
Event Sourcing完全ガイド
Section titled “Event Sourcing完全ガイド”イベントソーシングパターンを詳しく解説します。
なぜEvent Sourcingが必要なのか
Section titled “なぜEvent Sourcingが必要なのか”従来の状態管理の問題点
Section titled “従来の状態管理の問題点”問題のある実装:
// ❌ 悪い例: 現在の状態のみを保存class OrderService { async createOrder(orderData: OrderData): Promise<Order> { // 現在の状態のみを保存 return await db.order.create({ data: { id: generateId(), userId: orderData.userId, items: orderData.items, status: 'PENDING', totalAmount: calculateTotal(orderData.items), createdAt: new Date(), }, }); }
async updateOrderStatus(orderId: string, status: OrderStatus): Promise<Order> { // 以前の状態が失われる return await db.order.update({ where: { id: orderId }, data: { status }, }); }
async cancelOrder(orderId: string): Promise<void> { // キャンセルされた理由が記録されない await db.order.update({ where: { id: orderId }, data: { status: 'CANCELLED' }, }); }}問題点:
- 履歴の欠如: 過去の状態が失われる
- 監査の困難: 誰がいつ何を変更したかが分からない
- デバッグの困難: 問題が発生した原因を追跡できない
- タイムトラベル: 過去の状態を再現できない
影響:
- 監査の困難
- デバッグの困難
- データの整合性の確認が困難
- ビジネスロジックの理解が困難
Event Sourcingによる解決
Section titled “Event Sourcingによる解決”改善された実装:
// ✅ 良い例: Event Sourcingを使用class OrderService { async createOrder(orderData: OrderData): Promise<string> { const orderId = generateId();
// イベントを保存 const event: OrderCreatedEvent = { eventId: generateId(), aggregateId: orderId, eventType: 'OrderCreated', eventData: { userId: orderData.userId, items: orderData.items, totalAmount: calculateTotal(orderData.items), }, timestamp: new Date(), version: 1, };
await this.eventStore.append(orderId, event);
return orderId; }
async updateOrderStatus(orderId: string, status: OrderStatus): Promise<void> { // イベントを保存 const event: OrderStatusUpdatedEvent = { eventId: generateId(), aggregateId: orderId, eventType: 'OrderStatusUpdated', eventData: { status, previousStatus: await this.getCurrentStatus(orderId), }, timestamp: new Date(), version: await this.getNextVersion(orderId), };
await this.eventStore.append(orderId, event); }
async cancelOrder(orderId: string, reason: string): Promise<void> { // キャンセルの理由も記録 const event: OrderCancelledEvent = { eventId: generateId(), aggregateId: orderId, eventType: 'OrderCancelled', eventData: { reason, cancelledBy: await this.getCurrentUser(), }, timestamp: new Date(), version: await this.getNextVersion(orderId), };
await this.eventStore.append(orderId, event); }
// イベントから現在の状態を再構築 async getOrder(orderId: string): Promise<Order> { const events = await this.eventStore.getEvents(orderId); return this.replayEvents(events); }
// 過去の状態を再現 async getOrderAtTime(orderId: string, timestamp: Date): Promise<Order> { const events = await this.eventStore.getEventsUntil(orderId, timestamp); return this.replayEvents(events); }
private replayEvents(events: Event[]): Order { let order = new Order();
for (const event of events) { switch (event.eventType) { case 'OrderCreated': order = this.applyOrderCreated(order, event); break; case 'OrderStatusUpdated': order = this.applyOrderStatusUpdated(order, event); break; case 'OrderCancelled': order = this.applyOrderCancelled(order, event); break; } }
return order; }}メリット:
- 完全な履歴: すべての変更が記録される
- 監査の容易さ: 誰がいつ何を変更したかが明確
- デバッグの容易さ: イベントの流れを追跡できる
- タイムトラベル: 過去の状態を再現できる
Event Sourcingの基本概念
Section titled “Event Sourcingの基本概念”Event Store(イベントストア)
Section titled “Event Store(イベントストア)”定義: イベントを永続化するストレージです。
実装例:
interface Event { eventId: string; aggregateId: string; eventType: string; eventData: any; timestamp: Date; version: number;}
class EventStore { private db: PrismaClient;
async append(aggregateId: string, event: Event): Promise<void> { // 楽観的ロック: バージョンチェック const lastEvent = await this.db.event.findFirst({ where: { aggregateId }, orderBy: { version: 'desc' }, });
if (lastEvent && lastEvent.version >= event.version) { throw new Error('Concurrent modification detected'); }
await this.db.event.create({ data: { eventId: event.eventId, aggregateId: event.aggregateId, eventType: event.eventType, eventData: JSON.stringify(event.eventData), timestamp: event.timestamp, version: event.version, }, }); }
async getEvents(aggregateId: string): Promise<Event[]> { const events = await this.db.event.findMany({ where: { aggregateId }, orderBy: { version: 'asc' }, });
return events.map(e => ({ eventId: e.eventId, aggregateId: e.aggregateId, eventType: e.eventType, eventData: JSON.parse(e.eventData), timestamp: e.timestamp, version: e.version, })); }
async getEventsUntil(aggregateId: string, timestamp: Date): Promise<Event[]> { const events = await this.db.event.findMany({ where: { aggregateId, timestamp: { lte: timestamp }, }, orderBy: { version: 'asc' }, });
return events.map(e => ({ eventId: e.eventId, aggregateId: e.aggregateId, eventType: e.eventType, eventData: JSON.parse(e.eventData), timestamp: e.timestamp, version: e.version, })); }}Aggregate(集約)
Section titled “Aggregate(集約)”定義: ビジネスロジックの単位です。イベントを発行し、イベントから状態を再構築します。
実装例:
class Order { private id: string; private userId: string; private items: OrderItem[]; private status: OrderStatus; private totalAmount: number; private createdAt: Date;
// イベントから状態を再構築 static fromEvents(events: Event[]): Order { const order = new Order();
for (const event of events) { order.applyEvent(event); }
return order; }
private applyEvent(event: Event): void { switch (event.eventType) { case 'OrderCreated': this.applyOrderCreated(event); break; case 'OrderStatusUpdated': this.applyOrderStatusUpdated(event); break; case 'OrderCancelled': this.applyOrderCancelled(event); break; } }
private applyOrderCreated(event: OrderCreatedEvent): void { this.id = event.aggregateId; this.userId = event.eventData.userId; this.items = event.eventData.items; this.status = 'PENDING'; this.totalAmount = event.eventData.totalAmount; this.createdAt = event.timestamp; }
private applyOrderStatusUpdated(event: OrderStatusUpdatedEvent): void { this.status = event.eventData.status; }
private applyOrderCancelled(event: OrderCancelledEvent): void { this.status = 'CANCELLED'; }
// コマンドを処理してイベントを発行 cancel(reason: string, userId: string): OrderCancelledEvent { if (this.status === 'CANCELLED') { throw new Error('Order is already cancelled'); }
return { eventId: generateId(), aggregateId: this.id, eventType: 'OrderCancelled', eventData: { reason, cancelledBy: userId, }, timestamp: new Date(), version: this.getNextVersion(), }; }
private getNextVersion(): number { // 現在のバージョンを取得(実際の実装ではイベントストアから取得) return 1; }}Snapshot(スナップショット)
Section titled “Snapshot(スナップショット)”なぜスナップショットが必要か
Section titled “なぜスナップショットが必要か”問題:
// イベントが多すぎる場合、再構築に時間がかかるasync function getOrder(orderId: string): Promise<Order> { // 1000個のイベントをすべて読み込んで再構築 const events = await eventStore.getEvents(orderId); // 1000個のイベント return Order.fromEvents(events); // 時間がかかる}解決策:
class SnapshotStore { private db: PrismaClient;
async saveSnapshot(aggregateId: string, aggregate: Order, version: number): Promise<void> { await this.db.snapshot.create({ data: { aggregateId, aggregateData: JSON.stringify(aggregate), version, timestamp: new Date(), }, }); }
async getLatestSnapshot(aggregateId: string): Promise<{ snapshot: Order; version: number } | null> { const snapshot = await this.db.snapshot.findFirst({ where: { aggregateId }, orderBy: { version: 'desc' }, });
if (!snapshot) { return null; }
return { snapshot: JSON.parse(snapshot.aggregateData), version: snapshot.version, }; }}
class OrderService { private eventStore: EventStore; private snapshotStore: SnapshotStore;
async getOrder(orderId: string): Promise<Order> { // 最新のスナップショットを取得 const snapshot = await this.snapshotStore.getLatestSnapshot(orderId);
if (snapshot) { // スナップショット以降のイベントのみを読み込む const events = await this.eventStore.getEventsAfter(orderId, snapshot.version); return Order.fromEventsWithSnapshot(snapshot.snapshot, events); } else { // スナップショットがない場合は、すべてのイベントを読み込む const events = await this.eventStore.getEvents(orderId); return Order.fromEvents(events); } }
async saveOrder(order: Order): Promise<void> { const events = order.getUncommittedEvents();
for (const event of events) { await this.eventStore.append(order.id, event); }
// 定期的にスナップショットを保存(例: 100イベントごと) if (events.length % 100 === 0) { await this.snapshotStore.saveSnapshot(order.id, order, order.version); } }}Projection(プロジェクション)
Section titled “Projection(プロジェクション)”読み取り専用ビューの作成
Section titled “読み取り専用ビューの作成”実装例:
// イベントから読み取り専用のビューを作成class OrderViewProjection { private db: PrismaClient;
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> { await this.db.orderView.create({ data: { id: event.aggregateId, userId: event.eventData.userId, items: JSON.stringify(event.eventData.items), totalAmount: event.eventData.totalAmount, status: 'PENDING', createdAt: event.timestamp, }, }); }
async handleOrderStatusUpdated(event: OrderStatusUpdatedEvent): Promise<void> { await this.db.orderView.update({ where: { id: event.aggregateId }, data: { status: event.eventData.status }, }); }
async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> { await this.db.orderView.update({ where: { id: event.aggregateId }, data: { status: 'CANCELLED', cancelledReason: event.eventData.reason, cancelledAt: event.timestamp, }, }); }}
// イベントストリームを処理class EventProcessor { private eventStore: EventStore; private projections: Projection[] = [];
async processEvents(): Promise<void> { const events = await this.eventStore.getUnprocessedEvents();
for (const event of events) { for (const projection of this.projections) { await projection.handle(event); }
await this.eventStore.markAsProcessed(event.eventId); } }}Event Sourcingの適用基準
Section titled “Event Sourcingの適用基準”Event Sourcingを適用すべき場合
Section titled “Event Sourcingを適用すべき場合”条件:
- 完全な監査ログが必要
- 過去の状態を再現する必要がある
- 複雑なビジネスロジックがある
- タイムトラベルが必要
実践例:
// 金融システム: すべての取引を記録class TransactionService { async transferMoney(fromAccountId: string, toAccountId: string, amount: number): Promise<void> { const event: MoneyTransferredEvent = { eventId: generateId(), aggregateId: fromAccountId, eventType: 'MoneyTransferred', eventData: { fromAccountId, toAccountId, amount, timestamp: new Date(), }, timestamp: new Date(), version: await this.getNextVersion(fromAccountId), };
await this.eventStore.append(fromAccountId, event); }
// 過去の残高を再現 async getBalanceAtTime(accountId: string, timestamp: Date): Promise<number> { const events = await this.eventStore.getEventsUntil(accountId, timestamp); return this.replayEvents(events); }}Event Sourcingを適用しないべき場合
Section titled “Event Sourcingを適用しないべき場合”条件:
- シンプルなCRUD操作のみ
- 監査ログが不要
- 過去の状態を再現する必要がない
- パフォーマンスが最重要
実践例:
// シンプルなCRUD操作class SimpleService { async createItem(data: ItemData): Promise<Item> { return await db.item.create({ data }); }
async updateItem(id: string, data: Partial<Item>): Promise<Item> { return await db.item.update({ where: { id }, data }); }}Event Sourcing完全ガイドのポイント:
- なぜ必要か: 完全な履歴、監査、デバッグ、タイムトラベル
- Event Store: イベントを永続化するストレージ
- Aggregate: イベントを発行し、イベントから状態を再構築
- Snapshot: パフォーマンス向上のためのスナップショット
- Projection: 読み取り専用ビューの作成
- 適用基準: 監査ログが必要、過去の状態を再現する必要がある場合
適切なEvent Sourcingの実装により、完全な履歴と監査機能を持つシステムを構築できます。