时间轮
Kafka中存在大量的延迟操作,比如延迟生产,延迟拉取,延迟加入,延迟心跳等。kafka使用时间轮(TimingWheel)来实现管理延迟任务和超时完成延迟任务。
时间轮(TimingWheel)是存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。定时任务列表(TimerTaskList)是一个双向链表,链表中的每一项都是一个定时任务项(TimerTaskEntry),定时任务项中封装的就是真的定时任务(TimerTaskEntry(比如延迟操作))。
时间轮由多个时间格(槽)组成,每个时间格(槽)代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格(槽)个数是固定的,通过wheelSize设置,时间轮的总体时间跨度interval=tickMs*wheelSize。表盘指针currentTime表示时间轮当前所处的时间,currentTime将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格属于到期部分,刚好到期,需要处理此时间格所对应的TimerTaskList的所有任务。
时间轮的tickMs=1ms,wheelSize=20,interval=20ms,初始情况下表盘指针currentTime指向时间格0,此时有一个延迟时间为2ms的任务进来,会存放到时间格为2的定时任务列表(TimerTaskList)中。随着时间不断的推移,currentTime不断向前推进,过了2ms之后,到达时间格2时,就需要将时间格2所对应的TimeTaskList中的任务做相应的到期操作。此时若又有一个延迟时间为8ms的任务插入进来,则会存放到时间格为10的任务列表(TimerTaskList)中,从中可以看出定时任务存放到哪个时间格中,是从当前表盘指针指向的时间格开始算起。不是从初始时间格开始算起。那么同时有一个延迟时间为19ms的任务,那么这个时候,这个任务就会存放到时间格为1中。
总结一下:只要延迟时间没有超过当前时间轮的跨度,都会存储在当前时间轮中,选择时间格从当前表盘指针指向的时间格的tickMs开始计算。这样随着表盘指针(currentTime)的不断推进,当前时间轮处理的时间段也在不断的后移,时间范围在currentTime于currentTime+interval之间。
当延迟时间超过了时间轮的跨度,如果是直接扩充当前时间轮的时间格(wheelSize的大小),那么在kafka的这种级别的延迟任务,那么这个时间轮会很大,比如100万毫秒,这种情况下时间轮就会占用很大的空间,并且走一轮的话,花费的时间也比较长。所以当延时时间超过了当前时间轮的跨度(interval),kafka会重新建立一个时间轮:第一个时间轮会持有第二个时间轮的引用,第一个时间轮的interval为第二个时间轮的tickMs。所以第二个时间轮:tickMs=20ms,wheelSize=20,interval=400ms。
这是来了一个延迟时间是350ms的延迟任务,第一层时间轮是不能满足的,于是只能想更高一层的时间轮来存储,第二层时间轮的跨度interval大于延迟时间,第二层可以满足存储这个延迟任务,所以这个350ms会被存到到第二层时间轮的17时间格中。时间格的超时时间是tickMs。但是时间格定时任务列表(TimerTaskList)中的延迟任务是一个链表,链表中的延迟任务的超时时间不全是tickMs,比如第二层时间轮的第17个时间格中就会存储延迟时间是340ms到360ms的延迟任务。所以当TimerTaskList超时之后,如果链表里面还有不能执行的延迟任务,就有一个时间轮的降级操作,比如第二层时间轮的第17个时间格在340ms到期后,里面这个360ms到期的任务还有20ms才能执行到期操作,这时第一层的时间轮可以存放这个还有20ms到期的延迟任务,所以这个延迟任务就修改自己延迟时间,并被再次加入到第一层的时间格中(这时如果第一层不能装下,就会往上层上放)。再过20ms后,真正到期,最终执行相应的到期操作。
怎么判断任务过期?
我们把时间轮的时间格(槽)放入到DelayQueue中,因为每个时间格(槽)都有统一的一个过期时间,这个过期的槽会被DelayQueue的poll弹出,我们只需要将槽中的所有任务循环一次,重新加入到新的槽中,添加失败就立即执行。
参考: