Apache Kafka
Apache Kafka
Section titled “Apache Kafka”Apache Kafkaは、高スループットで分散型のメッセージストリーミングプラットフォームです。Spring Bootでは、Spring Kafkaを使用してKafkaと連携できます。
なぜKafkaが必要なのか
Section titled “なぜKafkaが必要なのか”RabbitMQとの比較
Section titled “RabbitMQとの比較”RabbitMQの特徴:
- メッセージキュー(1つのコンシューマーがメッセージを消費)
- 低レイテンシ
- 複雑なルーティング
Kafkaの特徴:
- メッセージストリーム(複数のコンシューマーが同じメッセージを読める)
- 高スループット
- イベントソーシングとストリーム処理
Kafkaを使うべき場合:
- 高スループット: 大量のメッセージを処理する必要がある
- イベントストリーミング: イベントの履歴を保持したい
- 複数のコンシューマー: 同じメッセージを複数のサービスが処理したい
- リアルタイムストリーム処理: ストリームデータのリアルタイム処理
Kafkaの基本概念
Section titled “Kafkaの基本概念”- Topic: メッセージのカテゴリ(キューに相当)
- Partition: Topicを分割した単位(並列処理のため)
- Producer: メッセージを送信するアプリケーション
- Consumer: メッセージを受信するアプリケーション
- Consumer Group: 複数のConsumerが協力してメッセージを処理
Spring Kafkaの設定
Section titled “Spring Kafkaの設定”依存関係の追加
Section titled “依存関係の追加”Maven (pom.xml):
<dependencies> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency></dependencies>Gradle (build.gradle):
dependencies { implementation 'org.springframework.kafka:spring-kafka'}application.yml:
spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: group-id: my-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "*"メッセージの送信(Producer)
Section titled “メッセージの送信(Producer)”@Configurationpublic class KafkaConfig {
@Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps); }
@Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }}
@Servicepublic class OrderProducer {
private final KafkaTemplate<String, Object> kafkaTemplate; private static final String TOPIC = "orders";
public OrderProducer(KafkaTemplate<String, Object> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; }
public void sendOrderCreated(Order order) { kafkaTemplate.send(TOPIC, order.getId().toString(), order); }
// コールバック付き送信 public void sendOrderCreatedWithCallback(Order order) { ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC, order.getId().toString(), order);
future.addCallback( result -> log.info("Message sent: {}", result.getRecordMetadata()), failure -> log.error("Failed to send message", failure) ); }
// パーティション指定 public void sendToPartition(Order order, int partition) { kafkaTemplate.send(TOPIC, partition, order.getId().toString(), order); }}メッセージの受信(Consumer)
Section titled “メッセージの受信(Consumer)”@Componentpublic class OrderConsumer {
@KafkaListener(topics = "orders", groupId = "order-processor") public void consumeOrder(Order order) { log.info("Received order: {}", order.getId()); // 注文を処理 processOrder(order); }
// 複数のトピックを監視 @KafkaListener(topics = {"orders", "payments"}, groupId = "multi-topic-consumer") public void consumeMultipleTopics(ConsumerRecord<String, Order> record) { String topic = record.topic(); Order order = record.value();
if ("orders".equals(topic)) { processOrder(order); } else if ("payments".equals(topic)) { processPayment(order); } }
// パーティション指定 @KafkaListener( topicPartitions = @TopicPartition( topic = "orders", partitions = {"0", "1"} ), groupId = "partitioned-consumer" ) public void consumeFromPartitions(Order order) { processOrder(order); }
// バッチ処理 @KafkaListener(topics = "orders", groupId = "batch-consumer", containerFactory = "batchKafkaListenerContainerFactory") public void consumeBatch(List<ConsumerRecord<String, Order>> records) { log.info("Received {} records", records.size()); for (ConsumerRecord<String, Order> record : records) { processOrder(record.value()); } }
private void processOrder(Order order) { // 注文処理の実装 }}バッチ処理の設定
Section titled “バッチ処理の設定”@Configurationpublic class KafkaConfig {
@Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(configProps); }
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }
// バッチ処理用のコンテナファクトリー @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> batchKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); // バッチ処理を有効化 return factory; }}エラーハンドリング
Section titled “エラーハンドリング”@Configurationpublic class KafkaConfig {
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory());
// エラーハンドラーを設定 factory.setCommonErrorHandler(new DefaultErrorHandler( new FixedBackOff(1000L, 3L) // 1秒間隔で3回再試行 ));
return factory; }}
@Componentpublic class OrderConsumer {
@KafkaListener(topics = "orders", groupId = "order-processor") public void consumeOrder(Order order, Acknowledgment acknowledgment) { try { processOrder(order); acknowledgment.acknowledge(); // 手動コミット } catch (Exception e) { log.error("Failed to process order", e); // エラーハンドリング(デッドレターキューに送信など) } }}実践的な例: イベントソーシング
Section titled “実践的な例: イベントソーシング”// イベントクラスpublic class OrderEvent { private String eventId; private String orderId; private EventType eventType; private LocalDateTime timestamp; private Map<String, Object> data;
public enum EventType { ORDER_CREATED, ORDER_PAID, ORDER_SHIPPED, ORDER_CANCELLED }}
// イベントプロデューサー@Servicepublic class OrderEventProducer {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void publishOrderCreated(Order order) { OrderEvent event = new OrderEvent(); event.setEventId(UUID.randomUUID().toString()); event.setOrderId(order.getId().toString()); event.setEventType(OrderEvent.EventType.ORDER_CREATED); event.setTimestamp(LocalDateTime.now()); event.setData(Map.of("items", order.getItems()));
kafkaTemplate.send("order-events", event); }}
// イベントコンシューマー(複数のサービスが同じイベントを処理)@Componentpublic class OrderEventConsumer {
// 在庫管理サービス @KafkaListener(topics = "order-events", groupId = "inventory-service") public void handleOrderEventForInventory(OrderEvent event) { if (event.getEventType() == OrderEvent.EventType.ORDER_CREATED) { updateInventory(event); } }
// メール送信サービス @KafkaListener(topics = "order-events", groupId = "email-service") public void handleOrderEventForEmail(OrderEvent event) { if (event.getEventType() == OrderEvent.EventType.ORDER_CREATED) { sendConfirmationEmail(event); } }
// 分析サービス @KafkaListener(topics = "order-events", groupId = "analytics-service") public void handleOrderEventForAnalytics(OrderEvent event) { recordAnalytics(event); }}パフォーマンス最適化
Section titled “パフォーマンス最適化”1. パーティション数の設定
Section titled “1. パーティション数の設定”// Topic作成時にパーティション数を指定@Beanpublic NewTopic ordersTopic() { return TopicBuilder.name("orders") .partitions(3) // 3つのパーティション .replicas(1) .build();}2. バッチサイズの調整
Section titled “2. バッチサイズの調整”spring: kafka: producer: batch-size: 16384 # 16KB linger-ms: 10 # 10ms待機3. 並列処理
Section titled “3. 並列処理”@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); // 3つのスレッドで並列処理 return factory;}Apache Kafkaを使用したメッセージストリーミングのポイント:
- 高スループット: 大量のメッセージを処理
- イベントストリーミング: イベントの履歴を保持
- 複数のコンシューマー: 同じメッセージを複数のサービスが処理
- パーティション: 並列処理によるスケーラビリティ
- バッチ処理: 効率的なメッセージ処理
Kafkaは、高スループットが必要なアプリケーションやイベントソーシングパターンを実装する際に非常に有用です。適切に実装することで、アプリケーションのパフォーマンスとスケーラビリティを大幅に向上させることができます。