第一章 初始Kafka
基本概念
- Producer:生产者,负责创建消息,然后将消息投递到Kafka中;
- Consumer:消费者,负责从Kafka中接收消息,然后进行相关的业务逻辑处理;
- Broker:服务代理节点,负责消息的存储;
- Zookeeper Cluster:负责Kafka集群元数据的管理、控制器的选举等;
- Topic:主题,每条发布到Kafka集群的消息都有一个主题;
- Partition:分区,同一主题下的不同分区包含的消息是不同的, 分区在存储层面可以看作一个可追加的日志( Log)文件,消息在被追加到分区日志、文件的时 候都会分配一个特定的偏移量(offset)。 offset是消息在分区中的唯一标识, Kafka通过它来保 证消息在分区内的顺序性,不过 offset并不跨越分区,也就是说, Kafka保证的是分区有序而不是主题有序;
- Replica:副本,Kafka高可用机制之一,Leader副本负责读写,Follower副本负责同步Leader副本;
- Assigned Replicas(AR):所有副本集合;
- In-Sync Replicas(ISR):所有与 leader 副本保持 一定程度 同步 的副本(包括 leader 副本在内);
- Out-of-Sync Replicas(OSR):与 leader 副本同 步滞后过 多的副本(不包括 leader 副本);
- High Watermark(HW):高水位,它标识 了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset之前的消息;
-
Log End Offset(LEO):它标识当前日志文件中下一条待写入消息的offset,LEO的大小相当于当前日志分区中最后一条消息的offset值加 1;
第二章 生产者
客户端参数
消息发送
- 发后即忘:只管往 Kafka 中发送消息而并不关心消息是否正确到达,性能最高,可靠性也最差;
- 同步发送:send()方法本身就是异步的,send()方法之后直接链式调用了get()方法来阻塞等待Kafka的响应,可靠性高,性能差;
- 异步发送:send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认,性能好,可靠性高。
客户端架构
- 主线程:在主线程中由 KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作 用之后缓存到消息累加器( RecordAccumulator,也称为消息收 集器〉中;
- Sender线程:负责从 RecordAccumulator中获取消息并将其发送到 Kafka中。
- RecordAccumulator:主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能 ,采用双端队列实现分区。
通过 java.io.ByteBuffer 实现消息内存的创建和释放,RecordAccumulator内部使用BufferPool,默认值为 16384B,即 16KB。
元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有 哪些分区,每个分区的 lead巳r副本分配在哪个节点上, follower副本分配在哪些节点上,哪些副 本在 AR、 ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
重要参数
- acks:用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消 息是成功写入的
- acks=1:默认值,生产者发送消息之后,只要分区的 leader副本成功写入消 息,那么它就会收到来自服务端的成功响应;
- acks=0:生产者发送消 息之后不需要等待任何服务端的响应;
- acks=-1或all:生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
- max.request.size:用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即1MB
- retries和retry.backoff.ms
- retries:重试次数
- retry.backoff.ms:两次重试之间的时间间隔,默认100
第三章 消费者
消费者和消费组
消费者( Consumer)负责订阅 Kafka 中的主题( Topic),并且从订阅的主题上拉取消息。 与其他一些消息中间件不同的是:在 Kafka 的消费理念中还有一层消费组( Consumer Group) 的概念,每个消费者都有 一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每 个消费组中的一个消费者 。
客户端开发
消费逻辑步骤:
- 配置消费者客户端参数及创建相应的消费者实例;
- 订阅主题;
- 拉取消息并消费;
- 提交消费位移;
- 关闭消费者实例。
订阅方式
- 集合订阅 subscribe(Collection):AUTO_TOPICS
- 正则表达式订阅 subscribe(Pattern):AUTO_PATTERN
- 指定分区的订阅 assign(Collection):USER_ASSIGNED
subscribe()方法订阅主题具有消费者自动再均衡的功能,assign()方法不具备。
消息消费
位移提交
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题_consumer_offsets中。这里把将消费位移存储起来(持久化)的 动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。
不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是 x+l,对应于图中的position,它表示下一条需要拉取的消息的位置。
提交参数:
- enable.auto.commit:自动提交,默认true;
- auto.commit.interval.ms:自动提交周期,默认5。
指定消费位移
seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若 干消息,也可以通过这个方法来 向后回溯若干消息,这样为消息的消费提供了很大的灵活性。 seek()方法也为我们提供了将消费位移保存在外部存储介质中的能力 , 还可以配合再均衡监听器 来提供更加精准的消费能力。
再均衡
指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用
性和伸缩性提供保障,可安全地在消费组中删除或添加消费者。不过在再均衡发生期间,消费组会变得不可用。
- onPartitionsRevoked():在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。
- onPartitionsAssigned():在重新分配分区之后和消费者开始读取消费之前被调用。
多线程实现
- acquire():通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作;
- release():释放。
实现方式: - 线程封闭:一个消费线程可 以消 费一个或多个分区中的消息;
- 线程开放:多个消费线程同时消费同一个分区,通过assign()、 seek()等方法实现;
- 消费逻辑多线程:将处理消息模块改成多线程的实现方式。
第四章 主题与分区
主题管理
主题和分区都是提供给上层用户 的抽象,而在副本层面或更加确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 中。
分区管理
优先副本选举
优先副本是指在AR集合列表中的第一个副本。
分区重分配
- 首先创建需要一 个包含主题清单的 JSON 文件;
- 其次根据主题清单和broker节点清单生成一份重分配方案;
- 最后根据这份方案执行具体的重分配动作。
分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终的目的。
第五章 日志存储
Log和LogSegnient 也不是纯粹物理意义上的概念,Log在物理上只以文件夹的形式存储,而每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
为了便于消息的检索,每个 LogSegment 中的日志文件(以“ .log”为文件后缀)都有对应的两个索引文件:
- 偏移量索引文件:以“ .index”为文件后缀
- 时间戳索引文件:以“ .timeindex” 为文件后缀
每个 LogSegment 都有一个基准偏移量 baseOffset,用来表示当前 LogSegment 中第一条消息的offset。偏移量是一个64位的长整型数,日志文件和两个索引文件都是根据基准偏移量( baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为 0,对应的日志文件为 00000000000000000000.log。
日志格式
日志索引
- 偏移量索引文件:用来建立消息偏移量( offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;
- 时间戳索引文件:根据指定的时间戳( timestamp)来查找对应的偏移量信息。
偏移量索引
- relativeOffset:相对偏移量,表示消息相对于baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值。
- position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4个字节。
消息的偏移量( offset)占用 8 个字节,也可以称为绝对偏移量。
时间戳索引
- timestamp: 当前日志分段最大的时间戳;
- relativeOffset:时间戳所对应的消息的相对偏移量。
日志清理
- 日志删除( Log Retention):按照一定的保留策略直接删除不符合条件的日志分段;
- 日志压缩( Log Compaction):针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。
日志删除
- 基于时间的保留策略:日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments);
- 基于日志大小的保留策略:日志删除任务会检查当前日志的大小是否超过设定的阈值( retentionSize)来寻找可删除的日志分段的文件集合( deletableSegments);
- 基于日志起始偏移量的保留策略:判断某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是,则可以删除此日志分段。
日志压缩
Log Compaction执行前后,日志分段中的每条消息的偏移量和写入时的偏移量保持一致。Log Compaction会生成新的日志分段文件,日志分段中每条消息的物理位置会重新按照新文件 来组织。Log Compaction 执行过后的偏移量不再是连续的,不过这并不影响日志的查询。
磁盘存储
页缓存
操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。
I/O流程
I/O调度策略:
- NOOP(No Operation):所有I/O请求大致按照先来后到的顺序进行操作(相
邻 νo 请求的合并); - CFQ(Completely Fair Queuing):按照I/O请求的地址进行排序,
而不是按照先来后到的顺序进行响应,默认调度策略; - DEADLINE:除了CFQ本身具有的I/O排序队列,DEADLINE额外分别为读I/O和写I/O提供了FIFO队列,FIFO (Read) > FIFO (Write) > CFQ;
- ANTICIPATORY:为了满足随机I/O和顺序I/O混合的场景,在DEADLINE的基础上,为每个读I/O都设置了6ms的等待时间窗口,如果在 6ms 内操作系统收到了相邻位置的读I/O请求,就可以立即满足。
零拷贝
将数据直接从磁盘文件复制到网卡设备中,而不需要经过应用程序,减少了内核和用户模式之间的上下文切换。对Linux操作系统而言,零拷贝技术依赖于底层的 sendfile()方法实现。对应于Java语言,FileChannal.transferTo()方法的底层实现就是sendfile()方法。
零拷贝技术通过DMA(Direct Memory Access)技术将文件内容复制到内核模式下的Read Buffer中。不过没有数据被复制到Socket Buffer,相反只有包含数据的位置和长度的信息的文件描述符被加到Socket Buffer中。DMA引擎直接将数据从内核模式中传递到网卡设备(协议引擎)。这里数据只经历了2次复制就从磁盘中传送出去了,并且上下文切换也变成了2次。零拷贝是针对内核模式而言的,数据在内核模式下实现了零拷贝。
第六章 深入服务端
时间轮
Kafka中的时间轮( TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表( TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项( TimerTaskEntry),其中封装了真正的定时任务 (TimerTask) 。
用TimjngWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的 时间推进工作。
控制器
控制器选举
- /controller(临时节点):version在目前版本中固定为1, brokerId表示成为控制器的broker的id编号, timestamp 表示竞选成为控制器时的时间戳。
-
/controller_epoch(永久节点):节点中存放的是一个整型的controller_epoch 值,用于记录控制器发生变更的次数。
第七章 深入客户端
分区分配策略
- RangeAssignor:按照消费者总数和分区总数进行整除运算来获得一个跨
度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。 - RoundRobinAssignor:将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。
- StickyAssignor:分区的分配要尽可能均匀;分区的分配尽可能与上次分配的保持相同。
- 自定义分区策略:实 现org.apache.kafka.clients.consumer.intemals. PartitionAssignor接口
事务
消息中间件的消息传输保障有 3个层级:
- at most once:至多一次。消息可能会丢失,但绝对不会重复传输。
- at least once:最少一次。消息绝不会丢失,但可能会重复传输。
- exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。
幂等
对接口的多次调用所产生的结果和调用一次是一致的。Kafka使用幂等解决生产者在进行重试的时候有可能会重复写入消息问题。
- enable.idempotence:设置为 true 即可,默认false
具体实现: - producer_id:PID,生产者ID
- sequence number:序列号
broker会在内存中为每一对<PID,分区>维护一个序列号,每一条消息都会生成一个累计序列号,比较序列号大小判断是否重复写入。
事务
事务可以保证对多个分区写入操作的原子性。Kafka 中的事务可以使应用程序将消费消息、生产消息 、 提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区 。
- transactional.id:事务id
- enable.idempotence:幂等性设置为true
第八章 可靠性探究
副本剖析
失效副本
- follower副本进程卡住,在一段时间内根本没有向leader副本发起同步请求,比如频繁的Full GC。
- follower副本进程同步过慢,在一段时间内都无法追赶上leader副本,比如I/O开销过大。
ISR伸缩
- isr-expiration任务:周期性地检测每个分区是否需要缩减其ISR集合
- isr-change-propagation任务:周期性地检测ISR集合变更缓存(isrChangeSet),并向zookeeper创建永久节点。
ISR扩充:追赶上leader副本的判定准则是此副本的LEO是否不小于leader副本的 HW。
LEO和HW
Leader Epoch
解决基 于HW的同步机制可能会出现的数据丢失或leader副本和follower副本数据不一致的问题。
在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。leader epoch代表leader的纪元信息( epoch),初始值为0。每当leader变更一次, leader epoch的值就会加1,相当于为leader增设了一个版本号。与此同时,每个副本中还会增设一个矢量<LeaderEpoch=> StartOffset>,其中StartOffset表示当前LeaderEpoch下写入的第一条消息的偏移量。