RabbitMQ
RabbitMQ
Section titled “RabbitMQ”RabbitMQは、メッセージブローカーとして機能するオープンソースのメッセージングミドルウェアです。Spring Bootでは、Spring AMQPを使用してRabbitMQと連携できます。
なぜメッセージキューが必要なのか
Section titled “なぜメッセージキューが必要なのか”同期処理の課題
Section titled “同期処理の課題”問題のある同期処理の例:
@Servicepublic class OrderService {
public void processOrder(Order order) { // 1. 注文を保存 orderRepository.save(order);
// 2. 在庫を更新(時間がかかる) inventoryService.updateStock(order.getItems()); // 5秒かかる
// 3. メールを送信(時間がかかる) emailService.sendConfirmation(order); // 3秒かかる
// 4. 決済処理(時間がかかる) paymentService.processPayment(order); // 10秒かかる
// 問題点: // - 合計18秒かかる(ユーザーは待たされる) // - 1つでも失敗すると全体が失敗 // - スケーラビリティが低い }}メッセージキューの解決:
@Servicepublic class OrderService {
public void processOrder(Order order) { // 1. 注文を保存 orderRepository.save(order);
// 2. 非同期処理をキューに送信 rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
// メリット: // - 即座にレスポンスを返せる // - 各処理が独立して実行される // - スケーラブル(ワーカーを増やせる) }}メリット:
- 非同期処理: 時間のかかる処理を非同期で実行
- スケーラビリティ: ワーカーを増やして処理能力を向上
- 信頼性: メッセージの永続化と再試行
- 疎結合: サービス間の依存関係を緩和
RabbitMQの基本概念
Section titled “RabbitMQの基本概念”- Producer: メッセージを送信するアプリケーション
- Consumer: メッセージを受信して処理するアプリケーション
- Queue: メッセージが保存される場所
- Exchange: メッセージをルーティングする仕組み
- Binding: ExchangeとQueueを結びつける
Spring AMQPの設定
Section titled “Spring AMQPの設定”依存関係の追加
Section titled “依存関係の追加”Maven (pom.xml):
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency></dependencies>Gradle (build.gradle):
dependencies { implementation 'org.springframework.boot:spring-boot-starter-amqp'}application.yml:
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /メッセージの送信(Producer)
Section titled “メッセージの送信(Producer)”@Configurationpublic class RabbitMQConfig {
@Bean public Queue orderQueue() { return QueueBuilder.durable("order.queue").build(); }
@Bean public DirectExchange orderExchange() { return new DirectExchange("order.exchange"); }
@Bean public Binding orderBinding() { return BindingBuilder .bind(orderQueue()) .to(orderExchange()) .with("order.created"); }}
@Servicepublic class OrderProducer {
private final RabbitTemplate rabbitTemplate;
public OrderProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
public void sendOrderCreated(Order order) { rabbitTemplate.convertAndSend( "order.exchange", "order.created", order ); }
// 遅延メッセージの送信 public void sendDelayedMessage(Order order, long delayMillis) { rabbitTemplate.convertAndSend( "order.exchange", "order.created", order, message -> { message.getMessageProperties().setDelay((int) delayMillis); return message; } ); }}メッセージの受信(Consumer)
Section titled “メッセージの受信(Consumer)”@Componentpublic class OrderConsumer {
private final InventoryService inventoryService; private final EmailService emailService;
public OrderConsumer(InventoryService inventoryService, EmailService emailService) { this.inventoryService = inventoryService; this.emailService = emailService; }
@RabbitListener(queues = "order.queue") public void handleOrderCreated(Order order) { try { // 在庫を更新 inventoryService.updateStock(order.getItems());
// メールを送信 emailService.sendConfirmation(order);
} catch (Exception e) { // エラーハンドリング log.error("Failed to process order: {}", order.getId(), e); throw new AmqpRejectAndDontRequeueException("Processing failed"); } }
// 複数のキューを監視 @RabbitListener(queues = {"order.queue", "payment.queue"}) public void handleMultipleQueues(Message message) { String queueName = message.getMessageProperties().getConsumerQueue(); // キューに応じた処理 }}エラーハンドリングと再試行
Section titled “エラーハンドリングと再試行”@Configurationpublic class RabbitMQConfig {
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory);
// 再試行設定 RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(1000); backOffPolicy.setMultiplier(2.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy);
FixedBackOffPolicy backOffPolicy2 = new FixedBackOffPolicy(); backOffPolicy2.setBackOffPeriod(2000); retryTemplate.setBackOffPolicy(backOffPolicy2);
factory.setRetryTemplate(retryTemplate);
return factory; }
// デッドレターキュー(処理に失敗したメッセージを保存) @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("order.dlq").build(); }
@Bean public DirectExchange deadLetterExchange() { return new DirectExchange("order.dlx"); }
@Bean public Binding deadLetterBinding() { return BindingBuilder .bind(deadLetterQueue()) .to(deadLetterExchange()) .with("order.failed"); }}実践的な例: 注文処理システム
Section titled “実践的な例: 注文処理システム”// 注文サービス@Servicepublic class OrderService {
private final OrderRepository orderRepository; private final OrderProducer orderProducer;
public OrderService(OrderRepository orderRepository, OrderProducer orderProducer) { this.orderRepository = orderRepository; this.orderProducer = orderProducer; }
public Order createOrder(OrderRequest request) { // 1. 注文を作成 Order order = new Order(); order.setItems(request.getItems()); order.setStatus(OrderStatus.PENDING); order = orderRepository.save(order);
// 2. 非同期処理をキューに送信 orderProducer.sendOrderCreated(order);
return order; }}
// 在庫更新コンシューマー@Componentpublic class InventoryConsumer {
@RabbitListener(queues = "inventory.queue") public void updateInventory(Order order) { // 在庫を更新 inventoryService.updateStock(order.getItems()); }}
// メール送信コンシューマー@Componentpublic class EmailConsumer {
@RabbitListener(queues = "email.queue") public void sendEmail(Order order) { // 確認メールを送信 emailService.sendConfirmation(order); }}RabbitMQを使用したメッセージキューシステムのポイント:
- 非同期処理: 時間のかかる処理を非同期で実行
- スケーラビリティ: ワーカーを増やして処理能力を向上
- 信頼性: メッセージの永続化と再試行
- エラーハンドリング: デッドレターキューと再試行戦略
- 疎結合: サービス間の依存関係を緩和
RabbitMQは、非同期処理が必要なアプリケーションで非常に有用です。適切に実装することで、アプリケーションのパフォーマンスとスケーラビリティを大幅に向上させることができます。