冪等性と整合性
冪等性と整合性
Section titled “冪等性と整合性”分散システムでは「1回しか実行されない」は幻想。「何度実行されても最終状態が正しい」ことを保証する設計を詳しく解説します。
冪等性の重要性
Section titled “冪等性の重要性”分散システムでは、ネットワークエラー、タイムアウト、再起動などにより、同じ処理が複数回実行される可能性があります。
問題のあるコード
Section titled “問題のあるコード”# ❌ 問題のあるコード: 非冪等な処理async def create_order(order_data: OrderData) -> Order: # 問題: 再実行時に注文が二重作成される order = Order(**order_data.dict()) return await order_repo.save(order)なぜ問題か:
- 再実行時の二重作成: ネットワークエラーでクライアントが再送すると、注文が2つ作成される
- データの不整合: 同じ注文が複数存在し、在庫や決済に影響する
冪等性の担保
Section titled “冪等性の担保”Idempotency Keyの使用
Section titled “Idempotency Keyの使用”# ✅ 良い例: Idempotency Keyによる冪等性の担保async def create_order( order_data: OrderData, idempotency_key: str) -> Order: # Idempotency Keyで既存の注文を確認 existing_order = await order_repo.find_by_idempotency_key(idempotency_key) if existing_order: # 既に存在する場合は、既存の注文を返す return existing_order
# 新規作成 order = Order( **order_data.dict(), idempotency_key=idempotency_key ) return await order_repo.save(order)
# リポジトリの実装async def find_by_idempotency_key(self, key: str) -> Order | None: return await self.db.query(Order).filter( Order.idempotency_key == key ).first()
# マイグレーションでIdempotency Keyを追加# CREATE UNIQUE INDEX idx_orders_idempotency_key ON orders(idempotency_key);なぜ重要か:
- 重複防止: 同じIdempotency Keyで再実行しても、同じ結果が返される
- データの整合性: 注文の重複作成を防止
トランザクション境界
Section titled “トランザクション境界”DB取引中に外部APIを呼ばない(失敗時復旧不可)。
問題のあるコード
Section titled “問題のあるコード”# ❌ 問題のあるコード: トランザクション内で外部APIを呼ぶasync def create_order(order_data: OrderData) -> Order: async with db.begin() as tx: # 1. 注文を作成(DBトランザクション内) order = Order(**order_data.dict()) tx.add(order) await tx.commit()
# 2. トランザクション内で外部APIを呼ぶ(問題) async with httpx.AsyncClient() as client: response = await client.post( 'https://payment-api.example.com/charge', json={'order_id': order.id, 'amount': order_data.amount}, )
if response.status_code != 200: raise PaymentError("Payment failed")
# 3. 決済結果を保存 order.payment_status = "COMPLETED" await tx.commit()
return orderなぜ問題か:
- ロールバック不可: 外部APIが成功した後にトランザクションが失敗した場合、外部APIのロールバックが困難
- データの不整合: 外部APIは成功しているが、DBには注文が存在しない状態になる可能性
Outboxパターンの実装
Section titled “Outboxパターンの実装”# ✅ 良い例: Outboxパターンによる解決async def create_order( order_data: OrderData, idempotency_key: str) -> Order: async with db.begin() as tx: # 1. Idempotency Keyで既存の注文を確認 existing_order = await tx.query(Order).filter( Order.idempotency_key == idempotency_key ).first()
if existing_order: await tx.commit() return existing_order
# 2. トランザクション内で注文を作成 order = Order( **order_data.dict(), idempotency_key=idempotency_key ) tx.add(order) await tx.flush()
# 3. Outboxテーブルに外部API呼び出しを記録(トランザクション内) payload = json.dumps({ 'order_id': order.id, 'amount': order_data.amount })
outbox_event = OutboxEvent( event_type='PAYMENT_CHARGE', aggregate_id=str(order.id), payload=payload, idempotency_key=idempotency_key, status='PENDING' ) tx.add(outbox_event)
# 4. トランザクションをコミット(外部APIは呼ばない) await tx.commit() return order
# 別プロセスでOutboxを処理async def process_outbox(): pending_events = await db.query(OutboxEvent).filter( OutboxEvent.status == 'PENDING' ).limit(10).all()
for event in pending_events: try: # 外部APIを呼ぶ(トランザクション外) payload = json.loads(event.payload)
async with httpx.AsyncClient(timeout=3.0) as client: response = await client.post( 'https://payment-api.example.com/charge', headers={'Idempotency-Key': event.idempotency_key}, json=payload, )
if response.status_code == 200: event.status = 'COMPLETED' await db.commit() else: event.status = 'FAILED' event.retry_count += 1 await db.commit() except Exception as e: event.status = 'FAILED' event.retry_count += 1 await db.commit()
# 定期的にOutboxを処理async def start_outbox_processor(): while True: await process_outbox() await asyncio.sleep(5)なぜ重要か:
- トランザクションの短縮: DBのロック時間が短縮される
- 外部障害の分離: 外部APIの障害がトランザクションに影響しない
- 再実行の容易さ: Outboxテーブルから再実行可能
- 冪等性の保証: Idempotency Keyにより重複実行を防止
再送安全なフロー
Section titled “再送安全なフロー”「結果整合性で良いデータ」と「厳密整合性が必要なデータ」を明示的に分類する。
整合性レベルの分類
Section titled “整合性レベルの分類”# 厳密整合性が必要なデータ(ACIDトランザクション)class Order(Base): __tablename__ = 'orders' id = Column(Integer, primary_key=True) amount = Column(Numeric) status = Column(String) # CREATED, PAID, CANCELLED
# 結果整合性で良いデータ(イベント駆動)class OrderAnalytics(Base): __tablename__ = 'order_analytics' id = Column(Integer, primary_key=True) order_id = Column(Integer) total_amount = Column(Numeric) # 集計値(最終的に整合性が取れれば良い) last_updated = Column(DateTime)使い分け:
- 厳密整合性: 注文、決済、在庫など、ビジネス的に重要なデータ
- 結果整合性: 分析データ、ログ、通知など、最終的に整合性が取れれば良いデータ
冪等性と整合性のポイント:
- 冪等性の担保: Idempotency Keyで再実行を安全化
- トランザクション境界: DB取引中に外部APIを呼ばない(Outboxパターンを使用)
- 再送安全なフロー: 厳密整合性と結果整合性を明示的に分類
これらの原則により、「何度実行されても最終状態が正しい」堅牢なシステムを構築できます。