Skip to content

RabbitMQ完全ガイド

RabbitMQの実践的な使い方を、実務で使える実装例とベストプラクティスとともに詳しく解説します。

RabbitMQは、オープンソースのメッセージブローカーです。AMQP(Advanced Message Queuing Protocol)を実装しています。

RabbitMQの特徴
├─ オープンソース(無料)
├─ 複数のメッセージングパターンをサポート
├─ 高可用性(クラスタリング対応)
├─ 豊富な管理機能
└─ 多くの言語でサポート

RabbitMQを選ぶべき場合:

  • 複雑なルーティングが必要
  • 複数のメッセージングパターンが必要
  • 高可用性が必要
  • 豊富な管理機能が必要

RabbitMQを選ばないべき場合:

  • 非常に高いスループットが必要(Kafkaの方が適している場合がある)
  • シンプルなキューが必要(Redisの方が適している場合がある)

2. RabbitMQのインストールとセットアップ

Section titled “2. RabbitMQのインストールとセットアップ”
Terminal window
# Homebrewを使用
brew install rabbitmq
# サービスを開始
brew services start rabbitmq
# 管理プラグインの有効化
rabbitmq-plugins enable rabbitmq_management
# RabbitMQに接続(管理UI: http://localhost:15672)
Terminal window
# Ubuntu/Debian
sudo apt-get update
sudo apt-get install rabbitmq-server
# サービスを開始
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
# 管理プラグインの有効化
sudo rabbitmq-plugins enable rabbitmq_management
# ユーザーの作成
sudo rabbitmqctl add_user admin admin_password
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Terminal window
# RabbitMQの起動
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin_password \
rabbitmq:3-management

Exchangeは、メッセージをルーティングするコンポーネントです。

Exchangeの種類
├─ Direct: ルーティングキーに基づいてルーティング
├─ Topic: パターンマッチングによるルーティング
├─ Fanout: すべてのキューにブロードキャスト
└─ Headers: ヘッダー属性に基づいてルーティング

Queueは、メッセージを保存するコンポーネントです。

Bindingは、ExchangeとQueueを接続するルールです。

1. Simple Queue(シンプルキュー)

Section titled “1. Simple Queue(シンプルキュー)”
# Pythonの例(pikaライブラリ)
import pika
# 接続の確立
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# キュー宣言
channel.queue_declare(queue='hello')
# メッセージの送信
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!'
)
# メッセージの受信
def callback(ch, method, properties, body):
print(f"Received: {body}")
channel.basic_consume(
queue='hello',
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
# プロデューサー
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# キュー宣言(永続化)
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # メッセージの永続化
)
)
# コンシューマー
def callback(ch, method, properties, body):
print(f"Received: {body}")
# 処理のシミュレーション
import time
time.sleep(body.count(b'.'))
print("Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) # 公平な配信
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
channel.start_consuming()

3. Publish/Subscribe(パブリッシュ/サブスクライブ)

Section titled “3. Publish/Subscribe(パブリッシュ/サブスクライブ)”
# プロデューサー
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Exchange宣言(Fanout)
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = 'Log message'
channel.basic_publish(
exchange='logs',
routing_key='',
body=message
)
# コンシューマー
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 一時的なキューを作成
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Exchangeとキューをバインド
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
print(f"Received: {body}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
# プロデューサー(Direct Exchange)
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs',
routing_key=severity,
body=message
)
# コンシューマー
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs',
queue=queue_name,
routing_key=severity
)
def callback(ch, method, properties, body):
print(f" [{method.routing_key}] {body}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
# プロデューサー(Topic Exchange)
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs',
routing_key=routing_key,
body=message
)
# コンシューマー
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs',
queue=queue_name,
routing_key=binding_key
)
def callback(ch, method, properties, body):
print(f" [{method.routing_key}] {body}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()
# キューの永続化
channel.queue_declare(queue='task_queue', durable=True)
# メッセージの永続化
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # メッセージの永続化
)
)

6. メッセージの確認応答(Acknowledgment)

Section titled “6. メッセージの確認応答(Acknowledgment)”
def callback(ch, method, properties, body):
print(f"Received: {body}")
# 処理
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=False # 手動確認応答
)
def callback(ch, method, properties, body):
print(f"Received: {body}")
# 処理
ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)
# DLQの設定
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dlq', durable=True)
channel.queue_bind(exchange='dlx', queue='dlq', routing_key='dlq')
# メインキューにDLQを設定
channel.queue_declare(
queue='main_queue',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'dlq',
'x-message-ttl': 60000 # 60秒でTTL
}
)
# 優先度キューの宣言
channel.queue_declare(
queue='priority_queue',
durable=True,
arguments={'x-max-priority': 10}
)
# 優先度付きメッセージの送信
channel.basic_publish(
exchange='',
routing_key='priority_queue',
body=message,
properties=pika.BasicProperties(
priority=5, # 優先度(0-10)
delivery_mode=2,
)
)
Terminal window
# ノード1の起動
rabbitmq-server -detached
# ノード2の起動
RABBITMQ_NODENAME=rabbit@node2 rabbitmq-server -detached
# クラスタの形成
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# クラスタの状態確認
rabbitmqctl cluster_status
Terminal window
# ポリシーの設定
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
# 特定のキューにミラーリングを設定
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2}'
Terminal window
# 管理UIへのアクセス
# http://localhost:15672
# デフォルトユーザー: guest / guest
Terminal window
# キューの一覧
rabbitmqctl list_queues
# Exchangeの一覧
rabbitmqctl list_exchanges
# バインディングの一覧
rabbitmqctl list_bindings
# 接続の一覧
rabbitmqctl list_connections
# チャネルの一覧
rabbitmqctl list_channels

11. 実践的なベストプラクティス

Section titled “11. 実践的なベストプラクティス”
# 接続プールの使用
import pika
from pika.adapters.blocking_connection import BlockingConnection
class RabbitMQConnection:
def __init__(self, host='localhost', port=5672):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=host, port=port)
)
self.channel = self.connection.channel()
def close(self):
self.connection.close()
import pika
import logging
logging.basicConfig(level=logging.INFO)
def publish_with_retry(channel, exchange, routing_key, body, max_retries=3):
for attempt in range(max_retries):
try:
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=body
)
return True
except Exception as e:
logging.error(f"Publish failed (attempt {attempt + 1}): {e}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数バックオフ
return False

メッセージのシリアライゼーション

Section titled “メッセージのシリアライゼーション”
import json
import pika
# JSONシリアライゼーション
def publish_json(channel, exchange, routing_key, data):
message = json.dumps(data)
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=message,
properties=pika.BasicProperties(
content_type='application/json',
delivery_mode=2,
)
)
def consume_json(ch, method, properties, body):
data = json.loads(body)
# 処理
ch.basic_ack(delivery_tag=method.delivery_tag)
# 解決: 永続化と確認応答の使用
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # 永続化
)
)
channel.basic_consume(
queue='task_queue',
on_message_callback=callback,
auto_ack=False # 手動確認応答
)
# 解決: 冪等性の実装
processed_messages = set()
def callback(ch, method, properties, body):
message_id = properties.message_id
if message_id in processed_messages:
ch.basic_ack(delivery_tag=method.delivery_tag)
return
# 処理
process_message(body)
processed_messages.add(message_id)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 解決: バッチ処理とパイプライン
channel.basic_qos(prefetch_count=100) # バッチサイズの調整
# 複数のメッセージを一度に送信
for message in messages:
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message
)

RabbitMQ完全ガイドのポイント:

  • Exchange: Direct、Topic、Fanout、Headers
  • メッセージングパターン: Simple Queue、Work Queue、Pub/Sub、Routing、Topics
  • 永続化: キューとメッセージの永続化
  • 確認応答: 手動確認応答による信頼性の向上
  • Dead Letter Queue: 失敗メッセージの処理
  • 優先度キュー: メッセージの優先順位付け
  • クラスタリング: 高可用性の実現
  • 監視: 管理UIとコマンドライン管理

適切なRabbitMQの使用により、信頼性の高いメッセージングシステムを構築できます。