Skip to content

FastAPI認証・認可完全ガイド

FastAPIでの認証と認可の実装を、実務で使える実装例とともに詳しく解説します。

  • 認証(Authentication): ユーザーが誰であるかを確認する
  • 認可(Authorization): ユーザーが特定のリソースにアクセスできるかを確認する
認証・認可の流れ
├─ ログイン(認証)
├─ トークン発行
├─ リクエストにトークンを含める
└─ トークンの検証(認証)と権限の確認(認可)

問題のある構成(認証なし):

# Problem: anyone can reach this endpoint
@app.get("/users/me")
def get_current_user():
    """Return a fixed anonymous identity.

    Deliberately insecure example: the handler takes no parameters, so no
    credential of any kind is inspected before responding.
    """
    anonymous_identity = {"user": "anonymous"}
    return anonymous_identity
# Issues:
# 1. Confidential information may leak
# 2. Risk of unauthorized access
# 3. Users cannot be identified

解決: 認証・認可による保護

# Fix: authentication is required
@app.get("/users/me")
def read_users_me(current_user: str = Depends(get_current_user)):
    """Return the identity resolved by the ``get_current_user`` dependency.

    The handler is intentionally NOT named ``get_current_user``: a handler
    sharing the dependency's name either fails with ``NameError`` (when the
    dependency has not been imported yet) or rebinds the imported name, so
    every later ``Depends(get_current_user)`` in the module would point at
    this route handler instead of the security dependency.

    Args:
        current_user: Value produced by ``get_current_user``.

    Returns:
        A dict of the form ``{"user": current_user}``.
    """
    return {"user": current_user}
app/core/security.py
from fastapi.security import OAuth2PasswordBearer
# Bearer-token security scheme, constructed with tokenUrl="token"; injected via Depends() by handlers that need the token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app/core/security.py
from datetime import datetime, timedelta
from jose import jwt
from typing import Optional
# Key handed to jwt.encode / jwt.decode when signing and verifying tokens.
# NOTE(review): this is a placeholder literal — presumably meant to be replaced by a secret loaded from configuration; confirm before deploying.
SECRET_KEY = "your-secret-key"
# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# NOTE(review): not referenced by the visible security.py code (create_access_token falls back to 15 minutes;
# the login endpoint reads settings.ACCESS_TOKEN_EXPIRE_MINUTES) — confirm which value is authoritative.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not mutated.
        expires_delta: Lifetime of the token. ``None`` selects the 15-minute
            fallback; any explicit value, including ``timedelta(0)``, is
            honoured as given.

    Returns:
        The value returned by ``jwt.encode`` for the claims, ``SECRET_KEY``
        and ``ALGORITHM``.
    """
    # Function-scope import: the module-level import only brings in datetime and timedelta.
    from datetime import timezone

    # Compare against None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the fallback lifetime.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    # datetime.utcnow() is deprecated and returns a naive value; use an aware UTC timestamp.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
app/core/security.py
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# Bearer-token security scheme used as the default dependency of get_current_user below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return its ``sub`` claim.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The value of the ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``jwt.decode`` raises ``JWTError`` or the ``sub`` claim is absent.
    """
    # NOTE(review): this returns the ``sub`` value, while several dependents
    # annotate the injected value as ``models.User`` — confirm the intended type.
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized
    subject: str = claims.get("sub")
    if subject is None:
        raise unauthorized
    return subject
app/core/security.py
from passlib.context import CryptContext
# Shared passlib context configured with the bcrypt scheme; used by verify_password and get_password_hash.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Report whether ``plain_password`` matches ``hashed_password``.

    The check is delegated to ``pwd_context.verify``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Return the hash that ``pwd_context.hash`` produces for ``password``."""
    hashed = pwd_context.hash(password)
    return hashed
app/api/v1/endpoints/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from app.core import security
from app.core.config import settings
from app.database import get_db
from app import models, schemas
# Router on which the authentication endpoints below are registered.
router = APIRouter()
@router.post("/token", response_model=schemas.Token)
def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Exchange a username and password for a bearer access token.

    Args:
        form_data: Login form providing ``username`` and ``password``.
        db: Database session supplied by ``get_db``.

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when ``authenticate_user`` returns a falsy value.
    """
    account = authenticate_user(db, form_data.username, form_data.password)
    if account:
        lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        token = security.create_access_token(
            data={"sub": account.username}, expires_delta=lifetime
        )
        return {"access_token": token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
def authenticate_user(db: Session, username: str, password: str):
    """Look up ``username`` and check ``password`` against the stored hash.

    Args:
        db: Session used to query ``models.User``.
        username: Value matched against ``models.User.username``.
        password: Plain-text password passed to ``security.verify_password``.

    Returns:
        The matching user object when the password verifies, otherwise ``False``.
    """
    candidate = db.query(models.User).filter(models.User.username == username).first()
    if candidate and security.verify_password(password, candidate.hashed_password):
        return candidate
    return False

5. 認証が必要なエンドポイント

Section titled “5. 認証が必要なエンドポイント”
app/api/v1/endpoints/users.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.core.security import get_current_user
from app.database import get_db
from app import models, schemas
# Router on which the user endpoints below are registered.
router = APIRouter()
@router.get("/users/me", response_model=schemas.User)
def read_users_me(
    current_user: str = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the database record of the authenticated caller.

    ``get_current_user`` yields the token's ``sub`` claim (a username), not a
    user object, so returning it directly cannot satisfy
    ``response_model=schemas.User``. The injected session is used to load the
    matching row instead.

    Args:
        current_user: Username produced by ``get_current_user``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``models.User`` row whose ``username`` equals ``current_user``.

    Raises:
        HTTPException: 404 when no such row exists.
    """
    user = (
        db.query(models.User)
        .filter(models.User.username == current_user)
        .first()
    )
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user
@router.get("/users/{user_id}", response_model=schemas.User)
def read_user(
    user_id: int,
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Fetch a single user by primary key.

    Args:
        user_id: Value matched against ``models.User.id``.
        current_user: Result of ``get_current_user``; not read in the body, so
            it only forces the dependency to run.
        db: Database session supplied by ``get_db``.

    Returns:
        The first ``models.User`` row with the given id.

    Raises:
        HTTPException: 404 when the query finds nothing.
    """
    match = db.query(models.User).filter(models.User.id == user_id).first()
    if match is not None:
        return match
    raise HTTPException(status_code=404, detail="User not found")
app/core/security.py
from fastapi import Depends, HTTPException, status
from app import models
def get_current_active_user(
    current_user: models.User = Depends(get_current_user)
) -> models.User:
    """Pass the current user through only when it is active.

    Returns:
        ``current_user`` unchanged when ``current_user.is_active`` is truthy.

    Raises:
        HTTPException: 400 ``"Inactive user"`` otherwise.
    """
    # NOTE(review): reads ``.is_active`` on the value from get_current_user,
    # which in the visible security.py returns the ``sub`` claim — confirm the
    # dependency really yields a user object here.
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
def get_current_admin_user(
    current_user: models.User = Depends(get_current_active_user)
) -> models.User:
    """Pass the active user through only when its role is ``"admin"``.

    Returns:
        ``current_user`` unchanged when the role matches.

    Raises:
        HTTPException: 403 ``"Not enough permissions"`` for any other role.
    """
    lacks_admin_role = current_user.role != "admin"
    if lacks_admin_role:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )
    return current_user
app/api/v1/endpoints/admin.py
from fastapi import APIRouter, Depends
from app.core.security import get_current_admin_user
from app import models
# Router on which the admin-only endpoints below are registered.
router = APIRouter()
@router.get("/admin/users/")
def get_all_users(
    admin_user: models.User = Depends(get_current_admin_user),
):
    """Return the user listing for administrators.

    ``admin_user`` is not read in the body; declaring it makes the
    ``get_current_admin_user`` dependency run before the handler.

    Returns:
        A dict whose ``"users"`` entry is an empty list (placeholder data).
    """
    # Reachable by administrators only
    users = []
    return {"users": users}
app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

# Application instance; FastAPI must be imported for this line to run
# (the import was missing from the snippet).
app = FastAPI()
# Allow the listed front-end origins to call the API with credentials,
# using any HTTP method and any request header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://your-frontend.com", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app/main.py
from fastapi import Request
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    """Attach a fixed set of security headers to every response.

    Args:
        request: Incoming request, forwarded unchanged to ``call_next``.
        call_next: Callable that produces the downstream response.

    Returns:
        The downstream response with the four headers below set.
    """
    hardening_headers = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("X-XSS-Protection", "1; mode=block"),
        ("Strict-Transport-Security", "max-age=31536000; includeSubDomains"),
    )
    response = await call_next(request)
    for header_name, header_value in hardening_headers:
        response.headers[header_name] = header_value
    return response

9. 実務でのベストプラクティス

Section titled “9. 実務でのベストプラクティス”

パターン1: リフレッシュトークン

Section titled “パターン1: リフレッシュトークン”
app/core/security.py
def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed refresh JWT carrying ``data``, ``exp`` and ``type``.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not mutated.
        expires_delta: Lifetime of the token. ``None`` (the default) keeps the
            original 7-day lifetime; the parameter mirrors
            ``create_access_token`` so callers can override it.

    Returns:
        The value returned by ``jwt.encode``; the claims include
        ``"type": "refresh"`` so the refresh endpoint can tell the token kinds
        apart.
    """
    # Function-scope import: the module-level import only brings in datetime and timedelta.
    from datetime import timezone

    lifetime = timedelta(days=7) if expires_delta is None else expires_delta
    # datetime.utcnow() is deprecated and returns a naive value; use an aware UTC timestamp.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# app/api/v1/endpoints/auth.py
@router.post("/token/refresh")
def refresh_access_token(
    refresh_token: str,
    db: Session = Depends(get_db)
):
    """Issue a new access token from a valid refresh token.

    Args:
        refresh_token: Encoded refresh JWT.
        db: Database session supplied by ``get_db``; not read in the body.

    Returns:
        A dict with ``access_token`` and ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError``, when the
            ``type`` claim is not ``"refresh"``, or when ``sub`` is absent.
    """
    # NOTE(review): ``refresh_token`` is a bare ``str`` parameter — presumably
    # taken from the query string; confirm that is the intended transport.
    try:
        claims = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    if claims.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Invalid token type")
    subject: str = claims.get("sub")
    if subject is None:
        raise HTTPException(status_code=401, detail="Invalid token")
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    new_token = security.create_access_token(
        data={"sub": subject}, expires_delta=lifetime
    )
    return {"access_token": new_token, "token_type": "bearer"}
app/core/security.py
from fastapi import Security, HTTPException, status
from fastapi.security import APIKeyHeader
# Security scheme built with name="X-API-Key"; get_api_key receives its value via Security().
api_key_header = APIKeyHeader(name="X-API-Key")
def get_api_key(api_key: str = Security(api_key_header)):
    """Validate the API key supplied by the ``api_key_header`` scheme.

    Args:
        api_key: Key value supplied by the ``api_key_header`` dependency.

    Returns:
        ``api_key`` unchanged when it matches the expected key.

    Raises:
        HTTPException: 403 ``"Invalid API Key"`` when the key does not match.
    """
    # Function-scope import keeps the block self-contained.
    import secrets

    # Compare in constant time so response timing does not reveal how many
    # leading characters of a guessed key were correct. Bytes are compared
    # because compare_digest rejects non-ASCII str arguments.
    # NOTE(review): the expected key is a placeholder literal — presumably
    # meant to come from configuration; confirm.
    expected_key = "your-api-key".encode("utf-8")
    if not secrets.compare_digest(api_key.encode("utf-8"), expected_key):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API Key"
        )
    return api_key
# app/api/v1/endpoints/items.py
@router.get("/items/")
def get_items(api_key: str = Depends(get_api_key)):
    """Return the item listing for callers that pass ``get_api_key``.

    ``api_key`` is not read in the body; declaring it makes the dependency run.

    Returns:
        A dict whose ``"items"`` entry is an empty list (placeholder data).
    """
    items = []
    return {"items": items}

原因:

  • トークンの有効期限が切れている
  • トークンの署名が間違っている

解決策:

# Report an expired token separately from other validation failures
def get_current_user(token: str = Depends(oauth2_scheme)):
    """Decode the bearer token and return its ``sub`` claim.

    ``jwt.decode`` already validates the ``exp`` claim and raises
    ``ExpiredSignatureError`` (a ``JWTError`` subclass) for expired tokens, so
    a manual comparison after decoding is never reached for them. Catching
    that exception first is what lets the caller see ``"Token expired"``.
    It also avoids ``datetime.utcnow().timestamp()``, which interprets the
    naive value as local time and is therefore wrong off-UTC.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The value of the ``sub`` claim.

    Raises:
        HTTPException: 401 ``"Token expired"`` for an expired token;
            401 ``"Invalid token"`` for any other ``JWTError`` or a missing
            ``sub`` claim. Every 401 here carries ``WWW-Authenticate: Bearer``.
    """
    # Function-scope import: the module-level import only brings in JWTError and jwt.
    from jose.exceptions import ExpiredSignatureError

    bearer_challenge = {"WWW-Authenticate": "Bearer"}
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except ExpiredSignatureError:
        # Must precede the JWTError handler because it is a subclass of it.
        raise HTTPException(
            status_code=401, detail="Token expired", headers=bearer_challenge
        )
    except JWTError:
        raise HTTPException(
            status_code=401, detail="Invalid token", headers=bearer_challenge
        )
    username: str = payload.get("sub")
    if username is None:
        raise HTTPException(
            status_code=401, detail="Invalid token", headers=bearer_challenge
        )
    return username

原因:

  • 許可されていないオリジンからのリクエスト
  • CORS設定が正しくない

解決策:

# Keep separate CORS settings for development and production:
# the allowed origins are read from settings.CORS_ORIGINS instead of being hard-coded.
from app.core.config import settings
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

これで、FastAPIの認証・認可の基礎知識と実務での使い方を理解できるようになりました。