导语:Kafka 是一个分布式的基于发布/订阅模式的消息引擎系统。其高可靠、高性能、高吞吐的优势使其广泛应用于各个业务场景。所以这里我们有必要对它的特性做一个全面分析。
Kafka 基础
架构图
搞定术语
生产者:Producer。向主题发布新消息的应用程序。 消费者:Consumer。从主题订阅新消息的应用程序。
消息:Record。Kafka是消息引擎嘛,这里的消息就是指Kafka处理的主要对象。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。 消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本:Replica。Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余\容灾,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。
什么情况下发生重平衡?
- consumer 发生变化
- partition 发生变化
-
topic 数量发生变化
核心参数
• Broker参数
• 存储类
• log.dirs=/home/kafka1,/home/kafka2,/home/kafka3
• ZooKeeper相关
• zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
• 连接类
Ssl sasl(PLAINTEX \sasl-PLAINTEX、scram)
• listeners=CONTROLLER: //localhost:9092
• listener.security.protocol.map=CONTROLLER:PLAINTEX
• Topic 管理
• auto.create.topics.enable: 自动创建topic
• unclean.leader.election.enable: false, ar=isr + 非isr
• auto.leader.rebalance.enable:
• 数据留存
• log.retention.{hours|minutes|ms} :数据寿命 hours=168h
• log.rentention.bytes: -1 表示没限制
• message.max.bytes: broker接受的最大数据大小
生产消费
- requet.require.ack=0 -1 1 用来保证是否让所有的isr同步数据
kafka特性深入剖析
#日志存储
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的序列号,也就是通常所说的偏移量(offset)。
不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。
向主题topic-log中发送一定量的消息,某一时刻topic-log-0目录中的布局:
#分区副本剖析
Kafka 通过多副本机制实现故障自动转移,在 Kafka 集群中某个 broker 节点失效的情况下仍然保证服务可用。
我们该如何确保副本中所有的数据都是一致的呢?特别是对Kafka而言,当生产者发送消息到某个主题后,消息是如何同步到对应的所有副本中的呢?针对这个问题,最常见的解决方案就是采用基于领导者(Leader-based)的副本机制。
第一,副本分成两类:领导者(Leader Replica)和追随者(Follower Replica)。
第二,Follower副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的Broker,由该Broker负责处理。
第三,当领导者副本挂掉了,或者说领导者副本所在的Broker宕机时,Kafka依托于ZooKeeper提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老Leader副本重启回来后,只能作为追随者副本加入到集群中。
ISR AR
- 分区中的所有副本统称为 AR,而 ISR 是指与 leader 副本保持同步状态的副本集合,当然 leader 副本本身也是这个集合中的一员。
失效副本
正常情况下,分区的所有副本都处于 ISR 集合中,但是难免会有异常情况发生,从而某些副本被剥离出 ISR 集合中。在 ISR 集合之外,也就是处于同步失效或功能失效(比如副本处于非存活状态)的副本统称为失效副本,失效副本对应的分区也就称为同步失效分区,即 under-replicated 分区。
bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --describe --topic topic-partitions --under-replicated-partitions
[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/ kafka --describe --topic topic-partitions ]
Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0
Topic: topic-partitions Partition: 1 Leader: 0 Replicas: 2,0,1 Isr: 0,1
Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1
分析:
LEO与HW
LEO 标识每个分区中最后一条消息的下一个位置,分区的每个副本都有自己的 LEO,ISR 中最小的 LEO 即为 HW,俗称高水位,消费者只能拉取到 HW 之前的消息。
分析在拉去数据过程中各个副本 LEO 和 HW 的变化情况:
#可靠性ack分析
仅依靠副本数来支撑可靠性是远远不够的,大多数人还会想到生产者客户端参数 request.required.acks。
- 对于 acks = 1 的配置,生产者将消息发送到 leader 副本,leader 副本在成功写入本地日志之后会告知生产者已经成功提交,如下图所示。如果此时 ISR 集合的 follower 副本还没来得及拉取到 leader 中新写入的消息,leader 就宕机了,那么此次发送的消息就会丢失。
- 对于 ack = -1(all) 的配置,生产者将消息发送到 leader 副本,leader 副本在成功写入本地日志之后还要等待 ISR 中的 follower 副本全部同步完成才能够告知生产者已经成功提交,即使此时 leader 副本宕机,消息也不会丢失。
- 对于 ack = 0 的配置这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
#消息交付可靠性保障
至少一次(at least once):消息不会丢失,但有可能被重复发送。
最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
精确一次(exactly once):消息不会丢失,也不会被重复发送。
at least once
如果producer收到来自Kafka broker的确认(ack)或者acks = all,
则表示该消息已经写入到Kafka。
但如果producer ack超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入Kafka。
At most once
如果在ack超时或返回错误时producer不重试,
则该消息可能最终不会写入Kafka,因此不会传递给consumer。
requet.request.ack=0
exactly once
produce1. kafka 0.11.0.0版本引入了idempotent producer机制,在这个机制中同一消息可能被producer发送多次,但是在broker端只会写入一次.
2. 幂等producer 能保证单分区上无重复消息;
props.put(“enable.idempotence”, ture)
Producer 需要做两件事:
1)初始化时像向 Broker 申请一个 ProducerID
2)为每条消息绑定一个 SequenceNumber
3) broker 保存SequenceNumber。
自动ack=-1 && 单分区去重
#磁盘顺序读写
kafak采用的是磁盘顺序读写方式,极大提升了读写性能。
#高性能页缓存
一般磁盘 I/O 的场景有以下几种:
用户调用标准 C 库进行 I/O 操作,数据流为:应用程序 buffer→C 库标准 IObuffer→文件系统页缓存→通过具体文件系统到磁盘。
用户调用文件 I/O,数据流为:应用程序 buffer→文件系统页缓存→通过具体文件系统到磁盘。
用户打开文件时使用 O_DIRECT,绕过页缓存直接读写磁盘。
脏页
#零拷贝
所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。
调用 read() 时,文件 A 中的内容被复制到了内核模式下的 Read Buffer 中。
CPU 控制将内核模式数据复制到用户模式下。
调用 write() 时,将用户模式下的内容复制到内核模式下的 Socket Buffer 中。
将内核模式下的 Socket Buffer 的数据复制到网卡设备中传送。
#零拷贝
常用命令
# zk
cd /usr/local/Cellar/zookeeper/3.4.10/bin
- 安装、解压
* 下载后解压,Zookeeper 的配置文件在 conf 目录下,有 zoo_sample.cfg 和 log4j.properties,将zoo_sample.cfg 重命名成zoo.cfg,因为 Zookeeper 在启动时会找这个文件作为默认配置文件。
- 启动
`./zkServer start`
- 客户端
`./zkCli`
#kafka
cd /Users/vking/tools/kafka_2.11-1.0.1/bin
./kafka-server-start.sh ../config/server.properties
log /tmp/kafka-logs
#创建topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test00
# 查看topic 列表
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
__consumer_offsets 这个是kafka内置的topic,保存consumer消费的offset的
# 生产
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_1
#消费
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test_1 --group group_1 --from-beginning
#查看kafka配置
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type brokers --entity-default --describe
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type brokers --entity-name 0 --describe
总结
到这里Kafka 的核心特性基本剖析完了,当然kafka也还有其他优秀的特性 这里限于篇幅没法一一剖析,有疑问的同学我们可以评论区交流。
看完文章后,同学们可以试着去回答一个面试题:kafka的高可靠、高性能分别是怎么保证的?