Kafka完全ガイド
Kafka完全ガイド
Section titled “Kafka完全ガイド”Apache Kafkaの実践的な使い方を、実務で使える実装例とベストプラクティスとともに詳しく解説します。
1. Kafkaとは
Section titled “1. Kafkaとは”Kafkaの特徴
Section titled “Kafkaの特徴”Apache Kafkaは、分散ストリーミングプラットフォームです。高スループット、低レイテンシ、スケーラビリティに優れています。
Kafkaの特徴 ├─ 高スループット(数百万メッセージ/秒) ├─ 低レイテンシ ├─ 水平スケーリング ├─ 永続化(ディスクに保存) ├─ 分散アーキテクチャ └─ ストリーム処理対応なぜKafkaを選ぶのか
Section titled “なぜKafkaを選ぶのか”Kafkaを選ぶべき場合:
- 非常に高いスループットが必要
- 大量のデータストリームを処理
- イベントソーシングが必要
- リアルタイムデータ処理が必要
Kafkaを選ばないべき場合:
- シンプルなキューが必要(RabbitMQの方が適している場合がある)
- 低スループット(RabbitMQの方が適している場合がある)
2. Kafkaのインストールとセットアップ
Section titled “2. Kafkaのインストールとセットアップ”macOSでのインストール
Section titled “macOSでのインストール”# Homebrewを使用brew install kafka
# Zookeeperの起動brew services start zookeeper
# Kafkaの起動brew services start kafkaLinuxでのインストール
Section titled “Linuxでのインストール”# Kafkaのダウンロードwget https://downloads.apache.org/kafka/2.13-3.6.0/kafka_2.13-3.6.0.tgztar -xzf kafka_2.13-3.6.0.tgzcd kafka_2.13-3.6.0
# Zookeeperの起動bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafkaの起動(別のターミナル)bin/kafka-server-start.sh config/server.propertiesDockerでのインストール
Section titled “Dockerでのインストール”# docker-compose.ymlversion: '3'services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000
kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# 起動docker-compose up -d3. 基本的な概念
Section titled “3. 基本的な概念”Topic(トピック)
Section titled “Topic(トピック)”Topicは、メッセージのカテゴリです。
Partition(パーティション)
Section titled “Partition(パーティション)”Partitionは、Topicを分割した単位です。並列処理を可能にします。
Producer(プロデューサー)
Section titled “Producer(プロデューサー)”Producerは、メッセージを送信するコンポーネントです。
Consumer(コンシューマー)
Section titled “Consumer(コンシューマー)”Consumerは、メッセージを受信するコンポーネントです。
Consumer Group(コンシューマーグループ)
Section titled “Consumer Group(コンシューマーグループ)”Consumer Groupは、複数のConsumerをグループ化したものです。
4. 基本的な操作
Section titled “4. 基本的な操作”Topicの作成
Section titled “Topicの作成”# Topicの作成kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 1 \ --partitions 3 \ --topic my-topic
# Topicの一覧kafka-topics.sh --list --bootstrap-server localhost:9092
# Topicの詳細確認kafka-topics.sh --describe \ --bootstrap-server localhost:9092 \ --topic my-topicメッセージの送信
Section titled “メッセージの送信”# プロデューサーの起動kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic my-topic
# メッセージの入力(対話形式)> Hello Kafka> This is a test messageメッセージの受信
Section titled “メッセージの受信”# コンシューマーの起動kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic my-topic \ --from-beginning
# コンシューマーグループの指定kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic my-topic \ --group my-group5. Producerの実装
Section titled “5. Producerの実装”Javaの例
Section titled “Javaの例”import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 3);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) { ProducerRecord<String, String> record = new ProducerRecord<>( "my-topic", "key-" + i, "value-" + i );
producer.send(record, (metadata, exception) -> { if (exception == null) { System.out.println("Sent: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset()); } else { exception.printStackTrace(); } }); }
producer.close(); }}Pythonの例
Section titled “Pythonの例”from kafka import KafkaProducerimport json
# プロデューサーの作成producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'), key_serializer=lambda k: k.encode('utf-8') if k else None, acks='all', retries=3)
# メッセージの送信for i in range(100): future = producer.send( 'my-topic', key=f'key-{i}', value={'message': f'value-{i}'} )
# 送信結果の確認 record_metadata = future.get(timeout=10) print(f"Sent: {record_metadata.topic} " f"partition: {record_metadata.partition} " f"offset: {record_metadata.offset}")
producer.close()6. Consumerの実装
Section titled “6. Consumerの実装”Javaの例
Section titled “Javaの例”import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;import java.util.Collections;import java.util.Properties;
public class ConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("my-topic"));
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) { System.out.println("Received: " + record.key() + " = " + record.value() + " partition: " + record.partition() + " offset: " + record.offset()); }
consumer.commitSync(); } } finally { consumer.close(); } }}Pythonの例
Section titled “Pythonの例”from kafka import KafkaConsumerimport json
# コンシューマーの作成consumer = KafkaConsumer( 'my-topic', bootstrap_servers=['localhost:9092'], group_id='my-group', value_deserializer=lambda m: json.loads(m.decode('utf-8')), key_deserializer=lambda k: k.decode('utf-8') if k else None, auto_offset_reset='earliest', enable_auto_commit=False)
# メッセージの受信try: for message in consumer: print(f"Received: {message.key} = {message.value} " f"partition: {message.partition} " f"offset: {message.offset}")
# 手動コミット consumer.commit()except KeyboardInterrupt: passfinally: consumer.close()7. パーティションとレプリケーション
Section titled “7. パーティションとレプリケーション”パーティションの設定
Section titled “パーティションの設定”# パーティション数の変更kafka-topics.sh --alter \ --bootstrap-server localhost:9092 \ --topic my-topic \ --partitions 6
# パーティションの確認kafka-topics.sh --describe \ --bootstrap-server localhost:9092 \ --topic my-topicレプリケーション
Section titled “レプリケーション”# レプリケーション係数の設定kafka-topics.sh --create \ --bootstrap-server localhost:9092 \ --replication-factor 3 \ --partitions 3 \ --topic my-topic8. Consumer Group
Section titled “8. Consumer Group”Consumer Groupの管理
Section titled “Consumer Groupの管理”# Consumer Groupの一覧kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Consumer Groupの詳細kafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group my-group --describe
# Consumer Groupのリセットkafka-consumer-groups.sh --bootstrap-server localhost:9092 \ --group my-group --reset-offsets --to-earliest \ --topic my-topic --execute9. ストリーム処理(Kafka Streams)
Section titled “9. ストリーム処理(Kafka Streams)”Javaの例
Section titled “Javaの例”import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class StreamsExample { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic"); source.mapValues(value -> value.toUpperCase()) .to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); }}10. セキュリティ
Section titled “10. セキュリティ”SASL認証
Section titled “SASL認証”# server.propertiesの設定listeners=SASL_PLAINTEXT://localhost:9092security.inter.broker.protocol=SASL_PLAINTEXTsasl.mechanism.inter.broker.protocol=PLAINsasl.enabled.mechanisms=PLAINSSL/TLS
Section titled “SSL/TLS”# server.propertiesの設定listeners=SSL://localhost:9093ssl.keystore.location=/path/to/keystoressl.keystore.password=keystore_passwordssl.truststore.location=/path/to/truststoressl.truststore.password=truststore_password11. 監視と管理
Section titled “11. 監視と管理”JMXメトリクス
Section titled “JMXメトリクス”# JMXの有効化export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999"
# メトリクスの確認# jconsole localhost:9999# ログディレクトリの確認ls -la /tmp/kafka-logs/
# パーティションログの確認ls -la /tmp/kafka-logs/my-topic-0/12. 実践的なベストプラクティス
Section titled “12. 実践的なベストプラクティス”パーティション数の決定
Section titled “パーティション数の決定”# パーティション数の目安# - コンシューマーの数と同じか、その倍数# - 通常は3-10パーティション# - 最大1000パーティション程度メッセージの順序保証
Section titled “メッセージの順序保証”// 同じキーを使用することで、同じパーティションに送信ProducerRecord<String, String> record = new ProducerRecord<>( "my-topic", "user-123", // 同じキーを使用 "message");// バッチサイズの設定props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 10);13. よくある問題と解決方法
Section titled “13. よくある問題と解決方法”問題1: メッセージの損失
Section titled “問題1: メッセージの損失”// 解決: 適切なACK設定props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 3);問題2: メッセージの重複処理
Section titled “問題2: メッセージの重複処理”// 解決: 冪等性プロデューサーprops.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);問題3: パフォーマンスの低下
Section titled “問題3: パフォーマンスの低下”// 解決: バッチ処理と圧縮props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");Kafka完全ガイドのポイント:
- TopicとPartition: 並列処理の実現
- Producer: メッセージの送信
- Consumer: メッセージの受信
- Consumer Group: 負荷分散
- ストリーム処理: リアルタイムデータ処理
- セキュリティ: SASL、SSL/TLS
- 監視: JMXメトリクス
- ベストプラクティス: パーティション数、順序保証、バッチ処理
適切なKafkaの使用により、高スループットでスケーラブルなメッセージングシステムを構築できます。