RabbitMQ完全ガイド
RabbitMQ完全ガイド
Section titled “RabbitMQ完全ガイド”RabbitMQの実践的な使い方を、実務で使える実装例とベストプラクティスとともに詳しく解説します。
1. RabbitMQとは
Section titled “1. RabbitMQとは”RabbitMQの特徴
Section titled “RabbitMQの特徴”RabbitMQは、オープンソースのメッセージブローカーです。AMQP(Advanced Message Queuing Protocol)を実装しています。
RabbitMQの特徴 ├─ オープンソース(無料) ├─ 複数のメッセージングパターンをサポート ├─ 高可用性(クラスタリング対応) ├─ 豊富な管理機能 └─ 多くの言語でサポートなぜRabbitMQを選ぶのか
Section titled “なぜRabbitMQを選ぶのか”RabbitMQを選ぶべき場合:
- 複雑なルーティングが必要
- 複数のメッセージングパターンが必要
- 高可用性が必要
- 豊富な管理機能が必要
RabbitMQを選ばないべき場合:
- 非常に高いスループットが必要(Kafkaの方が適している場合がある)
- シンプルなキューが必要(Redisの方が適している場合がある)
2. RabbitMQのインストールとセットアップ
Section titled “2. RabbitMQのインストールとセットアップ”macOSでのインストール
Section titled “macOSでのインストール”# Homebrewを使用brew install rabbitmq
# サービスを開始brew services start rabbitmq
# 管理プラグインの有効化rabbitmq-plugins enable rabbitmq_management
# RabbitMQに接続(管理UI: http://localhost:15672)Linuxでのインストール
Section titled “Linuxでのインストール”# Ubuntu/Debiansudo apt-get updatesudo apt-get install rabbitmq-server
# サービスを開始sudo systemctl start rabbitmq-serversudo systemctl enable rabbitmq-server
# 管理プラグインの有効化sudo rabbitmq-plugins enable rabbitmq_management
# ユーザーの作成sudo rabbitmqctl add_user admin admin_passwordsudo rabbitmqctl set_user_tags admin administratorsudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"Dockerでのインストール
Section titled “Dockerでのインストール”# RabbitMQの起動docker run -d --name rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin_password \ rabbitmq:3-management3. 基本的な概念
Section titled “3. 基本的な概念”Exchange(エクスチェンジ)
Section titled “Exchange(エクスチェンジ)”Exchangeは、メッセージをルーティングするコンポーネントです。
Exchangeの種類 ├─ Direct: ルーティングキーに基づいてルーティング ├─ Topic: パターンマッチングによるルーティング ├─ Fanout: すべてのキューにブロードキャスト └─ Headers: ヘッダー属性に基づいてルーティングQueue(キュー)
Section titled “Queue(キュー)”Queueは、メッセージを保存するコンポーネントです。
Binding(バインディング)
Section titled “Binding(バインディング)”Bindingは、ExchangeとQueueを接続するルールです。
4. メッセージングパターン
Section titled “4. メッセージングパターン”1. Simple Queue(シンプルキュー)
Section titled “1. Simple Queue(シンプルキュー)”# Pythonの例(pikaライブラリ)import pika
# 接続の確立connection = pika.BlockingConnection( pika.ConnectionParameters('localhost'))channel = connection.channel()
# キュー宣言channel.queue_declare(queue='hello')
# メッセージの送信channel.basic_publish( exchange='', routing_key='hello', body='Hello World!')
# メッセージの受信def callback(ch, method, properties, body): print(f"Received: {body}")
channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()2. Work Queue(ワークキュー)
Section titled “2. Work Queue(ワークキュー)”# プロデューサーimport pikaimport sys
connection = pika.BlockingConnection( pika.ConnectionParameters('localhost'))channel = connection.channel()
# キュー宣言(永続化)channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # メッセージの永続化 ))
# コンシューマーdef callback(ch, method, properties, body): print(f"Received: {body}") # 処理のシミュレーション import time time.sleep(body.count(b'.')) print("Done") ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) # 公平な配信channel.basic_consume( queue='task_queue', on_message_callback=callback)
channel.start_consuming()3. Publish/Subscribe(パブリッシュ/サブスクライブ)
Section titled “3. Publish/Subscribe(パブリッシュ/サブスクライブ)”# プロデューサーimport pika
connection = pika.BlockingConnection( pika.ConnectionParameters('localhost'))channel = connection.channel()
# Exchange宣言(Fanout)channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = 'Log message'channel.basic_publish( exchange='logs', routing_key='', body=message)
# コンシューマーchannel.exchange_declare(exchange='logs', exchange_type='fanout')
# 一時的なキューを作成result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queue
# Exchangeとキューをバインドchannel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body): print(f"Received: {body}")
channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()4. Routing(ルーティング)
Section titled “4. Routing(ルーティング)”# プロデューサー(Direct Exchange)import pikaimport sys
connection = pika.BlockingConnection( pika.ConnectionParameters('localhost'))channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message)
# コンシューマーchannel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queue
severities = sys.argv[1:]if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1)
for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity )
def callback(ch, method, properties, body): print(f" [{method.routing_key}] {body}")
channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()5. Topics(トピック)
Section titled “5. Topics(トピック)”# プロデューサー(Topic Exchange)import pikaimport sys
connection = pika.BlockingConnection( pika.ConnectionParameters('localhost'))channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish( exchange='topic_logs', routing_key=routing_key, body=message)
# コンシューマーchannel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)queue_name = result.method.queue
binding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1)
for binding_key in binding_keys: channel.queue_bind( exchange='topic_logs', queue=queue_name, routing_key=binding_key )
def callback(ch, method, properties, body): print(f" [{method.routing_key}] {body}")
channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()5. メッセージの永続化
Section titled “5. メッセージの永続化”キューとメッセージの永続化
Section titled “キューとメッセージの永続化”# キューの永続化channel.queue_declare(queue='task_queue', durable=True)
# メッセージの永続化channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # メッセージの永続化 ))6. メッセージの確認応答(Acknowledgment)
Section titled “6. メッセージの確認応答(Acknowledgment)”手動確認応答
Section titled “手動確認応答”def callback(ch, method, properties, body): print(f"Received: {body}") # 処理 ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume( queue='task_queue', on_message_callback=callback, auto_ack=False # 手動確認応答)複数確認応答
Section titled “複数確認応答”def callback(ch, method, properties, body): print(f"Received: {body}") # 処理 ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)7. Dead Letter Queue(DLQ)
Section titled “7. Dead Letter Queue(DLQ)”DLQの設定
Section titled “DLQの設定”# DLQの設定channel.exchange_declare(exchange='dlx', exchange_type='direct')channel.queue_declare(queue='dlq', durable=True)channel.queue_bind(exchange='dlx', queue='dlq', routing_key='dlq')
# メインキューにDLQを設定channel.queue_declare( queue='main_queue', durable=True, arguments={ 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'dlq', 'x-message-ttl': 60000 # 60秒でTTL })8. 優先度キュー
Section titled “8. 優先度キュー”優先度キューの設定
Section titled “優先度キューの設定”# 優先度キューの宣言channel.queue_declare( queue='priority_queue', durable=True, arguments={'x-max-priority': 10})
# 優先度付きメッセージの送信channel.basic_publish( exchange='', routing_key='priority_queue', body=message, properties=pika.BasicProperties( priority=5, # 優先度(0-10) delivery_mode=2, ))9. クラスタリング
Section titled “9. クラスタリング”クラスタの設定
Section titled “クラスタの設定”# ノード1の起動rabbitmq-server -detached
# ノード2の起動RABBITMQ_NODENAME=rabbit@node2 rabbitmq-server -detached
# クラスタの形成rabbitmqctl stop_apprabbitmqctl resetrabbitmqctl join_cluster rabbit@node1rabbitmqctl start_app
# クラスタの状態確認rabbitmqctl cluster_statusミラーリング
Section titled “ミラーリング”# ポリシーの設定rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
# 特定のキューにミラーリングを設定rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2}'10. 監視と管理
Section titled “10. 監視と管理”# 管理UIへのアクセス# http://localhost:15672# デフォルトユーザー: guest / guestコマンドライン管理
Section titled “コマンドライン管理”# キューの一覧rabbitmqctl list_queues
# Exchangeの一覧rabbitmqctl list_exchanges
# バインディングの一覧rabbitmqctl list_bindings
# 接続の一覧rabbitmqctl list_connections
# チャネルの一覧rabbitmqctl list_channels11. 実践的なベストプラクティス
Section titled “11. 実践的なベストプラクティス”# 接続プールの使用import pikafrom pika.adapters.blocking_connection import BlockingConnection
class RabbitMQConnection: def __init__(self, host='localhost', port=5672): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=host, port=port) ) self.channel = self.connection.channel()
def close(self): self.connection.close()エラーハンドリング
Section titled “エラーハンドリング”import pikaimport logging
logging.basicConfig(level=logging.INFO)
def publish_with_retry(channel, exchange, routing_key, body, max_retries=3): for attempt in range(max_retries): try: channel.basic_publish( exchange=exchange, routing_key=routing_key, body=body ) return True except Exception as e: logging.error(f"Publish failed (attempt {attempt + 1}): {e}") if attempt == max_retries - 1: raise time.sleep(2 ** attempt) # 指数バックオフ return Falseメッセージのシリアライゼーション
Section titled “メッセージのシリアライゼーション”import jsonimport pika
# JSONシリアライゼーションdef publish_json(channel, exchange, routing_key, data): message = json.dumps(data) channel.basic_publish( exchange=exchange, routing_key=routing_key, body=message, properties=pika.BasicProperties( content_type='application/json', delivery_mode=2, ) )
def consume_json(ch, method, properties, body): data = json.loads(body) # 処理 ch.basic_ack(delivery_tag=method.delivery_tag)12. よくある問題と解決方法
Section titled “12. よくある問題と解決方法”問題1: メッセージの損失
Section titled “問題1: メッセージの損失”# 解決: 永続化と確認応答の使用channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 永続化 ))
channel.basic_consume( queue='task_queue', on_message_callback=callback, auto_ack=False # 手動確認応答)問題2: メッセージの重複処理
Section titled “問題2: メッセージの重複処理”# 解決: 冪等性の実装processed_messages = set()
def callback(ch, method, properties, body): message_id = properties.message_id
if message_id in processed_messages: ch.basic_ack(delivery_tag=method.delivery_tag) return
# 処理 process_message(body) processed_messages.add(message_id)
ch.basic_ack(delivery_tag=method.delivery_tag)問題3: パフォーマンスの低下
Section titled “問題3: パフォーマンスの低下”# 解決: バッチ処理とパイプラインchannel.basic_qos(prefetch_count=100) # バッチサイズの調整
# 複数のメッセージを一度に送信for message in messages: channel.basic_publish( exchange='', routing_key='task_queue', body=message )RabbitMQ完全ガイドのポイント:
- Exchange: Direct、Topic、Fanout、Headers
- メッセージングパターン: Simple Queue、Work Queue、Pub/Sub、Routing、Topics
- 永続化: キューとメッセージの永続化
- 確認応答: 手動確認応答による信頼性の向上
- Dead Letter Queue: 失敗メッセージの処理
- 優先度キュー: メッセージの優先順位付け
- クラスタリング: 高可用性の実現
- 監視: 管理UIとコマンドライン管理
適切なRabbitMQの使用により、信頼性の高いメッセージングシステムを構築できます。