Skip to content

Apache Kafka

Apache Kafkaは、高スループットで分散型のメッセージストリーミングプラットフォームです。Spring Bootでは、Spring Kafkaを使用してKafkaと連携できます。

RabbitMQの特徴:

  • メッセージキュー(1つのコンシューマーがメッセージを消費)
  • 低レイテンシ
  • 複雑なルーティング

Kafkaの特徴:

  • メッセージストリーム(複数のコンシューマーが同じメッセージを読める)
  • 高スループット
  • イベントソーシングとストリーム処理

Kafkaを使うべき場合:

  1. 高スループット: 大量のメッセージを処理する必要がある
  2. イベントストリーミング: イベントの履歴を保持したい
  3. 複数のコンシューマー: 同じメッセージを複数のサービスが処理したい
  4. リアルタイムストリーム処理: ストリームデータのリアルタイム処理
  • Topic: メッセージのカテゴリ(キューに相当)
  • Partition: Topicを分割した単位(並列処理のため)
  • Producer: メッセージを送信するアプリケーション
  • Consumer: メッセージを受信するアプリケーション
  • Consumer Group: 複数のConsumerが協力してメッセージを処理

Maven (pom.xml):

<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

Gradle (build.gradle):

dependencies {
implementation 'org.springframework.kafka:spring-kafka'
}

application.yml:

spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
public class OrderProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC = "orders";
public OrderProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendOrderCreated(Order order) {
kafkaTemplate.send(TOPIC, order.getId().toString(), order);
}
// コールバック付き送信
public void sendOrderCreatedWithCallback(Order order) {
ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(TOPIC, order.getId().toString(), order);
future.addCallback(
result -> log.info("Message sent: {}", result.getRecordMetadata()),
failure -> log.error("Failed to send message", failure)
);
}
// パーティション指定
public void sendToPartition(Order order, int partition) {
kafkaTemplate.send(TOPIC, partition, order.getId().toString(), order);
}
}
@Component
public class OrderConsumer {
@KafkaListener(topics = "orders", groupId = "order-processor")
public void consumeOrder(Order order) {
log.info("Received order: {}", order.getId());
// 注文を処理
processOrder(order);
}
// 複数のトピックを監視
@KafkaListener(topics = {"orders", "payments"}, groupId = "multi-topic-consumer")
public void consumeMultipleTopics(ConsumerRecord<String, Order> record) {
String topic = record.topic();
Order order = record.value();
if ("orders".equals(topic)) {
processOrder(order);
} else if ("payments".equals(topic)) {
processPayment(order);
}
}
// パーティション指定
@KafkaListener(
topicPartitions = @TopicPartition(
topic = "orders",
partitions = {"0", "1"}
),
groupId = "partitioned-consumer"
)
public void consumeFromPartitions(Order order) {
processOrder(order);
}
// バッチ処理
@KafkaListener(topics = "orders", groupId = "batch-consumer", containerFactory = "batchKafkaListenerContainerFactory")
public void consumeBatch(List<ConsumerRecord<String, Order>> records) {
log.info("Received {} records", records.size());
for (ConsumerRecord<String, Order> record : records) {
processOrder(record.value());
}
}
private void processOrder(Order order) {
// 注文処理の実装
}
}
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// バッチ処理用のコンテナファクトリー
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> batchKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // バッチ処理を有効化
return factory;
}
}
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// エラーハンドラーを設定
factory.setCommonErrorHandler(new DefaultErrorHandler(
new FixedBackOff(1000L, 3L) // 1秒間隔で3回再試行
));
return factory;
}
}
@Component
public class OrderConsumer {
@KafkaListener(topics = "orders", groupId = "order-processor")
public void consumeOrder(Order order, Acknowledgment acknowledgment) {
try {
processOrder(order);
acknowledgment.acknowledge(); // 手動コミット
} catch (Exception e) {
log.error("Failed to process order", e);
// エラーハンドリング(デッドレターキューに送信など)
}
}
}

実践的な例: イベントソーシング

Section titled “実践的な例: イベントソーシング”
// イベントクラス
public class OrderEvent {
private String eventId;
private String orderId;
private EventType eventType;
private LocalDateTime timestamp;
private Map<String, Object> data;
public enum EventType {
ORDER_CREATED, ORDER_PAID, ORDER_SHIPPED, ORDER_CANCELLED
}
}
// イベントプロデューサー
@Service
public class OrderEventProducer {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void publishOrderCreated(Order order) {
OrderEvent event = new OrderEvent();
event.setEventId(UUID.randomUUID().toString());
event.setOrderId(order.getId().toString());
event.setEventType(OrderEvent.EventType.ORDER_CREATED);
event.setTimestamp(LocalDateTime.now());
event.setData(Map.of("items", order.getItems()));
kafkaTemplate.send("order-events", event);
}
}
// イベントコンシューマー(複数のサービスが同じイベントを処理)
@Component
public class OrderEventConsumer {
// 在庫管理サービス
@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void handleOrderEventForInventory(OrderEvent event) {
if (event.getEventType() == OrderEvent.EventType.ORDER_CREATED) {
updateInventory(event);
}
}
// メール送信サービス
@KafkaListener(topics = "order-events", groupId = "email-service")
public void handleOrderEventForEmail(OrderEvent event) {
if (event.getEventType() == OrderEvent.EventType.ORDER_CREATED) {
sendConfirmationEmail(event);
}
}
// 分析サービス
@KafkaListener(topics = "order-events", groupId = "analytics-service")
public void handleOrderEventForAnalytics(OrderEvent event) {
recordAnalytics(event);
}
}
// Topic作成時にパーティション数を指定
@Bean
public NewTopic ordersTopic() {
return TopicBuilder.name("orders")
.partitions(3) // 3つのパーティション
.replicas(1)
.build();
}
spring:
kafka:
producer:
batch-size: 16384 # 16KB
linger-ms: 10 # 10ms待機
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 3つのスレッドで並列処理
return factory;
}

Apache Kafkaを使用したメッセージストリーミングのポイント:

  • 高スループット: 大量のメッセージを処理
  • イベントストリーミング: イベントの履歴を保持
  • 複数のコンシューマー: 同じメッセージを複数のサービスが処理
  • パーティション: 並列処理によるスケーラビリティ
  • バッチ処理: 効率的なメッセージ処理

Kafkaは、高スループットが必要なアプリケーションやイベントソーシングパターンを実装する際に非常に有用です。適切に実装することで、アプリケーションのパフォーマンスとスケーラビリティを大幅に向上させることができます。