Skip to content

Kafka完全ガイド

Apache Kafkaの実践的な使い方を、実務で使える実装例とベストプラクティスとともに詳しく解説します。

Apache Kafkaは、分散ストリーミングプラットフォームです。高スループット、低レイテンシ、スケーラビリティに優れています。

Kafkaの特徴
├─ 高スループット(数百万メッセージ/秒)
├─ 低レイテンシ
├─ 水平スケーリング
├─ 永続化(ディスクに保存)
├─ 分散アーキテクチャ
└─ ストリーム処理対応

Kafkaを選ぶべき場合:

  • 非常に高いスループットが必要
  • 大量のデータストリームを処理
  • イベントソーシングが必要
  • リアルタイムデータ処理が必要

Kafkaを選ばないべき場合:

  • シンプルなキューが必要(RabbitMQの方が適している場合がある)
  • 低スループット(RabbitMQの方が適している場合がある)

2. Kafkaのインストールとセットアップ

Section titled “2. Kafkaのインストールとセットアップ”
Terminal window
# Homebrewを使用
brew install kafka
# Zookeeperの起動
brew services start zookeeper
# Kafkaの起動
brew services start kafka
Terminal window
# Kafkaのダウンロード
wget https://downloads.apache.org/kafka/2.13-3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
# Zookeeperの起動
bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafkaの起動(別のターミナル)
bin/kafka-server-start.sh config/server.properties
Terminal window
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# 起動
docker-compose up -d

Topicは、メッセージのカテゴリです。

Partitionは、Topicを分割した単位です。並列処理を可能にします。

Producerは、メッセージを送信するコンポーネントです。

Consumerは、メッセージを受信するコンポーネントです。

Consumer Group(コンシューマーグループ)

Section titled “Consumer Group(コンシューマーグループ)”

Consumer Groupは、複数のConsumerをグループ化したものです。

Terminal window
# Topicの作成
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 3 \
--topic my-topic
# Topicの一覧
kafka-topics.sh --list --bootstrap-server localhost:9092
# Topicの詳細確認
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
Terminal window
# プロデューサーの起動
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic
# メッセージの入力(対話形式)
> Hello Kafka
> This is a test message
Terminal window
# コンシューマーの起動
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning
# コンシューマーグループの指定
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--group my-group
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic",
"key-" + i,
"value-" + i
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent: " + metadata.topic() +
" partition: " + metadata.partition() +
" offset: " + metadata.offset());
} else {
exception.printStackTrace();
}
});
}
producer.close();
}
}
from kafka import KafkaProducer
import json
# プロデューサーの作成
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks='all',
retries=3
)
# メッセージの送信
for i in range(100):
future = producer.send(
'my-topic',
key=f'key-{i}',
value={'message': f'value-{i}'}
)
# 送信結果の確認
record_metadata = future.get(timeout=10)
print(f"Sent: {record_metadata.topic} "
f"partition: {record_metadata.partition} "
f"offset: {record_metadata.offset}")
producer.close()
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.key() + " = " + record.value() +
" partition: " + record.partition() +
" offset: " + record.offset());
}
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
from kafka import KafkaConsumer
import json
# コンシューマーの作成
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None,
auto_offset_reset='earliest',
enable_auto_commit=False
)
# メッセージの受信
try:
for message in consumer:
print(f"Received: {message.key} = {message.value} "
f"partition: {message.partition} "
f"offset: {message.offset}")
# 手動コミット
consumer.commit()
except KeyboardInterrupt:
pass
finally:
consumer.close()

7. パーティションとレプリケーション

Section titled “7. パーティションとレプリケーション”
Terminal window
# パーティション数の変更
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 6
# パーティションの確認
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
Terminal window
# レプリケーション係数の設定
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 3 \
--topic my-topic
Terminal window
# Consumer Groupの一覧
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Consumer Groupの詳細
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --describe
# Consumer Groupのリセット
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --reset-offsets --to-earliest \
--topic my-topic --execute

9. ストリーム処理(Kafka Streams)

Section titled “9. ストリーム処理(Kafka Streams)”
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class StreamsExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.mapValues(value -> value.toUpperCase())
.to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
Terminal window
# server.propertiesの設定
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
Terminal window
# server.propertiesの設定
listeners=SSL://localhost:9093
ssl.keystore.location=/path/to/keystore
ssl.keystore.password=keystore_password
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=truststore_password
Terminal window
# JMXの有効化
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999"
# メトリクスの確認
# jconsole localhost:9999
Terminal window
# ログディレクトリの確認
ls -la /tmp/kafka-logs/
# パーティションログの確認
ls -la /tmp/kafka-logs/my-topic-0/

12. 実践的なベストプラクティス

Section titled “12. 実践的なベストプラクティス”
Terminal window
# パーティション数の目安
# - コンシューマーの数と同じか、その倍数
# - 通常は3-10パーティション
# - 最大1000パーティション程度
// 同じキーを使用することで、同じパーティションに送信
ProducerRecord<String, String> record = new ProducerRecord<>(
"my-topic",
"user-123", // 同じキーを使用
"message"
);
// バッチサイズの設定
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// 解決: 適切なACK設定
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 解決: 冪等性プロデューサー
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 解決: バッチ処理と圧縮
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

Kafka完全ガイドのポイント:

  • TopicとPartition: 並列処理の実現
  • Producer: メッセージの送信
  • Consumer: メッセージの受信
  • Consumer Group: 負荷分散
  • ストリーム処理: リアルタイムデータ処理
  • セキュリティ: SASL、SSL/TLS
  • 監視: JMXメトリクス
  • ベストプラクティス: パーティション数、順序保証、バッチ処理

適切なKafkaの使用により、高スループットでスケーラブルなメッセージングシステムを構築できます。