Skip to content

Event Sourcing完全ガイド

イベントソーシングパターンを詳しく解説します。

問題のある実装:

// ❌ 悪い例: 現在の状態のみを保存
class OrderService {
async createOrder(orderData: OrderData): Promise<Order> {
// 現在の状態のみを保存
return await db.order.create({
data: {
id: generateId(),
userId: orderData.userId,
items: orderData.items,
status: 'PENDING',
totalAmount: calculateTotal(orderData.items),
createdAt: new Date(),
},
});
}
async updateOrderStatus(orderId: string, status: OrderStatus): Promise<Order> {
// 以前の状態が失われる
return await db.order.update({
where: { id: orderId },
data: { status },
});
}
async cancelOrder(orderId: string): Promise<void> {
// キャンセルされた理由が記録されない
await db.order.update({
where: { id: orderId },
data: { status: 'CANCELLED' },
});
}
}

問題点:

  1. 履歴の欠如: 過去の状態が失われる
  2. 監査の困難: 誰がいつ何を変更したかが分からない
  3. デバッグの困難: 問題が発生した原因を追跡できない
  4. タイムトラベル: 過去の状態を再現できない

影響:

  • 監査の困難
  • デバッグの困難
  • データの整合性の確認が困難
  • ビジネスロジックの理解が困難

改善された実装:

// ✅ 良い例: Event Sourcingを使用
class OrderService {
async createOrder(orderData: OrderData): Promise<string> {
const orderId = generateId();
// イベントを保存
const event: OrderCreatedEvent = {
eventId: generateId(),
aggregateId: orderId,
eventType: 'OrderCreated',
eventData: {
userId: orderData.userId,
items: orderData.items,
totalAmount: calculateTotal(orderData.items),
},
timestamp: new Date(),
version: 1,
};
await this.eventStore.append(orderId, event);
return orderId;
}
async updateOrderStatus(orderId: string, status: OrderStatus): Promise<void> {
// イベントを保存
const event: OrderStatusUpdatedEvent = {
eventId: generateId(),
aggregateId: orderId,
eventType: 'OrderStatusUpdated',
eventData: {
status,
previousStatus: await this.getCurrentStatus(orderId),
},
timestamp: new Date(),
version: await this.getNextVersion(orderId),
};
await this.eventStore.append(orderId, event);
}
async cancelOrder(orderId: string, reason: string): Promise<void> {
// キャンセルの理由も記録
const event: OrderCancelledEvent = {
eventId: generateId(),
aggregateId: orderId,
eventType: 'OrderCancelled',
eventData: {
reason,
cancelledBy: await this.getCurrentUser(),
},
timestamp: new Date(),
version: await this.getNextVersion(orderId),
};
await this.eventStore.append(orderId, event);
}
// イベントから現在の状態を再構築
async getOrder(orderId: string): Promise<Order> {
const events = await this.eventStore.getEvents(orderId);
return this.replayEvents(events);
}
// 過去の状態を再現
async getOrderAtTime(orderId: string, timestamp: Date): Promise<Order> {
const events = await this.eventStore.getEventsUntil(orderId, timestamp);
return this.replayEvents(events);
}
private replayEvents(events: Event[]): Order {
let order = new Order();
for (const event of events) {
switch (event.eventType) {
case 'OrderCreated':
order = this.applyOrderCreated(order, event);
break;
case 'OrderStatusUpdated':
order = this.applyOrderStatusUpdated(order, event);
break;
case 'OrderCancelled':
order = this.applyOrderCancelled(order, event);
break;
}
}
return order;
}
}

メリット:

  • 完全な履歴: すべての変更が記録される
  • 監査の容易さ: 誰がいつ何を変更したかが明確
  • デバッグの容易さ: イベントの流れを追跡できる
  • タイムトラベル: 過去の状態を再現できる

定義: イベントを永続化するストレージです。

実装例:

interface Event {
eventId: string;
aggregateId: string;
eventType: string;
eventData: any;
timestamp: Date;
version: number;
}
class EventStore {
private db: PrismaClient;
async append(aggregateId: string, event: Event): Promise<void> {
// 楽観的ロック: バージョンチェック
const lastEvent = await this.db.event.findFirst({
where: { aggregateId },
orderBy: { version: 'desc' },
});
if (lastEvent && lastEvent.version >= event.version) {
throw new Error('Concurrent modification detected');
}
await this.db.event.create({
data: {
eventId: event.eventId,
aggregateId: event.aggregateId,
eventType: event.eventType,
eventData: JSON.stringify(event.eventData),
timestamp: event.timestamp,
version: event.version,
},
});
}
async getEvents(aggregateId: string): Promise<Event[]> {
const events = await this.db.event.findMany({
where: { aggregateId },
orderBy: { version: 'asc' },
});
return events.map(e => ({
eventId: e.eventId,
aggregateId: e.aggregateId,
eventType: e.eventType,
eventData: JSON.parse(e.eventData),
timestamp: e.timestamp,
version: e.version,
}));
}
async getEventsUntil(aggregateId: string, timestamp: Date): Promise<Event[]> {
const events = await this.db.event.findMany({
where: {
aggregateId,
timestamp: { lte: timestamp },
},
orderBy: { version: 'asc' },
});
return events.map(e => ({
eventId: e.eventId,
aggregateId: e.aggregateId,
eventType: e.eventType,
eventData: JSON.parse(e.eventData),
timestamp: e.timestamp,
version: e.version,
}));
}
}

定義: ビジネスロジックの単位です。イベントを発行し、イベントから状態を再構築します。

実装例:

class Order {
private id: string;
private userId: string;
private items: OrderItem[];
private status: OrderStatus;
private totalAmount: number;
private createdAt: Date;
// イベントから状態を再構築
static fromEvents(events: Event[]): Order {
const order = new Order();
for (const event of events) {
order.applyEvent(event);
}
return order;
}
private applyEvent(event: Event): void {
switch (event.eventType) {
case 'OrderCreated':
this.applyOrderCreated(event);
break;
case 'OrderStatusUpdated':
this.applyOrderStatusUpdated(event);
break;
case 'OrderCancelled':
this.applyOrderCancelled(event);
break;
}
}
private applyOrderCreated(event: OrderCreatedEvent): void {
this.id = event.aggregateId;
this.userId = event.eventData.userId;
this.items = event.eventData.items;
this.status = 'PENDING';
this.totalAmount = event.eventData.totalAmount;
this.createdAt = event.timestamp;
}
private applyOrderStatusUpdated(event: OrderStatusUpdatedEvent): void {
this.status = event.eventData.status;
}
private applyOrderCancelled(event: OrderCancelledEvent): void {
this.status = 'CANCELLED';
}
// コマンドを処理してイベントを発行
cancel(reason: string, userId: string): OrderCancelledEvent {
if (this.status === 'CANCELLED') {
throw new Error('Order is already cancelled');
}
return {
eventId: generateId(),
aggregateId: this.id,
eventType: 'OrderCancelled',
eventData: {
reason,
cancelledBy: userId,
},
timestamp: new Date(),
version: this.getNextVersion(),
};
}
private getNextVersion(): number {
// 現在のバージョンを取得(実際の実装ではイベントストアから取得)
return 1;
}
}

なぜスナップショットが必要か

Section titled “なぜスナップショットが必要か”

問題:

// イベントが多すぎる場合、再構築に時間がかかる
async function getOrder(orderId: string): Promise<Order> {
// 1000個のイベントをすべて読み込んで再構築
const events = await eventStore.getEvents(orderId); // 1000個のイベント
return Order.fromEvents(events); // 時間がかかる
}

解決策:

class SnapshotStore {
private db: PrismaClient;
async saveSnapshot(aggregateId: string, aggregate: Order, version: number): Promise<void> {
await this.db.snapshot.create({
data: {
aggregateId,
aggregateData: JSON.stringify(aggregate),
version,
timestamp: new Date(),
},
});
}
async getLatestSnapshot(aggregateId: string): Promise<{ snapshot: Order; version: number } | null> {
const snapshot = await this.db.snapshot.findFirst({
where: { aggregateId },
orderBy: { version: 'desc' },
});
if (!snapshot) {
return null;
}
return {
snapshot: JSON.parse(snapshot.aggregateData),
version: snapshot.version,
};
}
}
class OrderService {
private eventStore: EventStore;
private snapshotStore: SnapshotStore;
async getOrder(orderId: string): Promise<Order> {
// 最新のスナップショットを取得
const snapshot = await this.snapshotStore.getLatestSnapshot(orderId);
if (snapshot) {
// スナップショット以降のイベントのみを読み込む
const events = await this.eventStore.getEventsAfter(orderId, snapshot.version);
return Order.fromEventsWithSnapshot(snapshot.snapshot, events);
} else {
// スナップショットがない場合は、すべてのイベントを読み込む
const events = await this.eventStore.getEvents(orderId);
return Order.fromEvents(events);
}
}
async saveOrder(order: Order): Promise<void> {
const events = order.getUncommittedEvents();
for (const event of events) {
await this.eventStore.append(order.id, event);
}
// 定期的にスナップショットを保存(例: 100イベントごと)
if (events.length % 100 === 0) {
await this.snapshotStore.saveSnapshot(order.id, order, order.version);
}
}
}

実装例:

// イベントから読み取り専用のビューを作成
class OrderViewProjection {
private db: PrismaClient;
async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.db.orderView.create({
data: {
id: event.aggregateId,
userId: event.eventData.userId,
items: JSON.stringify(event.eventData.items),
totalAmount: event.eventData.totalAmount,
status: 'PENDING',
createdAt: event.timestamp,
},
});
}
async handleOrderStatusUpdated(event: OrderStatusUpdatedEvent): Promise<void> {
await this.db.orderView.update({
where: { id: event.aggregateId },
data: { status: event.eventData.status },
});
}
async handleOrderCancelled(event: OrderCancelledEvent): Promise<void> {
await this.db.orderView.update({
where: { id: event.aggregateId },
data: {
status: 'CANCELLED',
cancelledReason: event.eventData.reason,
cancelledAt: event.timestamp,
},
});
}
}
// イベントストリームを処理
class EventProcessor {
private eventStore: EventStore;
private projections: Projection[] = [];
async processEvents(): Promise<void> {
const events = await this.eventStore.getUnprocessedEvents();
for (const event of events) {
for (const projection of this.projections) {
await projection.handle(event);
}
await this.eventStore.markAsProcessed(event.eventId);
}
}
}

条件:

  • 完全な監査ログが必要
  • 過去の状態を再現する必要がある
  • 複雑なビジネスロジックがある
  • タイムトラベルが必要

実践例:

// 金融システム: すべての取引を記録
class TransactionService {
async transferMoney(fromAccountId: string, toAccountId: string, amount: number): Promise<void> {
const event: MoneyTransferredEvent = {
eventId: generateId(),
aggregateId: fromAccountId,
eventType: 'MoneyTransferred',
eventData: {
fromAccountId,
toAccountId,
amount,
timestamp: new Date(),
},
timestamp: new Date(),
version: await this.getNextVersion(fromAccountId),
};
await this.eventStore.append(fromAccountId, event);
}
// 過去の残高を再現
async getBalanceAtTime(accountId: string, timestamp: Date): Promise<number> {
const events = await this.eventStore.getEventsUntil(accountId, timestamp);
return this.replayEvents(events);
}
}

Event Sourcingを適用しないべき場合

Section titled “Event Sourcingを適用しないべき場合”

条件:

  • シンプルなCRUD操作のみ
  • 監査ログが不要
  • 過去の状態を再現する必要がない
  • パフォーマンスが最重要

実践例:

// シンプルなCRUD操作
class SimpleService {
async createItem(data: ItemData): Promise<Item> {
return await db.item.create({ data });
}
async updateItem(id: string, data: Partial<Item>): Promise<Item> {
return await db.item.update({ where: { id }, data });
}
}

Event Sourcing完全ガイドのポイント:

  • なぜ必要か: 完全な履歴、監査、デバッグ、タイムトラベル
  • Event Store: イベントを永続化するストレージ
  • Aggregate: イベントを発行し、イベントから状態を再構築
  • Snapshot: パフォーマンス向上のためのスナップショット
  • Projection: 読み取り専用ビューの作成
  • 適用基準: 監査ログが必要、過去の状態を再現する必要がある場合

適切なEvent Sourcingの実装により、完全な履歴と監査機能を持つシステムを構築できます。