FastAPI認証・認可完全ガイド
FastAPI認証・認可完全ガイド
Section titled “FastAPI認証・認可完全ガイド”FastAPIでの認証と認可の実装を、実務で使える実装例とともに詳しく解説します。
1. 認証・認可とは
Section titled “1. 認証・認可とは”認証と認可の違い
Section titled “認証と認可の違い”- 認証(Authentication): ユーザーが誰であるかを確認する
- 認可(Authorization): ユーザーが特定のリソースにアクセスできるかを確認する
認証・認可の流れ ├─ ログイン(認証) ├─ トークン発行 ├─ リクエストにトークンを含める └─ トークンの検証(認可)なぜ認証・認可が必要か
Section titled “なぜ認証・認可が必要か”問題のある構成(認証なし):
# 問題: 誰でもアクセス可能@app.get("/users/me")def get_current_user(): return {"user": "anonymous"}
# 問題点:# 1. 機密情報が漏洩する可能性# 2. 不正アクセスのリスク# 3. ユーザー識別ができない解決: 認証・認可による保護
# 解決: 認証が必要@app.get("/users/me")def get_current_user(current_user: str = Depends(get_current_user)): return {"user": current_user}2. OAuth2とJWT
Section titled “2. OAuth2とJWT”OAuth2の基本設定
Section titled “OAuth2の基本設定”from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")JWTトークンの生成
Section titled “JWTトークンの生成”from datetime import datetime, timedeltafrom jose import jwtfrom typing import Optional
SECRET_KEY = "your-secret-key"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtJWTトークンの検証
Section titled “JWTトークンの検証”from jose import JWTError, jwtfrom fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception return username3. パスワードのハッシュ化
Section titled “3. パスワードのハッシュ化”bcryptを使用したハッシュ化
Section titled “bcryptを使用したハッシュ化”from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str: return pwd_context.hash(password)4. ログインエンドポイント
Section titled “4. ログインエンドポイント”基本的なログイン
Section titled “基本的なログイン”from fastapi import APIRouter, Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestFormfrom sqlalchemy.orm import Sessionfrom datetime import timedelta
from app.core import securityfrom app.core.config import settingsfrom app.database import get_dbfrom app import models, schemas
router = APIRouter()
@router.post("/token", response_model=schemas.Token)def login_for_access_token( form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): user = authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = security.create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}
def authenticate_user(db: Session, username: str, password: str): user = db.query(models.User).filter(models.User.username == username).first() if not user: return False if not security.verify_password(password, user.hashed_password): return False return user5. 認証が必要なエンドポイント
Section titled “5. 認証が必要なエンドポイント”基本的な実装
Section titled “基本的な実装”from fastapi import APIRouter, Depends, HTTPExceptionfrom sqlalchemy.orm import Session
from app.core.security import get_current_userfrom app.database import get_dbfrom app import models, schemas
router = APIRouter()
@router.get("/users/me", response_model=schemas.User)def read_users_me( current_user: models.User = Depends(get_current_user), db: Session = Depends(get_db)): return current_user
@router.get("/users/{user_id}", response_model=schemas.User)def read_user( user_id: int, current_user: models.User = Depends(get_current_user), db: Session = Depends(get_db)): user = db.query(models.User).filter(models.User.id == user_id).first() if user is None: raise HTTPException(status_code=404, detail="User not found") return user6. 権限ベースの認可
Section titled “6. 権限ベースの認可”ロールベースの認可
Section titled “ロールベースの認可”from fastapi import Depends, HTTPException, statusfrom app import models
def get_current_active_user( current_user: models.User = Depends(get_current_user)) -> models.User: if not current_user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return current_user
def get_current_admin_user( current_user: models.User = Depends(get_current_active_user)) -> models.User: if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions" ) return current_userエンドポイントでの使用
Section titled “エンドポイントでの使用”from fastapi import APIRouter, Dependsfrom app.core.security import get_current_admin_userfrom app import models
router = APIRouter()
@router.get("/admin/users/")def get_all_users( admin_user: models.User = Depends(get_current_admin_user)): # 管理者のみアクセス可能 return {"users": []}7. CORSの設定
Section titled “7. CORSの設定”CORSミドルウェア
Section titled “CORSミドルウェア”from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware( CORSMiddleware, allow_origins=["https://your-frontend.com", "http://localhost:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"],)8. セキュリティヘッダー
Section titled “8. セキュリティヘッダー”セキュリティヘッダーの追加
Section titled “セキュリティヘッダーの追加”from fastapi import Request
@app.middleware("http")async def add_security_headers(request: Request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" return response9. 実務でのベストプラクティス
Section titled “9. 実務でのベストプラクティス”パターン1: リフレッシュトークン
Section titled “パターン1: リフレッシュトークン”def create_refresh_token(data: dict): to_encode = data.copy() expire = datetime.utcnow() + timedelta(days=7) to_encode.update({"exp": expire, "type": "refresh"}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
# app/api/v1/endpoints/auth.py@router.post("/token/refresh")def refresh_access_token( refresh_token: str, db: Session = Depends(get_db)): try: payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM]) if payload.get("type") != "refresh": raise HTTPException(status_code=401, detail="Invalid token type") username: str = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Invalid token") except JWTError: raise HTTPException(status_code=401, detail="Invalid token")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = security.create_access_token( data={"sub": username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}パターン2: APIキー認証
Section titled “パターン2: APIキー認証”from fastapi import Security, HTTPException, statusfrom fastapi.security import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key")
def get_api_key(api_key: str = Security(api_key_header)): if api_key != "your-api-key": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid API Key" ) return api_key
# app/api/v1/endpoints/items.py@router.get("/items/")def get_items(api_key: str = Depends(get_api_key)): return {"items": []}10. よくある問題と解決策
Section titled “10. よくある問題と解決策”問題1: トークンが無効
Section titled “問題1: トークンが無効”原因:
- トークンの有効期限が切れている
- トークンの署名が間違っている
解決策:
# トークンの有効期限を確認def get_current_user(token: str = Depends(oauth2_scheme)): try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 有効期限の確認 exp = payload.get("exp") if exp and datetime.utcnow().timestamp() > exp: raise HTTPException(status_code=401, detail="Token expired") username: str = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Invalid token") return username except JWTError: raise HTTPException(status_code=401, detail="Invalid token")問題2: CORSエラー
Section titled “問題2: CORSエラー”原因:
- 許可されていないオリジンからのリクエスト
- CORS設定が正しくない
解決策:
# 開発環境と本番環境で設定を分けるfrom app.core.config import settings
app.add_middleware( CORSMiddleware, allow_origins=settings.CORS_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"],)これで、FastAPIの認証・認可の基礎知識と実務での使い方を理解できるようになりました。