【Kafka 使用手册】

【Kafka 使用手册】

Kafka 使用手册Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流应用。核心概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和 Broker。

生产者:将数据发布到 Kafka 主题。消费者:订阅主题并处理消息。主题:消息的逻辑分类,分为多个分区。分区:主题的物理分片,每个分区是一个有序、不可变的消息序列。Broker:Kafka 服务器,负责存储和转发消息。关键参数设置及注意事项生产者参数bootstrap.servers:Kafka 集群地址列表,格式为 host:port。acks:确认机制,可选 0(无需确认)、1(Leader 确认)或 all(所有副本确认)。retries:发送失败后的重试次数。batch.size:批量发送的字节大小,提高吞吐量。linger.ms:发送延迟时间,与 batch.size 配合使用。注意事项:

高吞吐场景建议增大 batch.size 和 linger.ms,但会引入延迟。acks=all 保证数据不丢失,但降低吞吐量。消费者参数bootstrap.servers:同生产者。group.id:消费者组 ID,相同组内的消费者共享分区。auto.offset.reset:无偏移量时的策略,可选 earliest(从最早开始)或 latest(从最新开始)。enable.auto.commit:是否自动提交偏移量。max.poll.records:单次拉取的最大消息数。注意事项:

避免频繁提交偏移量,可能影响性能。确保 group.id 唯一性,避免消费混乱。Broker 参数log.dirs:日志存储目录。num.partitions:默认主题分区数。default.replication.factor:默认副本数。zookeeper.connect:ZooKeeper 连接地址。注意事项:

分区数和副本数影响集群的扩展性和容错性。确保 log.dirs 有足够磁盘空间。案例分析案例:实时日志收集系统场景:多个服务生成日志,通过 Kafka 统一收集并供下游分析系统消费。

实现步骤:

创建主题 logs,设置分区数为 3,副本数为 2。生产者将日志发送到 logs 主题。消费者组订阅 logs 主题并处理日志。代码实现生产者示例(Java)代码语言:javascript复制import org.apache.kafka.clients.producer.*;

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("acks", "all");

props.put("retries", 3);

props.put("batch.size", 16384);

props.put("linger.ms", 1);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer producer = new KafkaProducer<>(props);

ProducerRecord record = new ProducerRecord<>("logs", "key", "message");

producer.send(record, (metadata, exception) -> {

if (exception != null) {

exception.printStackTrace();

} else {

System.out.println("Message sent to partition " + metadata.partition());

}

});

producer.close();消费者示例(Java)代码语言:javascript复制import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("group.id", "log-consumers");

props.put("enable.auto.commit", "true");

props.put("auto.offset.reset", "earliest");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer consumer = new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("logs"));

while (true) {

ConsumerRecords records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord record : records) {

System.out.printf("Offset = %d, Key = %s, Value = %s%n",

record.offset(), record.key(), record.value());

}

}常见问题及解决方案消息丢失:确保生产者设置 acks=all,消费者手动提交偏移量。消息重复:消费者实现幂等处理逻辑。性能瓶颈:调整 batch.size 和 linger.ms,增加分区数。通过合理配置参数和遵循最佳实践,可以充分发挥 Kafka 的高吞吐、低延迟特性。

相关文章

武侠手游哪个最好玩2025 武侠手游排行榜top10
det365手机版

武侠手游哪个最好玩2025 武侠手游排行榜top10

⌛ 11-27 👁️‍🗨️ 4411
快手怎么发红包
det365手机版

快手怎么发红包

⌛ 10-21 👁️‍🗨️ 2172
贵州金沙旅游 – 探索黔西南的自然与文化瑰宝
365出款成功未到

贵州金沙旅游 – 探索黔西南的自然与文化瑰宝

⌛ 08-24 👁️‍🗨️ 1146