Skip to content

OTPとGenServer完全ガイド

OTP(Open Telecom Platform)とGenServerを使用した並行処理と状態管理を、実務で使える実装例とともに詳しく解説します。

OTPは、Erlang/Elixirで並行処理とフォールトトレランスを実現するためのフレームワークです。

OTPの主要コンポーネント
├─ GenServer(状態管理)
├─ Supervisor(監視と再起動)
├─ GenStage(バックプレッシャー)
└─ Registry(プロセス登録)

問題のある構成(OTPなし):

# 問題: プロセスの管理が困難
pid = spawn(fn ->
# 状態の管理が困難
# エラーハンドリングが困難
# 再起動の仕組みがない
end)

解決: OTPによる堅牢なプロセス管理

# 解決: GenServerによる状態管理
defmodule Counter do
use GenServer
def start_link(initial_value) do
GenServer.start_link(__MODULE__, initial_value, name: :counter)
end
def increment do
GenServer.call(:counter, :increment)
end
def handle_call(:increment, _from, state) do
{:reply, state + 1, state + 1}
end
end

GenServerは、クライアント・サーバーモデルを実装するためのビヘイビアです。状態を安全に管理し、他のプロセスからのメッセージを処理します。

defmodule Counter do
use GenServer
# クライアントAPI
def start_link(initial_value \\ 0) do
GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
end
def get do
GenServer.call(__MODULE__, :get)
end
def increment do
GenServer.call(__MODULE__, :increment)
end
def decrement do
GenServer.call(__MODULE__, :decrement)
end
# サーバーコールバック
def init(initial_value) do
{:ok, initial_value}
end
def handle_call(:get, _from, state) do
{:reply, state, state}
end
def handle_call(:increment, _from, state) do
new_state = state + 1
{:reply, new_state, new_state}
end
def handle_call(:decrement, _from, state) do
new_state = state - 1
{:reply, new_state, new_state}
end
end
# GenServerの起動
{:ok, _pid} = Counter.start_link(0)
# 状態の取得
Counter.get() # => 0
# インクリメント
Counter.increment() # => 1
Counter.increment() # => 2
# デクリメント
Counter.decrement() # => 1
defmodule Cache do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def put(key, value) do
GenServer.cast(__MODULE__, {:put, key, value})
end
def get(key) do
GenServer.call(__MODULE__, {:get, key})
end
# サーバーコールバック
def init(_) do
{:ok, %{}}
end
def handle_cast({:put, key, value}, state) do
new_state = Map.put(state, key, value)
{:noreply, new_state}
end
def handle_call({:get, key}, _from, state) do
value = Map.get(state, key)
{:reply, value, state}
end
end

4. Supervisor(スーパーバイザー)

Section titled “4. Supervisor(スーパーバイザー)”

Supervisorは、子プロセスを監視し、エラー発生時に自動的に再起動する仕組みです。

defmodule MyApp.Supervisor do
use Supervisor
def start_link(init_arg) do
Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(_init_arg) do
children = [
{Counter, 0},
{Cache, []}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
# :one_for_one - クラッシュした子プロセスのみ再起動
Supervisor.init(children, strategy: :one_for_one)
# :one_for_all - 1つの子プロセスがクラッシュすると、すべて再起動
Supervisor.init(children, strategy: :one_for_all)
# :rest_for_one - クラッシュした子プロセスと、その後に起動した子プロセスを再起動
Supervisor.init(children, strategy: :rest_for_one)

5. 実務でのベストプラクティス

Section titled “5. 実務でのベストプラクティス”

パターン1: キャッシュサーバー

Section titled “パターン1: キャッシュサーバー”
defmodule CacheServer do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def get(key) do
GenServer.call(__MODULE__, {:get, key})
end
def put(key, value, ttl \\ 3600) do
GenServer.call(__MODULE__, {:put, key, value, ttl})
end
def init(:ok) do
:ets.new(:cache_table, [:set, :public, :named_table])
{:ok, %{}}
end
def handle_call({:get, key}, _from, state) do
case :ets.lookup(:cache_table, key) do
[{^key, value, expires_at}] ->
if System.system_time(:second) < expires_at do
{:reply, {:ok, value}, state}
else
:ets.delete(:cache_table, key)
{:reply, {:error, :not_found}, state}
end
[] ->
{:reply, {:error, :not_found}, state}
end
end
def handle_call({:put, key, value, ttl}, _from, state) do
expires_at = System.system_time(:second) + ttl
:ets.insert(:cache_table, {key, value, expires_at})
{:reply, :ok, state}
end
end
defmodule ConnectionPool do
use GenServer
def start_link(size) do
GenServer.start_link(__MODULE__, size, name: __MODULE__)
end
def checkout do
GenServer.call(__MODULE__, :checkout)
end
def checkin(conn) do
GenServer.cast(__MODULE__, {:checkin, conn})
end
def init(size) do
connections = 1..size
|> Enum.map(fn _ -> create_connection() end)
{:ok, %{available: connections, checked_out: []}}
end
def handle_call(:checkout, _from, %{available: [conn | rest]} = state) do
new_state = %{
available: rest,
checked_out: [conn | state.checked_out]
}
{:reply, {:ok, conn}, new_state}
end
def handle_call(:checkout, _from, %{available: []} = state) do
{:reply, {:error, :no_connections_available}, state}
end
def handle_cast({:checkin, conn}, state) do
new_state = %{
available: [conn | state.available],
checked_out: List.delete(state.checked_out, conn)
}
{:noreply, new_state}
end
defp create_connection do
# 接続の作成ロジック
%{id: :rand.uniform(1000)}
end
end

原因:

  • メッセージの処理が長時間かかっている
  • デッドロックが発生している

解決策:

# タイムアウトの設定
GenServer.call(pid, :message, 5000) # 5秒のタイムアウト
# 非同期処理の使用
Task.async(fn -> heavy_operation() end)

問題2: プロセスがクラッシュする

Section titled “問題2: プロセスがクラッシュする”

原因:

  • エラーハンドリングが不十分
  • 状態が不正

解決策:

# try-catchの使用
def handle_call(:operation, _from, state) do
try do
result = risky_operation(state)
{:reply, {:ok, result}, state}
rescue
e ->
{:reply, {:error, Exception.message(e)}, state}
end
end

これで、OTPとGenServerの基礎知識と実務での使い方を理解できるようになりました。