Kafka 使用手册Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流应用。核心概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)、分区(Partition)和 Broker。
生产者:将数据发布到 Kafka 主题。消费者:订阅主题并处理消息。主题:消息的逻辑分类,分为多个分区。分区:主题的物理分片,每个分区是一个有序、不可变的消息序列。Broker:Kafka 服务器,负责存储和转发消息。关键参数设置及注意事项生产者参数bootstrap.servers:Kafka 集群地址列表,格式为 host:port。acks:确认机制,可选 0(无需确认)、1(Leader 确认)或 all(所有副本确认)。retries:发送失败后的重试次数。batch.size:批量发送的字节大小,提高吞吐量。linger.ms:发送延迟时间,与 batch.size 配合使用。注意事项:
高吞吐场景建议增大 batch.size 和 linger.ms,但会引入延迟。acks=all 保证数据不丢失,但降低吞吐量。消费者参数bootstrap.servers:同生产者。group.id:消费者组 ID,相同组内的消费者共享分区。auto.offset.reset:无偏移量时的策略,可选 earliest(从最早开始)或 latest(从最新开始)。enable.auto.commit:是否自动提交偏移量。max.poll.records:单次拉取的最大消息数。注意事项:
避免频繁提交偏移量,可能影响性能。确保 group.id 唯一性,避免消费混乱。Broker 参数log.dirs:日志存储目录。num.partitions:默认主题分区数。default.replication.factor:默认副本数。zookeeper.connect:ZooKeeper 连接地址。注意事项:
分区数和副本数影响集群的扩展性和容错性。确保 log.dirs 有足够磁盘空间。案例分析案例:实时日志收集系统场景:多个服务生成日志,通过 Kafka 统一收集并供下游分析系统消费。
实现步骤:
创建主题 logs,设置分区数为 3,副本数为 2。生产者将日志发送到 logs 主题。消费者组订阅 logs 主题并处理日志。代码实现生产者示例(Java)代码语言:javascript复制import org.apache.kafka.clients.producer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer
ProducerRecord
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to partition " + metadata.partition());
}
});
producer.close();消费者示例(Java)代码语言:javascript复制import org.apache.kafka.clients.consumer.*;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "log-consumers");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer
consumer.subscribe(Collections.singletonList("logs"));
while (true) {
ConsumerRecords
for (ConsumerRecord
System.out.printf("Offset = %d, Key = %s, Value = %s%n",
record.offset(), record.key(), record.value());
}
}常见问题及解决方案消息丢失:确保生产者设置 acks=all,消费者手动提交偏移量。消息重复:消费者实现幂等处理逻辑。性能瓶颈:调整 batch.size 和 linger.ms,增加分区数。通过合理配置参数和遵循最佳实践,可以充分发挥 Kafka 的高吞吐、低延迟特性。