背景
在实际的业务场景中,我们常常需要周期性执行一些任务,比如巡查系统资源,处理过期数据等等。这些事情如果人工去执行的话,无疑是对人力资源的浪费。因此我们就开发出了定时任务。目前业界已有许多出色的定时任务框架,如quartz,elastic-job,包括SpringBoot也提供了定时任务,当然JDK本身也提供了定时任务功能。
那么我们在用这些框架的时候,有没有想过它们是怎么实现定时任务的呢?时间轮算法就是这样一种实现定时任务的方法。
一、概述
时间轮算法是通过一个时间轮去维护定时任务,按照一定的时间单位对时间轮进行划分刻度。然后根据任务的延时计算任务该落在时间轮的第几个刻度,如果任务时长超出了时间轮的刻度数量,则增加一个参数记录时间轮需要转动的圈数。
时间轮每转动一次就检查当前刻度下的任务圈数是否为0,如果为0说明时间到了就执行任务,否则就减少任务的圈数。这样看起来已经很好了,可以满足基本的定时任务需求了,但是我们还能不能继续优化一下呢?答案是可以的。想想我们家里的水表,它是不是有多个轮子在转动,时间轮是不是也可以改造成多级联动呢?建立3个时间轮,月轮、周轮、日轮,月轮存储每个月份需要执行定时任务,转动时将当月份的任务抛到周轮,周轮转动时将当天的任务抛到日轮中,日轮转动时直接执行当前刻度下的定时任务。
1.1 绝对时间和相对时间
定时任务一般有两种:
- 1、约定一段时间后执行。
- 2、约定某个时间点执行。
其实这两者是可以互相转换的,比如现在有一个定时任务是12点执行,当前时间是9点,那就可以认为这个任务是3小时后执行。同样,现在又有一个任务,是3小时后执行,那也可以认为这个任务12点执行。
假设我们现在有3个定时任务A、B、C,分别需要在3点、4点和9点执行,我们把它们都转换成绝对时间。
只需要把任务放到它需要被执行的时刻,然后等到时针转到相应的位置时,取出该时刻放置的任务,执行就可以了。这就是时间轮算法的核心思想。
1.2 重复执行
多数定时任务是需要重复执行,比如每天上午9点执行生成报表的任务。对于重复执行的任务,其实我们需要关心的只是下次执行时间,并不关心这个任务需要循环多少次,还是那每天上午9点的这个任务来说。
- 1、比如现在是下午4点钟,我把这个任务加入到时间轮,并设定当时针转到明天上午九点(该任务下次执行的时间)时执行。
- 2、时间来到了第二天上午九点,时间轮也转到了9点钟的位置,发现该位置有一个生成报表的任务,拿出来执行。
- 3、同时时间轮发现这是一个循环执行的任务,于是把该任务重新放回到9点钟的位置。
- 4、重复步骤2和步骤3。
如果哪一天这个任务不需要再执行了,那么直接通知时间轮,找到这个任务的位置删除掉就可以了。由上面的过程我们可以看到,时间轮至少需要提供4个功能:
- 1、加入任务
- 2、执行任务
- 3、删除任务
- 4、沿着时间刻度前进
1.3 时间轮的数据结构
时钟可以使用数组来表示,那么时钟的每一个刻度就是一个槽,槽用来存在该刻度需要被执行的定时任务。正常业务中,同一时刻中是会存在多个定时任务的,所以每个槽中放一个链表或者队列就可以了,执行的时候遍历一遍即可。
同一时刻存在多个任务时,只要把该刻度对应的链表全部遍历一遍,执行(扔到线程池中异步执行)其中的任务即可。
1.4 时间刻度不够用
增加时间轮的刻度
现在有我有2个定时任务,一个任务每周一上午9点执行,另一个任务每周三上午9点执行。最简单的办法就是增大时间轮的长度,可以从12个加到168 (一天24小时,一周就是168小时),那么下周一上午九点就是时间轮的第9个刻度,这周三上午九点就是时间轮的第57个刻度。
这样做的缺点:
- 1、时间刻度太多会导致时间轮走到的多数刻度没有任务执行,比如一个月就2个任务,我得移动720次,其中718次是无用功。
- 2、时间刻度太多会导致存储空间变大,利用率变低,比如一个月就2个任务,我得需要大小是720的数组,如果我的执行时间的粒度精确到秒,那就更恐怖了。
1.5 任务增加round属性
现在时间轮的刻度还沿用24,但是槽中的每个任务增加一个round属性,代表时钟转过第几圈之后再次转到这个槽的时候执行。
上图代表任务三在指针下一圈移动时执行,整体流程就是时间轮没移动一个刻度的时候都要遍历槽中所有任务,对每个任务的round属性减1,并取出round为0的任务调度,这样可以解决增加时间轮带来的空间浪费。但是这样带来的问题时,每次移动刻度的耗时会增加,当时间刻度很小(秒级甚至毫秒级),任务列表有很长,这种方案是不能接受的。
1.6 分层时间轮
分层时间轮是这样一种思想:
- 1、针对时间复杂度的问题:不做遍历计算round,凡是任务列表中的都应该是应该被执行的,直接全部取出来执行。
- 2、针对空间复杂度的问题:分层,每个时间粒度对应一个时间轮,多个时间轮之间进行级联协作。
假设现在有3个定时任务:
- 任务一每天上午9点执行
- 任务二每周2上午9点执行
- 任务三每月12号上午9点执行。
根据这三个任务的调度粒度,可以划分为3个时间轮,月轮、周轮和天轮,初始添加任务时,任务一被添加到天轮上,任务二被添加到周轮,任务三被添加到月轮上。三个时间轮按各自的刻度运转,当周轮移动到刻度2时,取出任务二丢到天轮上,当天轮移动到刻度9时执行。同样任务三在移动到刻度12时,取出任务三丢给月轮。以此类推。
1.7 round时间轮和分层时间轮的一点比较
相比于round时间轮思想,采用分层时间轮算法的优点在于:只需要多耗费极少的空间(从1个时间轮到3个时间轮),就能实现多线程在效率上的提高(一个时间轮是一个线程去行走,3个时间轮可以3个线程行走)。当然这是相对的,若提交的任务都是每隔几个小时重复执行,那显然小时时间轮比月、周、小时时间轮组合的耗费空间少,且执行时间还相同。
1.8 时间轮的应用
时间轮的思想应用范围非常广泛,各种操作系统的定时任务调度,Crontab、Dubbo、新版的XXL-JOB、还有基于java的通信框架Netty中也有时间轮的实现,几乎所有的时间任务调度系统采用的都是时间轮的思想。
至于采用round型的时间轮还是采用分层时间轮,看实际需要吧,时间复杂度和实现复杂度的取舍。
二、时间轮定时使用方式
用Netty的HashedWheelTimer来实现,给Pom加上下面的依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.75.Final</version>
</dependency>
使用测试:
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
@Test
public void test() throws InterruptedException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
HashedWheelTimer timer = new HashedWheelTimer(new NamedThreadFactory("timer-task"), 1, TimeUnit.MILLISECONDS,8);
TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("hello world " + LocalDateTime.now().format(formatter));
//执行完成之后再次加入调度
timer.newTimeout(this, 4, TimeUnit.SECONDS);
}
};
//将定时任务放入时间轮
timer.newTimeout(timerTask, 4, TimeUnit.SECONDS);
Thread.currentThread().join();
}
}
在这里使用的是 netty 使用时间轮算法实现的HashedWheelTimer来做的每隔 4s 的定时调度。
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
使用方式比较简单,创建一个HashedWheelTimer时间轮定时器对象,threadFactory:创建线程的线程工厂
- tickDuration:一个间隔时间(步长)
- tickDuration:间隔时间的单位
- ticksPerWheel:时间轮的大小
输出如下:
hello world 2022-04-12 18:46:36
hello world 2022-04-12 18:46:40
hello world 2022-04-12 18:46:44
hello world 2022-04-12 18:46:48
hello world 2022-04-12 18:46:52
hello world 2022-04-12 18:46:56
hello world 2022-04-12 18:47:00
hello world 2022-04-12 18:47:04
hello world 2022-04-12 18:47:08
hello world 2022-04-12 18:47:12
hello world 2022-04-12 18:47:16
hello world 2022-04-12 18:47:20
三、时间轮定时内部原理
时间轮定时器原理基本都是如下图:
时间轮算法可以简单的看成一个循环数组+双向链表的数据结构实现的。
循环数组构成一个环形结构,指针每隔 tickDuration 时间走一步,每个数组上挂载一个双向链表结构的定时任务列表。
双向链表上的任务有个属性为 remainingRounds,即当前任务剩下的轮次是多少,每当指针走到该任务的位置时,remainingRounds 减 1,直到remainingRounds 为 0 时,定时任务触发。
通过时间轮算法的原理图我们可以知道,tickDuration 越小,定时任务越精确。
3.1 时间轮定时源码剖析
3.1.1 构造方法
首先从 HashedWheelTimer 的构造方法分析
public class HashedWheelTimer implements Timer {
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
//线程工厂非null判断
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
//时间单位非null判断
if (unit == null) {
throw new NullPointerException("unit");
}
//时间间隔(步长)大于0判断
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
//循环数组长度大于0判断
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
// 将ticksPerWheel修改为2的整数次幂 并且新建数组
wheel = createWheel(ticksPerWheel);
// 数组长度-1,其二进制均为1. 通过指针tick&mask 获取当前的数组下标,类似于hashmap的 hashcode&(len -1)
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
if (logger.isWarnEnabled()) {
logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",
tickDuration, MILLISECOND_NANOS);
}
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
//创建工作线程,该线程会定期的移动指针,扫描链表任务,后面再分析
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
//判断HashedWheelTimer实例是否创建太多,如果是就输出一个日志
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
}
构造方法比较简单明了,主要是做一些初始化工作,比如数组的长度控制为2的整数次幂,新建数组,新建工作线程等。
3.2 添加任务
继续往下看如何向时间轮定时器添加一个定时任务。
public class HashedWheelTimer implements Timer {
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
//一个计数器,表示当前在队列中等待的任务数量
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
//默认maxPendingTimeouts为-1,如果该值>0.添加新任务时会进行判断,如果当前任务大于maxPendingTimeouts,则跑出拒绝异常
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
//检测工作线程扫描是否启动,如果未启动,启动下
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
//startTime为工作线程启动的时间,deadline为:System.nanoTime()+任务延迟时间-工作线程的启动时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
//溢出判断,因为startTime是在start()方法中启动工作线程后赋值的,
//在delay大于0的情况下,deadline是不可能小于0,除非溢出了。如果溢出了为deadline赋值一个最大值
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
//创建HashedWheelTimeout对象
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
//将任务加入timeouts队列
timeouts.add(timeout);
return timeout;
}
}
该方法主要执行以下几个工作
- 1.参数非空校验
- 2.任务数量最大值检测
- 3.工作线程启动
- 4.获取任务的 deadline,将任务封装为 HashedWheelTimeout 对象
- 5.将 HashedWheelTimeout 对象放入任务队列 timeouts
3.3 工作线程启动
下面简单看下 start 方法
public class HashedWheelTimer implements Timer {
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
//如果发现当前工作线程的状态为WORKER_STATE_INIT 初始化状态,则设置线程状态为 WORKER_STATE_STARTED并 启动工作线程
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
//startTime 初始值为0,并且在工作线程启动后设置。startTimeInitialized是一个CountDownLatch锁,在工作线程启动后释放
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
}
该方法主要是启动工作线程并等待工作线程启动完成。
继续看工作线程的 run 方法做什么事情
3.4 工作线程run方法
public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
@Override
public void run() {
// Initialize the startTime.
//线程启动后初始化startTime 时间为System.nanoTime()
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
//释放start方法中的CountDownLatch锁
startTimeInitialized.countDown();
//在当前工作线程状态一直为 WORKER_STATE_STARTED 时循环执行
do {
//waitForNextTick 主要是指针跳动,内部使用Thread.sleep实现
final long deadline = waitForNextTick();
//小于0表示收到了关闭的信号
if (deadline > 0) {
//tick和mask进行按位与操作获取到当前数组下标位置
int idx = (int) (tick & mask);
//从时间轮中移除所有已经取消的定时任务
processCancelledTasks();
//获取到下标对应的链表头
HashedWheelBucket bucket =
wheel[idx];
//将队列中的定时任务放入到时间轮中
transferTimeoutsToBuckets();
//遍历链表任务,将达到执行时间的任务触发执行
bucket.expireTimeouts(deadline);
//指针+1
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
//工作线程停止后,将时间轮上的所有任务放入unprocessedTimeouts集合
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
//将任务队列中的任务也放入unprocessedTimeouts集合
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//移除所有的未处理的定时任务
processCancelledTasks();
}
}
}
该部分代码主要分为以下几个部分
- 设置线程的启动时间 startTime
- 在工作线程启动的状态下
- 根据用户配置的 tickDuration 指针每次跳动一下
- 从时间轮中移除所有已经取消的定时任务
- 将队列中的定时任务放入到时间轮中
- 遍历链表任务,将达到执行时间的任务触发执行
- 工作线程停止后的清理工作
- 下面看一下指针跳动的代码
3.5 指针跳动
public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
private long waitForNextTick() {
//获取下一个指针的deadline时间
long deadline = tickDuration * (tick + 1);
for (;;) {
//当前工作线程的活动时间
final long currentTime = System.nanoTime() - startTime;
//计算还需要多久达到deadline 。
//这里加上999999的原因是因为/只会取整数部分,并且是使用Thread.sleep时间的,参数为毫秒。
//为了保证任务不被提前执行,加上999999后就能够向上取整1ms。
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
//sleepTimeMs 小于0表示达到了任务的触发时间
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
}
}
通过源码分析我们可以看到时间轮算法实现的指针跳动是通过Thread.sleep 实现的,难以理解的就是 (deadline - currentTime + 999999) / 1000000;
3.6 将队列任务放入时间轮中
在工作线程的 run 方法中会调用 transferTimeoutsToBuckets方法,该方法会将用户提交到队列中的定时任务移动到时间轮中,下面具体分析下
public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
//每次最多只迁移 10W 个定时任务,主要是为了防止迁移时间过长,导致时间轮中的任务延迟执行
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
//如果任务已经被取消,就跳过
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
//计算任务需要放入的数组位置
long calculated = timeout.deadline / tickDuration;
//由于时间轮中的数组是循环数组,计算还需要几个轮次
timeout.remainingRounds = (calculated - tick) / wheel.length;
//calculated 和tick 取最大,主要是为了保证过时的任务能够被调度。
//正常情况下calculated是大于tick的,
//如果某些任务执行时间过长,导致tick大于calculated,此时直接把过时的任务放到当前链表队列
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
//按位与获取任务的执行位置
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
//将任务放入当前数组上的链表
bucket.addTimeout(timeout);
}
}
}
}
transferTimeoutsToBuckets 方法很简单,我们主要要记住两点
- 1.每次最多会迁移10W 个队列中的任务到时间轮中,为了保证不影响工作线程的指针跳动
- 2.并且我们发现取消的任务会直接跳过,过时的任务会直接放到当前位置。
3.7 链表任务遍历
public class HashedWheelTimer implements Timer {
private static final class HashedWheelBucket {
// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
//遍历链表的所有任务
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
//如果剩下的轮次<=0
if (timeout.remainingRounds <= 0) {
//从双向链表中移除该任务
next = remove(timeout);
//如果当前任务的deadline小于目前时间轮的deadline,表示任务已经可以被触发
if (timeout.deadline <= deadline) {
//任务执行
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
//任务取消也从链表中移除
next = remove(timeout);
} else {
// 任务的剩余轮次-1
timeout.remainingRounds --;
}
//链表遍历
timeout = next;
}
}
}
}
该方法主要是遍历链表上的定时任务
- 任务所剩轮次为 0 并且任务的 deadline 小于目前时间轮的 deadline,任务触发执行
- 任务被取消,从链表中移除
- 任务轮次大于 0 并且还未取消,轮次 -1
- 遍历下个定时任务
3.8 定时任务执行
public class HashedWheelTimer implements Timer {
private static final class HashedWheelTimeout implements Timeout {
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
}
}
定时任务执行代码,看着很简单,首先将任务的状态设置为ST_EXPIRED,然后直接调用 run方法执行任务,这里说明任务是在工作线程中执行的,也就是说如果任务执行时间过长,会影响其它定时任务的触发。
参考:
https://blog.csdn.net/qq_34772568/article/details/105534389
https://blog.csdn.net/su20145104009/article/details/115636136
https://blog.csdn.net/code_geek/article/details/113133327