Skip to content

観測可能性の設計

障害は「再現」ではなく「観測」で解決する。観測可能性を高める設計を詳しく解説します。

各処理にトレースID(request_id)を通し、入出力境界ごとにJSON構造化ログを吐く。

# ✅ 良い例: 構造化ログとトレースID
import uuid
import structlog
from fastapi import Request
logger = structlog.get_logger()
@app.middleware("http")
async def add_trace_id(request: Request, call_next):
# トレースIDを生成(またはリクエストヘッダーから取得)
trace_id = request.headers.get("X-Trace-Id") or str(uuid.uuid4())
request.state.trace_id = trace_id
# すべてのログにトレースIDを自動的に含める
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(trace_id=trace_id, user_id=get_user_id(request))
response = await call_next(request)
response.headers["X-Trace-Id"] = trace_id
return response
@app.post("/orders")
async def create_order(order_data: OrderData, request: Request):
trace_id = request.state.trace_id
# 入出力境界でログを出力
logger.info("Order creation started",
order_data=order_data.dict(),
timestamp=datetime.now().isoformat(),
)
order = await create_order(order_data, trace_id)
logger.info("Order creation completed",
order_id=order.id,
status=order.status,
duration=(datetime.now() - start_time).total_seconds(),
)
return order

ログの出力例:

{
"timestamp": "2024-01-01T10:00:00.000Z",
"level": "info",
"trace_id": "abc123",
"user_id": "user456",
"order_data": {
"amount": 10000,
"items": [...]
},
"message": "Order creation started"
}

なぜ重要か:

  • トレーサビリティ: トレースIDでリクエスト全体を追跡可能
  • 構造化ログ: JSON形式で検索・分析が容易
  • コンテキスト: ユーザーIDなどのコンテキストを自動的に含める

成功率・レイテンシ・リトライ回数・プール使用率を定常監視する。

# ✅ 良い例: Prometheusを使用したメトリクス収集
from prometheus_client import Counter, Histogram, Gauge, generate_latest
# カウンター: 注文作成回数
order_creation_counter = Counter(
'orders_created_total',
'Total number of orders created',
['status']
)
# ヒストグラム: 注文作成のレイテンシ
order_creation_duration = Histogram(
'orders_creation_duration_seconds',
'Order creation duration in seconds',
buckets=[0.1, 0.5, 1, 2, 5]
)
# ゲージ: 接続プールの使用率
connection_pool_gauge = Gauge(
'db_connection_pool_active',
'Active database connections'
)
@app.post("/orders")
async def create_order(order_data: OrderData, request: Request):
with order_creation_duration.time():
try:
order = await create_order(order_data, request.state.trace_id)
# 成功時のメトリクス
order_creation_counter.labels(status='success').inc()
return order
except Exception as e:
# 失敗時のメトリクス
order_creation_counter.labels(status='failure').inc()
raise
# メトリクスエンドポイント
@app.get("/metrics")
async def metrics():
return Response(generate_latest(), media_type="text/plain")

監視すべきメトリクス:

  • 成功率: orders_created_total{status="success"} / orders_created_total{status="failure"}
  • レイテンシ: orders_creation_duration_seconds (p50, p95, p99)
  • リトライ回数: orders_retry_total
  • プール使用率: db_connection_pool_active / db_connection_pool_max

外部API・DBアクセス単位にSpanを埋め、分散トレース可能にする。

# ✅ 良い例: OpenTelemetryを使用した分散トレース
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
@app.post("/orders")
async def create_order(order_data: OrderData, request: Request):
# 親Spanを作成
with tracer.start_as_current_span("create-order") as span:
span.set_attribute("order.amount", str(order_data.amount))
span.set_attribute("trace.id", request.state.trace_id)
# DB操作のSpan
with tracer.start_as_current_span("db.save-order") as db_span:
order = await order_repo.save(order_data)
db_span.set_attribute("order.id", order.id)
return order

トレースの出力例:

Trace: abc123
├─ Span: create-order (duration: 150ms)
│ ├─ Span: db.save-order (duration: 50ms)
│ └─ Span: payment-api.charge (duration: 100ms)

なぜ重要か:

  • 分散トレース: 複数サービス間のリクエストフローを追跡可能
  • ボトルネック特定: どの処理が遅いかを特定可能
  • エラー追跡: エラーが発生した箇所を特定可能

観測可能性の設計のポイント:

  • ログ: トレースIDを通し、構造化ログを出力
  • メトリクス: 成功率・レイテンシ・リトライ回数・プール使用率を監視
  • トレース: 外部API・DBアクセス単位にSpanを埋め、分散トレース可能にする

これらの設計により、障害を「再現」ではなく「観測」で解決できます。