Skip to content

復旧設計とフォールバック戦略

復旧設計とフォールバック戦略

Section titled “復旧設計とフォールバック戦略”

障害から自動/手動で安全に戻れる設計を詳しく解説します。

外部依存が落ちたらキャッシュ・スタブで縮退運転する。

# ✅ 良い例: キャッシュによるフォールバック
import redis
import httpx
redis_client = redis.Redis(host='localhost', port=6379, db=0)
async def get_product(product_id: int) -> Product:
# 1. キャッシュから取得を試みる
cached = redis_client.get(f"product:{product_id}")
if cached:
return Product.parse_raw(cached)
# 2. データベースから取得を試みる
try:
product = await product_repo.find_by_id(product_id)
if not product:
raise ProductNotFoundError(product_id)
# キャッシュに保存(TTL: 1時間)
redis_client.setex(
f"product:{product_id}",
3600,
product.json()
)
return product
except Exception as e:
# 3. データベースが落ちている場合、外部APIから取得を試みる
try:
async with httpx.AsyncClient(timeout=3.0) as client:
response = await client.get(
f"https://external-api.example.com/products/{product_id}"
)
product = Product.parse_raw(response.text)
# キャッシュに保存(次回はキャッシュから取得可能)
redis_client.setex(
f"product:{product_id}",
3600,
product.json()
)
return product
except Exception as api_error:
# 4. すべて失敗した場合、スタブデータを返す
logger.warning(
"All data sources failed, returning stub data",
product_id=product_id,
db_error=str(e),
api_error=str(api_error),
)
return create_stub_product(product_id)
def create_stub_product(product_id: int) -> Product:
# スタブデータ(最低限の情報のみ)
return Product(
id=product_id,
name="Product information temporarily unavailable",
price=0,
)

なぜ重要か:

  • 可用性の向上: 外部依存が落ちても、最低限のサービスを提供可能
  • ユーザー体験: 完全なエラーではなく、スタブデータを返すことでユーザー体験を維持

再起動時に中途状態をリカバリ可能にする(例:処理キューの再読込)。

# ✅ 良い例: 再起動時にOutboxを再処理
async def recover_pending_events():
# アプリケーション起動時に、PENDING状態のイベントを再処理
pending_events = await db.query(OutboxEvent).filter(
OutboxEvent.status == 'PENDING'
).all()
logger.info(
"Recovering pending outbox events",
count=len(pending_events)
)
for event in pending_events:
# リトライ回数が上限を超えていない場合のみ再処理
if event.retry_count < 3:
await process_outbox_event(event)
else:
logger.warning(
"Outbox event exceeded retry limit",
event_id=event.id,
retry_count=event.retry_count,
)
# 手動対応が必要な状態としてマーク
event.status = 'MANUAL_REVIEW_REQUIRED'
await db.commit()
# アプリケーション起動時に実行
@app.on_event("startup")
async def startup_event():
await recover_pending_events()

なぜ重要か:

  • データの整合性: 再起動時に中途状態のデータをリカバリ可能
  • 自動復旧: 手動介入なしで自動的に復旧可能

Exponential Backoff とJitterでセルフDDoSを防止する。

# ✅ 良い例: Exponential Backoff + Jitter
import asyncio
import random
async def retry_with_backoff(fn, max_retries=3):
for i in range(max_retries):
try:
return await fn()
except Exception as e:
if i == max_retries - 1:
raise e
# Exponential Backoff + Jitter
base_delay = 2 ** i * 1000 # 1s, 2s, 4s
jitter = random.randint(0, 1000) # 0-1s
delay = (base_delay + jitter) / 1000
await asyncio.sleep(delay)
async def charge_payment(order_id: int, amount: float):
async def _charge():
async with httpx.AsyncClient(timeout=3.0) as client:
response = await client.post(
'https://payment-api.example.com/charge',
json={'order_id': order_id, 'amount': amount},
)
if response.status_code != 200:
raise PaymentError("Payment failed")
return response.json()
return await retry_with_backoff(_charge)

バックオフの計算:

リトライ1: 1000ms + random(0-1000ms) = 1000-2000ms
リトライ2: 2000ms + random(0-2000ms) = 2000-4000ms
リトライ3: 4000ms + random(0-4000ms) = 4000-8000ms

なぜ重要か:

  • セルフDDoS防止: すべてのクライアントが同時にリトライしないよう、Jitterでランダム化
  • サーバー負荷の軽減: Exponential Backoffでサーバーへの負荷を段階的に軽減

フラグ切替・一時停止がコード修正なしで可能な設計。

# ✅ 良い例: 機能フラグによる制御
from sqlalchemy import Column, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class FeatureFlag(Base):
__tablename__ = 'feature_flags'
key = Column(String, primary_key=True)
enabled = Column(Boolean, default=False)
description = Column(String)
async def is_feature_enabled(key: str) -> bool:
flag = await db.query(FeatureFlag).filter(
FeatureFlag.key == key
).first()
return flag.enabled if flag else False
@app.post("/orders")
async def create_order(order_data: OrderData):
order = await order_repo.save(order_data)
# 機能フラグで外部API呼び出しを制御
if await is_feature_enabled('payment-api.enabled'):
try:
await payment_service.charge_payment(order.id, order_data.amount)
except Exception as e:
# 外部APIが無効化されている場合、エラーをログに記録するが処理は続行
logger.warning(
"Payment API disabled, skipping payment",
order_id=order.id,
error=str(e),
)
else:
logger.info("Payment API disabled by feature flag")
return order
# 管理画面で機能フラグを切り替え可能
@app.post("/admin/feature-flags/{key}/enable")
async def enable_feature_flag(key: str):
flag = await db.query(FeatureFlag).filter(
FeatureFlag.key == key
).first()
if flag:
flag.enabled = True
await db.commit()
return {"message": "Feature flag enabled"}
return {"error": "Feature flag not found"}, 404
@app.post("/admin/feature-flags/{key}/disable")
async def disable_feature_flag(key: str):
flag = await db.query(FeatureFlag).filter(
FeatureFlag.key == key
).first()
if flag:
flag.enabled = False
await db.commit()
return {"message": "Feature flag disabled"}
return {"error": "Feature flag not found"}, 404

なぜ重要か:

  • 迅速な対応: コード修正なしで、機能を一時的に無効化可能
  • リスクの軽減: 問題が発生した場合、すぐに機能を無効化して影響を最小化

復旧設計とフォールバック戦略のポイント:

  • Graceful Degradation: 外部依存が落ちたらキャッシュ・スタブで縮退運転
  • 再起動安全性: 再起動時に中途状態をリカバリ可能にする
  • バックオフ戦略: Exponential Backoff + JitterでセルフDDoSを防止
  • 手動オペ対応: フラグ切替・一時停止がコード修正なしで可能な設計

これらの設計により、障害から自動/手動で安全に戻れます。