Skip to content

冪等性と整合性

分散システムでは「1回しか実行されない」は幻想。「何度実行されても最終状態が正しい」ことを保証する設計を詳しく解説します。

分散システムでは、ネットワークエラー、タイムアウト、再起動などにより、同じ処理が複数回実行される可能性があります。

# ❌ 問題のあるコード: 非冪等な処理
async def create_order(order_data: OrderData) -> Order:
# 問題: 再実行時に注文が二重作成される
order = Order(**order_data.dict())
return await order_repo.save(order)

なぜ問題か:

  • 再実行時の二重作成: ネットワークエラーでクライアントが再送すると、注文が2つ作成される
  • データの不整合: 同じ注文が複数存在し、在庫や決済に影響する
# ✅ 良い例: Idempotency Keyによる冪等性の担保
async def create_order(
order_data: OrderData,
idempotency_key: str
) -> Order:
# Idempotency Keyで既存の注文を確認
existing_order = await order_repo.find_by_idempotency_key(idempotency_key)
if existing_order:
# 既に存在する場合は、既存の注文を返す
return existing_order
# 新規作成
order = Order(
**order_data.dict(),
idempotency_key=idempotency_key
)
return await order_repo.save(order)
# リポジトリの実装
async def find_by_idempotency_key(self, key: str) -> Order | None:
return await self.db.query(Order).filter(
Order.idempotency_key == key
).first()
# マイグレーションでIdempotency Keyを追加
# CREATE UNIQUE INDEX idx_orders_idempotency_key ON orders(idempotency_key);

なぜ重要か:

  • 重複防止: 同じIdempotency Keyで再実行しても、同じ結果が返される
  • データの整合性: 注文の重複作成を防止

DB取引中に外部APIを呼ばない(失敗時復旧不可)。

# ❌ 問題のあるコード: トランザクション内で外部APIを呼ぶ
async def create_order(order_data: OrderData) -> Order:
async with db.begin() as tx:
# 1. 注文を作成(DBトランザクション内)
order = Order(**order_data.dict())
tx.add(order)
await tx.commit()
# 2. トランザクション内で外部APIを呼ぶ(問題)
async with httpx.AsyncClient() as client:
response = await client.post(
'https://payment-api.example.com/charge',
json={'order_id': order.id, 'amount': order_data.amount},
)
if response.status_code != 200:
raise PaymentError("Payment failed")
# 3. 決済結果を保存
order.payment_status = "COMPLETED"
await tx.commit()
return order

なぜ問題か:

  • ロールバック不可: 外部APIが成功した後にトランザクションが失敗した場合、外部APIのロールバックが困難
  • データの不整合: 外部APIは成功しているが、DBには注文が存在しない状態になる可能性
# ✅ 良い例: Outboxパターンによる解決
async def create_order(
order_data: OrderData,
idempotency_key: str
) -> Order:
async with db.begin() as tx:
# 1. Idempotency Keyで既存の注文を確認
existing_order = await tx.query(Order).filter(
Order.idempotency_key == idempotency_key
).first()
if existing_order:
await tx.commit()
return existing_order
# 2. トランザクション内で注文を作成
order = Order(
**order_data.dict(),
idempotency_key=idempotency_key
)
tx.add(order)
await tx.flush()
# 3. Outboxテーブルに外部API呼び出しを記録(トランザクション内)
payload = json.dumps({
'order_id': order.id,
'amount': order_data.amount
})
outbox_event = OutboxEvent(
event_type='PAYMENT_CHARGE',
aggregate_id=str(order.id),
payload=payload,
idempotency_key=idempotency_key,
status='PENDING'
)
tx.add(outbox_event)
# 4. トランザクションをコミット(外部APIは呼ばない)
await tx.commit()
return order
# 別プロセスでOutboxを処理
async def process_outbox():
pending_events = await db.query(OutboxEvent).filter(
OutboxEvent.status == 'PENDING'
).limit(10).all()
for event in pending_events:
try:
# 外部APIを呼ぶ(トランザクション外)
payload = json.loads(event.payload)
async with httpx.AsyncClient(timeout=3.0) as client:
response = await client.post(
'https://payment-api.example.com/charge',
headers={'Idempotency-Key': event.idempotency_key},
json=payload,
)
if response.status_code == 200:
event.status = 'COMPLETED'
await db.commit()
else:
event.status = 'FAILED'
event.retry_count += 1
await db.commit()
except Exception as e:
event.status = 'FAILED'
event.retry_count += 1
await db.commit()
# 定期的にOutboxを処理
async def start_outbox_processor():
while True:
await process_outbox()
await asyncio.sleep(5)

なぜ重要か:

  • トランザクションの短縮: DBのロック時間が短縮される
  • 外部障害の分離: 外部APIの障害がトランザクションに影響しない
  • 再実行の容易さ: Outboxテーブルから再実行可能
  • 冪等性の保証: Idempotency Keyにより重複実行を防止

「結果整合性で良いデータ」と「厳密整合性が必要なデータ」を明示的に分類する。

# 厳密整合性が必要なデータ(ACIDトランザクション)
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
amount = Column(Numeric)
status = Column(String) # CREATED, PAID, CANCELLED
# 結果整合性で良いデータ(イベント駆動)
class OrderAnalytics(Base):
__tablename__ = 'order_analytics'
id = Column(Integer, primary_key=True)
order_id = Column(Integer)
total_amount = Column(Numeric) # 集計値(最終的に整合性が取れれば良い)
last_updated = Column(DateTime)

使い分け:

  • 厳密整合性: 注文、決済、在庫など、ビジネス的に重要なデータ
  • 結果整合性: 分析データ、ログ、通知など、最終的に整合性が取れれば良いデータ

冪等性と整合性のポイント:

  • 冪等性の担保: Idempotency Keyで再実行を安全化
  • トランザクション境界: DB取引中に外部APIを呼ばない(Outboxパターンを使用)
  • 再送安全なフロー: 厳密整合性と結果整合性を明示的に分類

これらの原則により、「何度実行されても最終状態が正しい」堅牢なシステムを構築できます。