Skip to content

RabbitMQ

RabbitMQは、メッセージブローカーとして機能するオープンソースのメッセージングミドルウェアです。Spring Bootでは、Spring AMQPを使用してRabbitMQと連携できます。

なぜメッセージキューが必要なのか

Section titled “なぜメッセージキューが必要なのか”

問題のある同期処理の例:

@Service
public class OrderService {
public void processOrder(Order order) {
// 1. 注文を保存
orderRepository.save(order);
// 2. 在庫を更新(時間がかかる)
inventoryService.updateStock(order.getItems()); // 5秒かかる
// 3. メールを送信(時間がかかる)
emailService.sendConfirmation(order); // 3秒かかる
// 4. 決済処理(時間がかかる)
paymentService.processPayment(order); // 10秒かかる
// 問題点:
// - 合計18秒かかる(ユーザーは待たされる)
// - 1つでも失敗すると全体が失敗
// - スケーラビリティが低い
}
}

メッセージキューの解決:

@Service
public class OrderService {
public void processOrder(Order order) {
// 1. 注文を保存
orderRepository.save(order);
// 2. 非同期処理をキューに送信
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
// メリット:
// - 即座にレスポンスを返せる
// - 各処理が独立して実行される
// - スケーラブル(ワーカーを増やせる)
}
}

メリット:

  1. 非同期処理: 時間のかかる処理を非同期で実行
  2. スケーラビリティ: ワーカーを増やして処理能力を向上
  3. 信頼性: メッセージの永続化と再試行
  4. 疎結合: サービス間の依存関係を緩和
  • Producer: メッセージを送信するアプリケーション
  • Consumer: メッセージを受信して処理するアプリケーション
  • Queue: メッセージが保存される場所
  • Exchange: メッセージをルーティングする仕組み
  • Binding: ExchangeとQueueを結びつける

Maven (pom.xml):

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

Gradle (build.gradle):

dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
}

application.yml:

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
@Configuration
public class RabbitMQConfig {
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue").build();
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Binding orderBinding() {
return BindingBuilder
.bind(orderQueue())
.to(orderExchange())
.with("order.created");
}
}
@Service
public class OrderProducer {
private final RabbitTemplate rabbitTemplate;
public OrderProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendOrderCreated(Order order) {
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
order
);
}
// 遅延メッセージの送信
public void sendDelayedMessage(Order order, long delayMillis) {
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
order,
message -> {
message.getMessageProperties().setDelay((int) delayMillis);
return message;
}
);
}
}
@Component
public class OrderConsumer {
private final InventoryService inventoryService;
private final EmailService emailService;
public OrderConsumer(InventoryService inventoryService,
EmailService emailService) {
this.inventoryService = inventoryService;
this.emailService = emailService;
}
@RabbitListener(queues = "order.queue")
public void handleOrderCreated(Order order) {
try {
// 在庫を更新
inventoryService.updateStock(order.getItems());
// メールを送信
emailService.sendConfirmation(order);
} catch (Exception e) {
// エラーハンドリング
log.error("Failed to process order: {}", order.getId(), e);
throw new AmqpRejectAndDontRequeueException("Processing failed");
}
}
// 複数のキューを監視
@RabbitListener(queues = {"order.queue", "payment.queue"})
public void handleMultipleQueues(Message message) {
String queueName = message.getMessageProperties().getConsumerQueue();
// キューに応じた処理
}
}
@Configuration
public class RabbitMQConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 再試行設定
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
FixedBackOffPolicy backOffPolicy2 = new FixedBackOffPolicy();
backOffPolicy2.setBackOffPeriod(2000);
retryTemplate.setBackOffPolicy(backOffPolicy2);
factory.setRetryTemplate(retryTemplate);
return factory;
}
// デッドレターキュー(処理に失敗したメッセージを保存)
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dlq").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("order.dlx");
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("order.failed");
}
}
// 注文サービス
@Service
public class OrderService {
private final OrderRepository orderRepository;
private final OrderProducer orderProducer;
public OrderService(OrderRepository orderRepository,
OrderProducer orderProducer) {
this.orderRepository = orderRepository;
this.orderProducer = orderProducer;
}
public Order createOrder(OrderRequest request) {
// 1. 注文を作成
Order order = new Order();
order.setItems(request.getItems());
order.setStatus(OrderStatus.PENDING);
order = orderRepository.save(order);
// 2. 非同期処理をキューに送信
orderProducer.sendOrderCreated(order);
return order;
}
}
// 在庫更新コンシューマー
@Component
public class InventoryConsumer {
@RabbitListener(queues = "inventory.queue")
public void updateInventory(Order order) {
// 在庫を更新
inventoryService.updateStock(order.getItems());
}
}
// メール送信コンシューマー
@Component
public class EmailConsumer {
@RabbitListener(queues = "email.queue")
public void sendEmail(Order order) {
// 確認メールを送信
emailService.sendConfirmation(order);
}
}

RabbitMQを使用したメッセージキューシステムのポイント:

  • 非同期処理: 時間のかかる処理を非同期で実行
  • スケーラビリティ: ワーカーを増やして処理能力を向上
  • 信頼性: メッセージの永続化と再試行
  • エラーハンドリング: デッドレターキューと再試行戦略
  • 疎結合: サービス間の依存関係を緩和

RabbitMQは、非同期処理が必要なアプリケーションで非常に有用です。適切に実装することで、アプリケーションのパフォーマンスとスケーラビリティを大幅に向上させることができます。