为了便于说明问题,假设这里只有一个Kafka
集群,集群中只有一个Kafka
节点,也就是只有一台物理机。在这个Kafka broker
实例的 $KAFKA_HOME/config/server.properties
配置 log.dirs=/tmp/kafka-logs
,以此来设置 Kafka 消息文件存储目录。并通过命令:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 4 --topic topic_test --replication-factor 1
创建一个 topic:topic_test
,partition
的数量配置为4。接下来可以在 /tmp/kafka-logs
目录中可以看到生成了 4 个partition
目录:
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_test-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_test-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_test-2
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_test-3
在
Kafka
文件存储中,同一个topic
下有多个不同的partition
,每个partiton
为一个目录,partition
的名称规则为topic名称+有序序号
,第一个序号从0开始计,最大的序号为partition
数量减1,partition
是实际物理上的概念,而topic
是逻辑上的概念,更多表象是一个消息的类别,当然,partition
还可以细分为segment
,一个partition
物理上由多个segment
组成;
- 引发第一个疑问,为什么不能以
partition
作为存储单位?
如果就以partition
为最小存储单位,可以想象,当Kafka producer
不断发送消息,必然会引起partition
文件的无限扩张,将对消息文件的维护以及已消费的消息的清理带来严重的影响,新数据是在文件尾部追加的,不论文件数据文件有多大,这个操作永远都是 O(1)
的查找,再者,查找某个offset
的Message
是顺序查找的。因此,如果数据文件很大的话,查找的效率就低. 因此,需以segment
为单位将partition
进一步细分。每个partition
相当于一个巨型文件被平均分配到多个大小相等的segment
数据文件中(每个segment
文件中消息数量不一定相等)这种特性也方便old segment
的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition
只需要支持顺序读写就行.segment
的文件生命周期由服务端配置参数log.segment.bytes
,log.roll.{ms,hours
} 等相关参数决定。
segment
文件由两部分组成,分别为 .index
文件和.log
文件,分别表示为segment
索引文件和数据文件。这两个文件的命令规则为:partition
全局的第一个segment
从0
开始,后续每个segment
文件名为上一个segment
文件最后一条消息的offset
值,数值大小为64
位,20
位数字字符长度,没有数字用0
填充,如下:
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
00000000000000170410.index
00000000000000170410.log
00000000000000170410.timeindex
00000000000000239430.index
00000000000000239430.log
00000000000000239430.timeindex
以上面的segment
文件为例,展示出segment:00000000000000170410
的.index
文件和.log
文件的对应的关系,如下图:
如上图,.index
索引文件存储大量的元数据,.log
数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message
的物理偏移地址(也就是实际的偏移地址,因为会涉及到segment
文件清理)。其中以.index
索引文件中的元数据[3, 348]
为例,在.log
数据文件表示第3
个消息,即在全局partition
中offset
为170410+3=170413
,该消息的物理偏移地址为348
。
- 补充第二个疑问,如何保证消息消费的有序性呢?
举个例子,比如说生产者生产了25
个订单,订单假设分为创建-提交-付款-发货4
个步骤,那么消费者在消费的时候按照0到100
这个从小到大的顺序消费,那么kafka如何保证这种有序性呢?
难度就在于,生产者生产出0到100这100条数据之后,通过一定的分组策略存储到broker的partition
中的时候,
比如0到10这10条消息被存到了这个partition
中,10到20这10条消息被存到了那个partition
中,这样的话,消息在分组存到partition
中的时候就已经被分组策略搞得无序了。
那么能否做到消费者在消费消息的时候全局有序呢?遇到这个问题,我们可以回答,在大多数情况下是做不到全局有序的。但在某些情况下是可以做到的。比如我的partition
只有一个,这种情况下是可以全局有序的。
那么可能有人又有疑问了,只有一个partition
的话,哪里来的分布式呢?哪里来的负载均衡呢?所以说,全局有序是一个伪命题!让订单全局有序根本没有办法在kafka
内部做到。但是我们只能保证当前这个partition
内部消息消费的有序性。
结论:一个partition
中的数据是有序的吗?回答:间隔有序,不连续。
针对一个topic
里面的数据,只能做到partition
内部有序,不能做到全局有序。特别是加入消费者的场景后,如何保证消费者的消费的消息的全局有序性,
这是一个伪命题,只有在一种情况下才能保证消费的消息的全局有序性,那就是只有一个partition
!
当然不使用一个partition
我们也有办法解决,从代码层面解决,我们可以借助于Set或者Redis,将多次收到的相同订单号的消息储存起来,等满足4条后再一并处理。我自己在我的项目中就是这么做的。
第三个问题,如何从
partition
中通过offset
查找message
?
以上图为例,读取offset=170418
的消息,首先查找segment
文件,其中00000000000000000000.index
为最开始的文件,第二个文件为00000000000000170410.index
(起始偏移为170410+1=170411
),而第三个文件为00000000000000239430.index
(起始偏移为239430+1=239431
),所以这个offset=170418
就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410.index
文件中的[8,1325]定位到00000000000000170410.log
文件中的1325的位置进行读取。-
怎么知道何时读完本条消息,否则就读到下一条消息的内容了?
这个问题就得引出kafka的消息结构,如下图所示:
消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)
等等字段,可以确定一条消息的大小,即读取到哪里截止。
总结,offset
的查找方机制是建立在offset
是有序的,索引文件被映射到内存中,所以查找的速度还是很快的。
另外,Kafka的Message存储采用了分区(partition
),Segment和index这几个手段来达到了高效性。
采用问题加回答的方式解释这个内容,感觉还可以,如果大家阅后有疑问,望在评论区补充,一起探讨,写的不到位之处,帮忙批评指正;