SSE(Server-Sent Events)通信
SSE(Server-Sent Events)通信
Section titled “SSE(Server-Sent Events)通信”SSE(Server-Sent Events)は、サーバーからクライアントへの一方向のリアルタイム通信を実現するプロトコルです。Next.jsでは、Route HandlerでSSEストリームを実装できます。
なぜSSEが必要なのか
Section titled “なぜSSEが必要なのか”SSEとWebSocketの違い
Section titled “SSEとWebSocketの違い”SSEの特徴:
- 一方向通信: サーバーからクライアントへのみ
- HTTPベース: HTTPプロトコルを使用
- 自動再接続: ブラウザが自動的に再接続
- シンプル: 実装が簡単
- サーバーレス対応: Next.jsのRoute Handlerで実装可能
WebSocketの特徴:
- 双方向通信: クライアントとサーバーの双方向
- 独自プロトコル: WebSocketプロトコルを使用
- 手動再接続: 手動で再接続ロジックを実装
- 複雑: 実装が複雑
- カスタムサーバー必要: Next.jsではカスタムサーバーが必要
使い分け:
- SSE: サーバーからクライアントへのプッシュのみ必要な場合(通知、ログストリーム、リアルタイム更新など)
- WebSocket: 双方向通信が必要な場合(チャット、ゲームなど)
Next.jsでのSSE実装
Section titled “Next.jsでのSSE実装”1. Route HandlerでのSSE実装
Section titled “1. Route HandlerでのSSE実装”基本的なSSE Route Handler:
import { NextRequest } from 'next/server';
export async function GET(request: NextRequest) { // SSEストリームを作成 const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder();
// 接続を維持するためのハートビート const heartbeatInterval = setInterval(() => { try { controller.enqueue(encoder.encode(': heartbeat\n\n')); } catch (error) { clearInterval(heartbeatInterval); } }, 30000); // 30秒ごと
// 通知を送信する関数 const sendNotification = (data: any) => { try { const message = `data: ${JSON.stringify(data)}\n\n`; controller.enqueue(encoder.encode(message)); } catch (error) { console.error('Failed to send notification:', error); } };
// クライアントが切断した場合の処理 request.signal.addEventListener('abort', () => { clearInterval(heartbeatInterval); controller.close(); });
// 初期メッセージを送信 sendNotification({ type: 'connected', message: 'SSE connection established', });
// 通知をリッスン(実際の実装では、イベントエミッターやメッセージキューを使用) // 例: eventEmitter.on('notification', sendNotification); }, });
return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache, no-transform', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no', // Nginxのバッファリングを無効化 }, });}2. イベントエミッターを使用した実装
Section titled “2. イベントエミッターを使用した実装”イベントエミッターの実装:
import { EventEmitter } from 'events';
class NotificationEmitter extends EventEmitter {}export const notificationEmitter = new NotificationEmitter();
// 通知を送信する関数export function sendNotification(notification: { id: string; type: string; message: string; userId?: string;}) { notificationEmitter.emit('notification', notification);}SSE Route Handlerの実装:
import { NextRequest } from 'next/server';import { notificationEmitter } from '@/lib/eventEmitter';
export async function GET(request: NextRequest) { // 認証トークンを取得 const authHeader = request.headers.get('authorization'); const token = authHeader?.replace('Bearer ', '');
if (!token || !isValidToken(token)) { return new Response('Unauthorized', { status: 401 }); }
const userId = getUserIdFromToken(token);
const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder(); let isClosed = false;
// 通知を送信する関数 const sendNotification = (notification: any) => { // ユーザーIDが指定されている場合、該当ユーザーのみに送信 if (notification.userId && notification.userId !== userId) { return; }
if (!isClosed) { try { const message = `data: ${JSON.stringify(notification)}\n\n`; controller.enqueue(encoder.encode(message)); } catch (error) { console.error('Failed to send notification:', error); } } };
// イベントリスナーを登録 notificationEmitter.on('notification', sendNotification);
// 接続確立メッセージを送信 sendNotification({ type: 'connected', message: 'SSE connection established', timestamp: Date.now(), });
// クライアントが切断した場合の処理 request.signal.addEventListener('abort', () => { isClosed = true; notificationEmitter.off('notification', sendNotification); controller.close(); }); }, });
return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache, no-transform', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no', }, });}3. クライアント側の実装
Section titled “3. クライアント側の実装”Reactフックの実装:
import { useEffect, useState, useRef, useCallback } from 'react';
interface UseSSEOptions { url: string; token?: string; onMessage?: (data: any) => void; onError?: (error: Event) => void;}
export function useSSE<T = any>(options: UseSSEOptions) { const { url, token, onMessage, onError } = options; const [isConnected, setIsConnected] = useState(false); const [lastMessage, setLastMessage] = useState<T | null>(null); const eventSourceRef = useRef<EventSource | null>(null);
const connect = useCallback(() => { try { // トークンがある場合、URLに含める const authenticatedUrl = token ? `${url}?token=${encodeURIComponent(token)}` : url;
const eventSource = new EventSource(authenticatedUrl, { withCredentials: true, });
eventSourceRef.current = eventSource;
eventSource.onopen = () => { setIsConnected(true); };
eventSource.onmessage = (event) => { try { const data = JSON.parse(event.data) as T; setLastMessage(data); onMessage?.(data); } catch (error) { console.error('Failed to parse SSE message:', error); } };
eventSource.onerror = (error) => { console.error('SSE error:', error); setIsConnected(false); onError?.(error); }; } catch (error) { console.error('Failed to create EventSource:', error); } }, [url, token, onMessage, onError]);
const disconnect = useCallback(() => { if (eventSourceRef.current) { eventSourceRef.current.close(); eventSourceRef.current = null; setIsConnected(false); } }, []);
useEffect(() => { connect();
return () => { disconnect(); }; }, [connect, disconnect]);
return { isConnected, lastMessage, reconnect: connect, disconnect, };}SSEフックの使用例:
'use client';
import { useState, useEffect } from 'react';import { useSSE } from '@/hooks/useSSE';import { useAuth } from '@/hooks/useAuth';
interface Notification { id: string; type: 'info' | 'success' | 'warning' | 'error'; message: string; timestamp: number;}
export function RealtimeNotifications() { const { token } = useAuth(); const [notifications, setNotifications] = useState<Notification[]>([]);
const { isConnected, lastMessage } = useSSE<Notification>({ url: '/api/notifications/stream', token, onMessage: (data) => { // 接続確立メッセージは無視 if (data.type === 'connected') { return; }
setNotifications((prev) => [data, ...prev]);
// 5秒後に自動的に削除 setTimeout(() => { setNotifications((prev) => prev.filter((n) => n.id !== data.id) ); }, 5000); }, });
const handleDismiss = (id: string) => { setNotifications((prev) => prev.filter((n) => n.id !== id)); };
return ( <div className="notifications"> <div className="notifications-header"> <h3>通知</h3> <div className={`status ${isConnected ? 'connected' : 'disconnected'}`}> {isConnected ? '接続中' : '切断中'} </div> </div>
<div className="notifications-list"> {notifications.map((notification) => ( <div key={notification.id} className={`notification notification-${notification.type}`} > <div className="notification-message">{notification.message}</div> <div className="notification-time"> {new Date(notification.timestamp).toLocaleTimeString()} </div> <button onClick={() => handleDismiss(notification.id)}> 閉じる </button> </div> ))} </div> </div> );}4. カスタムイベントの実装
Section titled “4. カスタムイベントの実装”カスタムイベントのSSE実装:
import { NextRequest } from 'next/server';import { logEmitter } from '@/lib/logEmitter';
export async function GET(request: NextRequest) { const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder(); let isClosed = false;
// ログを送信する関数 const sendLog = (log: any) => { if (!isClosed) { try { // カスタムイベント名を指定 const message = `event: ${log.level}\ndata: ${JSON.stringify(log)}\n\n`; controller.enqueue(encoder.encode(message)); } catch (error) { console.error('Failed to send log:', error); } } };
// イベントリスナーを登録 logEmitter.on('log', sendLog); logEmitter.on('error', sendLog); logEmitter.on('warn', sendLog); logEmitter.on('info', sendLog);
// 接続確立メッセージを送信 controller.enqueue( encoder.encode(`data: ${JSON.stringify({ type: 'connected' })}\n\n`) );
// クライアントが切断した場合の処理 request.signal.addEventListener('abort', () => { isClosed = true; logEmitter.off('log', sendLog); logEmitter.off('error', sendLog); logEmitter.off('warn', sendLog); logEmitter.off('info', sendLog); controller.close(); }); }, });
return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache, no-transform', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no', }, });}カスタムイベントのクライアント側実装:
import { useEffect, useState, useRef, useCallback } from 'react';
interface UseSSEWithEventsOptions { url: string; token?: string; events?: string[];}
export function useSSEWithEvents<T = any>( options: UseSSEWithEventsOptions) { const { url, token, events = [] } = options; const [isConnected, setIsConnected] = useState(false); const [messages, setMessages] = useState<Map<string, T>>(new Map()); const eventSourceRef = useRef<EventSource | null>(null);
const connect = useCallback(() => { try { const authenticatedUrl = token ? `${url}?token=${encodeURIComponent(token)}` : url;
const eventSource = new EventSource(authenticatedUrl); eventSourceRef.current = eventSource;
eventSource.onopen = () => { setIsConnected(true); };
// デフォルトのメッセージイベント eventSource.onmessage = (event) => { try { const data = JSON.parse(event.data) as T; setMessages((prev) => { const newMap = new Map(prev); newMap.set('message', data); return newMap; }); } catch (error) { console.error('Failed to parse SSE message:', error); } };
// カスタムイベント events.forEach((eventName) => { eventSource.addEventListener(eventName, (event: MessageEvent) => { try { const data = JSON.parse(event.data) as T; setMessages((prev) => { const newMap = new Map(prev); newMap.set(eventName, data); return newMap; }); } catch (error) { console.error(`Failed to parse SSE event ${eventName}:`, error); } }); });
eventSource.onerror = (error) => { console.error('SSE error:', error); setIsConnected(false); }; } catch (error) { console.error('Failed to create EventSource:', error); } }, [url, token, events]);
const disconnect = useCallback(() => { if (eventSourceRef.current) { eventSourceRef.current.close(); eventSourceRef.current = null; setIsConnected(false); } }, []);
useEffect(() => { connect();
return () => { disconnect(); }; }, [connect, disconnect]);
return { isConnected, messages, reconnect: connect, disconnect, };}ログストリームコンポーネント:
'use client';
import { useState, useEffect, useRef } from 'react';import { useSSEWithEvents } from '@/hooks/useSSEWithEvents';
interface LogEntry { id: string; level: 'info' | 'warn' | 'error'; message: string; timestamp: number;}
export function LogStream() { const [logs, setLogs] = useState<LogEntry[]>([]); const [autoScroll, setAutoScroll] = useState(true); const logsEndRef = useRef<HTMLDivElement>(null);
const { isConnected, messages } = useSSEWithEvents<LogEntry>({ url: '/api/logs/stream', events: ['info', 'warn', 'error'], });
// メッセージからログを取得 useEffect(() => { const newLogs: LogEntry[] = [];
messages.forEach((log, eventName) => { if (eventName === 'info' || eventName === 'warn' || eventName === 'error') { newLogs.push(log); } });
if (newLogs.length > 0) { setLogs((prev) => { const updated = [...prev, ...newLogs]; // 最大1000件まで保持 return updated.slice(-1000); }); } }, [messages]);
// 自動スクロール useEffect(() => { if (autoScroll && logsEndRef.current) { logsEndRef.current.scrollIntoView({ behavior: 'smooth' }); } }, [logs, autoScroll]);
const handleClear = () => { setLogs([]); };
return ( <div className="log-stream"> <div className="log-stream-header"> <h3>ログストリーム</h3> <div className="log-stream-controls"> <label> <input type="checkbox" checked={autoScroll} onChange={(e) => setAutoScroll(e.target.checked)} /> 自動スクロール </label> <button onClick={handleClear}>クリア</button> <div className={`status ${isConnected ? 'connected' : 'disconnected'}`}> {isConnected ? '接続中' : '切断中'} </div> </div> </div>
<div className="log-stream-content"> {logs.map((log) => ( <div key={log.id} className={`log-entry log-${log.level}`}> <span className="log-time"> {new Date(log.timestamp).toLocaleTimeString()} </span> <span className="log-level">{log.level.toUpperCase()}</span> <span className="log-message">{log.message}</span> </div> ))} <div ref={logsEndRef} /> </div> </div> );}SSEのベストプラクティス
Section titled “SSEのベストプラクティス”1. 接続管理
Section titled “1. 接続管理”接続管理の実装:
class SSEManager { private connections = new Map<string, ReadableStreamDefaultController>();
addConnection(id: string, controller: ReadableStreamDefaultController) { this.connections.set(id, controller); }
removeConnection(id: string) { this.connections.delete(id); }
sendToConnection(id: string, data: any) { const controller = this.connections.get(id); if (controller) { const encoder = new TextEncoder(); const message = `data: ${JSON.stringify(data)}\n\n`; try { controller.enqueue(encoder.encode(message)); } catch (error) { console.error('Failed to send to connection:', error); this.removeConnection(id); } } }
broadcast(data: any) { this.connections.forEach((controller, id) => { this.sendToConnection(id, data); }); }}
export const sseManager = new SSEManager();2. エラーハンドリング
Section titled “2. エラーハンドリング”エラーハンドリングの実装:
export async function GET(request: NextRequest) { try { const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder(); let isClosed = false;
// エラーハンドリング const handleError = (error: Error) => { if (!isClosed) { try { const errorMessage = `event: error\ndata: ${JSON.stringify({ message: error.message, timestamp: Date.now(), })}\n\n`; controller.enqueue(encoder.encode(errorMessage)); } catch (e) { console.error('Failed to send error:', e); } } };
// 接続確立 try { controller.enqueue( encoder.encode(`data: ${JSON.stringify({ type: 'connected' })}\n\n`) ); } catch (error) { handleError(error as Error); return; }
// クライアントが切断した場合の処理 request.signal.addEventListener('abort', () => { isClosed = true; controller.close(); }); }, });
return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache, no-transform', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no', }, }); } catch (error) { console.error('SSE stream error:', error); return new Response('Internal Server Error', { status: 500 }); }}Next.jsでのSSE通信のポイント:
- Route Handler: Next.jsのRoute HandlerでSSEストリームを実装
- イベントエミッター: イベントエミッターで通知を管理
- 認証: トークンベースの認証をサポート
- カスタムイベント: 複数のイベントタイプを処理
- エラーハンドリング: 適切なエラーハンドリングと接続管理
適切なSSE実装により、Next.jsアプリケーションでサーバーからクライアントへのリアルタイムプッシュを実現できます。