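## Notes
Last year I wrote roughly 25 posts on Jianshu and 5 on my company's internal blog. This year's modest goal is 50 high-quality posts on Jianshu. Let's go!
First of all, this article draws on [10w定时任务,如何高效触发超时](http://chuansong.me/n/1650380646616) ("100,000 scheduled tasks: how to trigger timeouts efficiently"). Thanks to its author!
## Introduction
At work we often run into tasks that must fire on a schedule or on a timeout. For example, RPC, IM, and push frameworks usually keep a long-lived connection between server and client, and that connection is kept alive with heartbeats: the client (or the server) periodically sends a heartbeat message to the server (or the client), and the server closes the connection if it receives no heartbeat from the client within a given period.
## Common approaches
For the heartbeat handling described above, the server usually updates the channel's last read/write time whenever it receives a heartbeat. Heartbeat timeouts are then typically handled in one of two ways:
- Use a single Timer (or a ScheduledThreadPoolExecutor) that periodically traverses all channels and decides, from each channel's last read/write time and the timeout, whether it has timed out.
- Use one Timer per channel, or start one scheduled task per channel, that periodically checks whether that channel has timed out.
Dubbo uses approach 2 for client-side timeouts and approach 1 for server-side timeouts (strictly speaking, this distinction is not entirely accurate). The client-side code is as follows: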
```java
private void startHeatbeatTimer() {
    stopHeartbeatTimer();
    if ( heartbeat > 0 ) {
        heatbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
                    public Collection<Channel> getChannels() {
                        return Collections.<Channel>singletonList( HeaderExchangeClient.this );
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS );
    }
}
```
Each HeaderExchangeClient creates its own HeartBeatTask, and HeartBeatTask handles timeouts as follows:
```java
public void run() {
    try {
        long now = System.currentTimeMillis();
        for ( Channel channel : channelProvider.getChannels() ) {
            if (channel.isClosed()) {
                continue;
            }
            try {
                Long lastRead = ( Long ) channel.getAttribute(
                        HeaderExchangeHandler.KEY_READ_TIMESTAMP );
                Long lastWrite = ( Long ) channel.getAttribute(
                        HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
                if ( ( lastRead != null && now - lastRead > heartbeat )
                        || ( lastWrite != null && now - lastWrite > heartbeat ) ) {
                    Request req = new Request();
                    req.setVersion( "2.0.0" );
                    req.setTwoWay( true );
                    req.setEvent( Request.HEARTBEAT_EVENT );
                    channel.send( req );
                    if ( logger.isDebugEnabled() ) {
                        logger.debug( "Send heartbeat to remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms" );
                    }
                }
                if ( lastRead != null && now - lastRead > heartbeatTimeout ) {
                    logger.warn( "Close channel " + channel
                            + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms" );
                    if (channel instanceof Client) {
                        try {
                            ((Client)channel).reconnect();
                        }catch (Exception e) {
                            //do nothing
                        }
                    } else {
                        channel.close();
                    }
                }
            } catch ( Throwable t ) {
                logger.warn( "Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t );
            }
        }
    } catch ( Throwable t ) {
        logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );
    }
}
```
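On the client, channelProvider.getChannels() returns just one channel, the HeaderExchangeClient itself; on the server, it returns every channel connected to the server.
Both approaches have trade-offs. Approach 1 has to traverse every channel on each run, which is inefficient. Approach 2 can waste resources (it usually implies multiple threads; with a single thread it effectively degenerates into approach 1).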
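To make the cost of approach 1 concrete, here is a minimal sketch of the periodic scan. The class name `IdleScanSketch`, the method `findTimedOut`, and the use of a plain map from channel name to last-read time are all assumptions made for illustration; this is not Dubbo code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IdleScanSketch {
    /** Approach 1: one periodic task walks every channel and compares timestamps. */
    static List<String> findTimedOut(Map<String, Long> lastReadMillis, long nowMillis, long timeoutMillis) {
        List<String> timedOut = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastReadMillis.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                timedOut.add(e.getKey());
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        Map<String, Long> lastRead = new LinkedHashMap<>();
        lastRead.put("channel-a", 1_000L);
        lastRead.put("channel-b", 9_500L);
        System.out.println(findTimedOut(lastRead, 10_000L, 3_000L)); // prints [channel-a]
    }
}
```

Every run touches every channel, whether or not it is anywhere near its timeout, which is the inefficiency mentioned above.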
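## A better approach
The industry has in fact already proposed a more efficient and more elegant solution, described in a paper, and Netty implements and uses HashedWheelTimer based on that paper. Let's look at how HashedWheelTimer is used and how it is implemented.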
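In short, HashedWheelTimer maintains a circular queue (the wheel). When a timeout task is added, the timer uses the task's delay to compute which slot of the wheel it falls into, and also records how many full rounds must pass first. On every tick the timer moves to the next slot and traverses all the tasks in it: a task with no remaining rounds has reached its deadline and is expired, while a task that still has remaining rounds has that count decremented by 1. Then the timer ticks again.
Note that HashedWheelTimer is not a precise timer; its accuracy depends on tickDuration.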
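The slot-plus-rounds idea can be sketched without any threads or clocks. The class below is a toy model, not Netty's implementation: `ToyWheel`, `schedule`, and `advance` are hypothetical names, delays are expressed directly in ticks, and the caller drives the wheel by hand.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class ToyWheel {
    private static final class Entry {
        final Runnable task;
        long remainingRounds;

        Entry(Runnable task, long remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private long tick; // number of the next tick to process

    ToyWheel(int size) {
        for (int i = 0; i < size; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules task to run when tick number (current tick + delayTicks) is processed. */
    void schedule(long delayTicks, Runnable task) {
        int size = slots.size();
        int index = (int) ((tick + delayTicks) % size);
        long rounds = delayTicks / size; // full laps to skip before firing
        slots.get(index).add(new Entry(task, rounds));
    }

    /** Processes the current slot, moves to the next one, then runs the due tasks. */
    void advance() {
        List<Entry> slot = slots.get((int) (tick % slots.size()));
        List<Runnable> due = new ArrayList<>();
        for (Iterator<Entry> it = slot.iterator(); it.hasNext(); ) {
            Entry e = it.next();
            if (e.remainingRounds <= 0) {
                it.remove();
                due.add(e.task);
            } else {
                e.remainingRounds--;
            }
        }
        tick++;
        for (Runnable task : due) {
            task.run();
        }
    }
}
```

With 8 slots, `schedule(10, task)` puts the task in slot 2 with one remaining round, so it runs on the eleventh call to `advance()`, the one that processes tick number 10.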
### Constructor
First, let's look at the HashedWheelTimer constructor:
```java
public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }
    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;
    // Convert tickDuration to nanos.
    this.tickDuration = unit.toNanos(tickDuration);
    // Prevent overflow.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    workerThread = threadFactory.newThread(worker);
    leak = leakDetector.open(this);
}
```
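We pass in a threadFactory, which is used to create the worker thread. The second parameter, tickDuration, is the time that elapses per tick. The third, unit, is the time unit of tickDuration. The fourth, ticksPerWheel, is the size of the wheel.
The method to pay attention to here is createWheel(ticksPerWheel).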
```java
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException(
                "ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }
    if (ticksPerWheel > 1073741824) {
        throw new IllegalArgumentException(
                "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
    }
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i ++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}

private static int normalizeTicksPerWheel(int ticksPerWheel) {
    int normalizedTicksPerWheel = 1;
    while (normalizedTicksPerWheel < ticksPerWheel) {
        normalizedTicksPerWheel <<= 1;
    }
    return normalizedTicksPerWheel;
}
```
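In the code above, normalizeTicksPerWheel determines the wheel size: the smallest power of two that is greater than or equal to ticksPerWheel. Why a power of two? Because computing an index on a wheel of that size is very cheap: a & (b - 1) = a % b holds when b is a power of two (and a is non-negative).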
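The identity is easy to check by hand. The following sketch re-implements the normalization loop and compares the mask with the modulo operator; `WheelMaskSketch`, `normalize`, and `slot` are names chosen here for illustration only.

```java
final class WheelMaskSketch {
    /** Smallest power of two >= n (for 1 <= n <= 2^30), mirroring normalizeTicksPerWheel. */
    static int normalize(int n) {
        int p = 1;
        while (p < n) {
            p <<= 1;
        }
        return p;
    }

    /** Slot index via bit mask; valid because size is a power of two and tick >= 0. */
    static int slot(long tick, int size) {
        return (int) (tick & (size - 1));
    }

    public static void main(String[] args) {
        int size = normalize(100);
        System.out.println(size);            // prints 128
        System.out.println(slot(300, size)); // prints 44
        System.out.println(300 % size);      // prints 44
    }
}
```

A requested size of 100 becomes 128, so the mask is 127, and `300 & 127` gives the same slot as `300 % 128`.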
### start method
```java
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }
    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
```
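The start method is carefully written. You can think of WORKER_STATE_UPDATER as an AtomicInteger that holds the current state of the HashedWheelTimer; when the state is WORKER_STATE_INIT, workerThread is started. After starting the worker thread, the caller waits until startTime becomes non-zero. This code shows real craftsmanship. The role of startTimeInitialized is explained in the analysis of the worker thread.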
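The two ideas in start(), a compare-and-set so that only one caller launches the thread and a latch so that every caller waits for the worker's first write, can be sketched in isolation. `LazyStartSketch` and its `launches` counter are hypothetical, and a plain AtomicInteger stands in for the field updater used by Netty.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyStartSketch {
    private static final int INIT = 0;
    private static final int STARTED = 1;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final AtomicInteger launches = new AtomicInteger();
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private volatile long startTime; // 0 means "not initialized yet"

    private final Thread worker = new Thread(() -> {
        long now = System.nanoTime();
        startTime = (now == 0) ? 1 : now; // never publish the sentinel value
        startTimeInitialized.countDown();
    });

    /** Safe to call from many threads, many times; the worker is launched once. */
    void start() {
        if (state.compareAndSet(INIT, STARTED)) {
            launches.incrementAndGet();
            worker.start();
        }
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Keep waiting; the worker sets startTime almost immediately.
            }
        }
    }

    long startTime() {
        return startTime;
    }

    int launches() {
        return launches.get();
    }
}
```

Calling `start()` twice launches the worker only once, and `startTime()` is guaranteed to be non-zero as soon as either call returns.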
### newTimeout method
```java
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    start();
    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
```
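This is a very important method: we call it to add a timed task. It takes three parameters. The first describes the task, whose run(Timeout timeout) method is executed when the timeout fires; the second is the delay, that is, how long from now the task should run; the third is the time unit of the delay. The method itself is simple: it computes deadline, the task's expiry time in nanoseconds relative to startTime, builds a HashedWheelTimeout for it, and puts that into the timeouts queue. Note that at this point the HashedWheelTimeout has not yet been placed on the wheel. As the comment says ("Add the timeout to the timeout queue which will be processed on the next tick"), the tasks in the timeouts queue are moved into the correct bucket on the next tick.
Pay special attention to the fact that newTimeout calls start(). The best practice is not to call start() directly but to let newTimeout trigger it once there is actually a timeout task to run, so that the worker thread does not spin pointlessly.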
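The hand-off described above, where callers only compute a relative deadline and enqueue while the worker later takes entries off the queue, might look like the sketch below. `PendingQueueSketch`, `Pending`, and the explicit `nowNanos` parameter (used instead of System.nanoTime() so the arithmetic is reproducible) are assumptions for illustration, and ConcurrentLinkedQueue merely stands in for whatever queue the timer actually uses.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

final class PendingQueueSketch {
    /** A queued timeout: the deadline is in nanoseconds relative to the timer's start time. */
    static final class Pending {
        final Runnable task;
        final long deadlineNanos;

        Pending(Runnable task, long deadlineNanos) {
            this.task = task;
            this.deadlineNanos = deadlineNanos;
        }
    }

    private final Queue<Pending> pending = new ConcurrentLinkedQueue<>();
    private final long startTimeNanos;

    PendingQueueSketch(long startTimeNanos) {
        this.startTimeNanos = startTimeNanos;
    }

    /** Called by any thread: compute the relative deadline and enqueue, nothing more. */
    Pending newTimeout(Runnable task, long delay, TimeUnit unit, long nowNanos) {
        long deadline = nowNanos + unit.toNanos(delay) - startTimeNanos;
        Pending p = new Pending(task, deadline);
        pending.add(p);
        return p;
    }

    /** Called by the worker on a tick: next queued entry, or null if the queue is empty. */
    Pending poll() {
        return pending.poll();
    }
}
```

If the timer started at 1,000 ns and a 5 ms timeout is requested at 3,000 ns, the stored deadline is 3,000 + 5,000,000 - 1,000 = 5,002,000 ns.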
### HashedWheelBucket
HashedWheelBucket is an inner class that represents a slot on the wheel. The HashedWheelTimer constructor builds an array of HashedWheelBucket.
```java
private static final class HashedWheelBucket {
    // Used for the linked-list datastructure
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;
}
```
HashedWheelBucket maintains a linked list that stores the timeout tasks.
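A bucket built on head and tail pointers could be sketched as follows. This is an illustrative doubly linked list, not the Netty source: `ToyBucket`, `Node`, and `describe` are hypothetical names, and the `prev` pointer is an assumption made so that removal from the middle takes constant time.

```java
final class ToyBucket {
    static final class Node {
        final String name;
        Node prev;
        Node next;
        ToyBucket bucket; // owning bucket, null while the node is not linked

        Node(String name) {
            this.name = name;
        }
    }

    private Node head;
    private Node tail;

    /** Appends the node at the tail. */
    void add(Node n) {
        n.bucket = this;
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    /** Unlinks the node in O(1); nodes that belong elsewhere are ignored. */
    void remove(Node n) {
        if (n.bucket != this) {
            return;
        }
        if (n.prev != null) {
            n.prev.next = n.next;
        } else {
            head = n.next;
        }
        if (n.next != null) {
            n.next.prev = n.prev;
        } else {
            tail = n.prev;
        }
        n.prev = null;
        n.next = null;
        n.bucket = null;
    }

    /** Comma-separated node names from head to tail. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(n.name);
        }
        return sb.toString();
    }
}
```

Keeping a back-reference from each node to its bucket is what lets a cancelled timeout find and unlink itself later without searching the wheel.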
### Worker thread
```java
public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }
    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();
    do {
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            transferTimeoutsToBuckets();
            bucket.expireTimeouts(deadline);
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket: wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
```
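The worker thread is the most important part of HashedWheelTimer. As mentioned earlier, start() starts the worker thread and waits for startTime to become non-zero; the worker sets startTime to the current nanosecond time and calls startTimeInitialized.countDown() to wake up any thread blocked in start().
After that, as long as the timer stays in the WORKER_STATE_STARTED state (the state is only changed in start() and stop()), the worker keeps looping: it waits for the next tick, processes cancelled tasks, transfers queued timeouts into their buckets, and expires the timeouts in the current bucket.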
### waitForNextTick()
```java
private long waitForNextTick() {
    long deadline = tickDuration * (tick + 1);
    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }
        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (PlatformDependent.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}
```
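waitForNextTick() is fairly simple: it puts the worker thread to sleep until the next tick is due and then returns the time elapsed since startTime, in nanoseconds.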
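The one subtle line in waitForNextTick() is the conversion from nanoseconds to a sleep time in milliseconds, which rounds up so the worker never wakes before the tick deadline. A small worked example, with `TickSleepSketch` and `sleepMillis` as hypothetical names:

```java
final class TickSleepSketch {
    /** Milliseconds to sleep until deadlineNanos, rounded up; a result <= 0 means the tick is due. */
    static long sleepMillis(long deadlineNanos, long elapsedNanos) {
        return (deadlineNanos - elapsedNanos + 999_999) / 1_000_000;
    }

    public static void main(String[] args) {
        long tickDuration = 100_000_000L;       // 100 ms per tick, in nanoseconds
        long tick = 4;
        long deadline = tickDuration * (tick + 1); // end of tick 4 = 500 ms after start

        System.out.println(sleepMillis(deadline, 420_000_001L)); // prints 80
        System.out.println(sleepMillis(deadline, 499_500_000L)); // prints 1
        System.out.println(sleepMillis(deadline, 500_000_000L)); // prints 0
    }
}
```

Even when only half a millisecond remains, the result is 1 rather than 0, so the loop sleeps once more instead of returning early.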
### processCancelledTasks()
```java
private void processCancelledTasks() {
    for (;;) {
        Runnable task = cancelledTimeouts.poll();
        if (task == null) {
            // all processed
            break;
        }
        try {
            task.run();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown while process a cancellation task", t);
            }
        }
    }
}
```
HashedWheelTimer maintains a cancelledTimeouts queue, and every tick processes all the tasks in it. When and how tasks get added to cancelledTimeouts is covered later, under HashedWheelTimeout#cancel.
### transferTimeoutsToBuckets()
```java
private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        long calculated = timeout.deadline / tickDuration;
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        int stopIndex = (int) (ticks & mask);
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}
```
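As mentioned earlier, newTimeout does not add the task to the wheel right away; it first puts it into the timeouts queue. On each tick, the worker moves the tasks in timeouts onto the wheel (at most 100,000 per tick). The way remainingRounds and stopIndex are computed is quite neat: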
```java
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);
```
The timeout task is then added to the corresponding HashedWheelBucket.
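Plugging numbers into the calculation above makes it easier to follow. The helper below simply repeats those four lines as a pure function; `PlacementSketch` and `place` are hypothetical names, and the sample values (100 ms ticks, 8 slots, worker at tick 3) are made up for the example.

```java
final class PlacementSketch {
    /** Returns {remainingRounds, stopIndex}; wheelLength must be a power of two. */
    static long[] place(long deadlineNanos, long tickDurationNanos, long tick, int wheelLength) {
        long calculated = deadlineNanos / tickDurationNanos;      // absolute tick of the deadline
        long remainingRounds = (calculated - tick) / wheelLength; // full laps still to skip
        long ticks = Math.max(calculated, tick);                  // never schedule into the past
        int stopIndex = (int) (ticks & (wheelLength - 1));
        return new long[] {remainingRounds, stopIndex};
    }

    public static void main(String[] args) {
        // Deadline 2.55 s after start: tick 25, two laps away, slot 25 & 7 = 1.
        long[] a = place(2_550_000_000L, 100_000_000L, 3, 8);
        System.out.println(a[0] + " " + a[1]); // prints 2 1

        // Deadline already in the past (tick 1 < tick 3): goes into the current slot.
        long[] b = place(100_000_000L, 100_000_000L, 3, 8);
        System.out.println(b[0] + " " + b[1]); // prints 0 3
    }
}
```

In the first case slot 1 is visited at ticks 9, 17, and 25; the first two visits decrement remainingRounds from 2 to 0, and the third expires the task.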
### bucket.expireTimeouts(deadline)
```java
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;
    // process all timeouts
    while (timeout != null) {
        boolean remove = false;
        if (timeout.remainingRounds <= 0) {
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
            remove = true;
        } else if (timeout.isCancelled()) {
            remove = true;
        } else {
            timeout.remainingRounds --;
        }
        // store reference to next as we may null out timeout.next in the remove block.
        HashedWheelTimeout next = timeout.next;
        if (remove) {
            remove(timeout);
        }
        timeout = next;
    }
}
```
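This processes every timeout task in the bucket for the current tick. If remainingRounds is less than or equal to 0, the deadline has arrived and timeout.expire() is executed; if remainingRounds is greater than 0, it is decremented by 1. A task that has expired or been cancelled is removed from the bucket.
### HashedWheelTimeout#cancel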
```java
public boolean cancel() {
    // only update the state it will be removed from HashedWheelBucket on next tick.
    if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
        return false;
    }
    // If a task should be canceled we create a new Runnable for this to another queue which will
    // be processed on each tick. So this means that we will have a GC latency of max. 1 tick duration
    // which is good enough. This way we can make again use of our MpscLinkedQueue and so minimize the
    // locking / overhead as much as possible.
    //
    // It is important that we not just add the HashedWheelTimeout itself again as it extends
    // MpscLinkedQueueNode and so may still be used as tombstone.
    timer.cancelledTimeouts.add(new Runnable() {
        @Override
        public void run() {
            HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
            if (bucket != null) {
                bucket.remove(HashedWheelTimeout.this);
            }
        }
    });
    return true;
}
```
The cancelledTimeouts queue was mentioned earlier. Calling HashedWheelTimeout#cancel adds a task to that queue, and that task removes the timeout from its bucket.
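The point of this design is that the bucket is only ever modified by the worker thread: a cancelling thread flips a state flag and leaves a note in a queue. A stripped-down sketch of that pattern follows; `CancelSketch` and `Handle` are hypothetical names, a HashSet stands in for the bucket, and ConcurrentLinkedQueue stands in for the queue type used by Netty.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class CancelSketch {
    static final int ST_INIT = 0;
    static final int ST_CANCELLED = 1;

    // Touched only by the worker thread, so it needs no synchronization.
    final Set<Handle> bucket = new HashSet<>();
    // Filled by any thread, drained by the worker on every tick.
    final Queue<Runnable> cancelledTimeouts = new ConcurrentLinkedQueue<>();

    final class Handle {
        private final AtomicInteger state = new AtomicInteger(ST_INIT);

        /** Returns true only for the first successful cancellation. */
        boolean cancel() {
            if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
                return false;
            }
            // Do not touch the bucket here; ask the worker to do it.
            cancelledTimeouts.add(() -> bucket.remove(this));
            return true;
        }
    }

    /** Worker side: registers a new handle in the bucket. */
    Handle schedule() {
        Handle h = new Handle();
        bucket.add(h);
        return h;
    }

    /** Worker side: runs every queued removal, as done once per tick. */
    void processCancelledTasks() {
        Runnable r;
        while ((r = cancelledTimeouts.poll()) != null) {
            r.run();
        }
    }
}
```

After `cancel()` returns, the handle is still in the bucket; it disappears only when the worker next calls `processCancelledTasks()`, which matches the "at most one tick" latency mentioned in the source comment.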
### stop()
```java
public Set<Timeout> stop() {
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());
    }
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // workerState can be 0 or 2 at this moment - let it always be 2.
        WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);
        if (leak != null) {
            leak.close();
        }
        return Collections.emptySet();
    }
    boolean interrupted = false;
    while (workerThread.isAlive()) {
        workerThread.interrupt();
        try {
            workerThread.join(100);
        } catch (InterruptedException ignored) {
            interrupted = true;
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
    if (leak != null) {
        leak.close();
    }
    return worker.unprocessedTimeouts();
}
```
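I have always felt that two things really test a programmer's skill: (1) lifecycle management and (2) handling of exceptional conditions.
Since HashedWheelTimer has a start() method it should also have a stop() method, and this stop() contains quite a few techniques worth learning.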
```java
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
    // workerState can be 0 or 2 at this moment - let it always be 2.
    WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);
    if (leak != null) {
        leak.close();
    }
    return Collections.emptySet();
}
```
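When several threads call stop() at the same time, only one of them succeeds in moving the state from WORKER_STATE_STARTED to WORKER_STATE_SHUTDOWN. A thread whose compare-and-set fails forces the state to WORKER_STATE_SHUTDOWN anyway. As the comment notes, the state is then either 0 or 2, so this also covers a timer that was never started. That thread then returns an empty set, meaning it has nothing more to do: if the timer was running, the thread that won the compare-and-set finishes the rest.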
```java
while (workerThread.isAlive()) {
    workerThread.interrupt();
    try {
        workerThread.join(100);
    } catch (InterruptedException ignored) {
        interrupted = true;
    }
}
```
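While the worker thread is still alive, stop() keeps calling workerThread.interrupt(). (A common way to stop a thread is to call xxxThread.interrupt() and have xxxThread check xxxThread.isInterrupted(). This workerThread does not poll that flag; it reacts to the InterruptedException thrown from Thread.sleep in waitForNextTick() instead.) On stop, HashedWheelTimer wants workerThread to finish its work gracefully and to hand back the tasks it could not process before exiting, so the calling thread uses `workerThread.join(100);` to wait for workerThread, up to 100 ms at a time.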
```java
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket: wheel) {
    bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
    HashedWheelTimeout timeout = timeouts.poll();
    if (timeout == null) {
        break;
    }
    if (!timeout.isCancelled()) {
        unprocessedTimeouts.add(timeout);
    }
}
processCancelledTasks();
```
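At the end of its run, the worker thread puts every task still sitting in the buckets, plus every task in the timeouts queue that has not been cancelled, into unprocessedTimeouts. It then processes the cancelled tasks, and with that its job is done and it waits to be reclaimed.
This involves some handling of InterruptedException; I will probably write a separate article covering InterruptedException in detail.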
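The shutdown handshake discussed in this section (compare-and-set the state, interrupt, join in a loop, then restore the caller's interrupt flag) can be sketched on its own. `StopSketch` is a hypothetical class whose worker just sleeps in a loop as a stand-in for waitForNextTick(); it does not collect unprocessed tasks.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class StopSketch {
    private static final int STARTED = 1;
    private static final int SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(STARTED);
    private final Thread worker = new Thread(() -> {
        while (state.get() == STARTED) {
            try {
                Thread.sleep(50); // stands in for waitForNextTick()
            } catch (InterruptedException e) {
                // Woken by stop(); the loop condition decides whether to exit.
            }
        }
    });

    StopSketch() {
        worker.start();
    }

    /** Returns true only for the caller that actually performed the shutdown. */
    boolean stop() {
        if (!state.compareAndSet(STARTED, SHUTDOWN)) {
            return false; // someone else already stopped the worker
        }
        boolean interrupted = false;
        while (worker.isAlive()) {
            worker.interrupt();
            try {
                worker.join(100);
            } catch (InterruptedException e) {
                interrupted = true; // remember it, but finish the shutdown first
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
        }
        return true;
    }

    boolean workerAlive() {
        return worker.isAlive();
    }
}
```

The caller swallows its own InterruptedException only temporarily: the shutdown is completed first and the interrupt status is restored afterwards, so code further up the stack can still observe it.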
## Summary
The code is very well written and there is a lot to learn from it. HashedWheelTimer is ready to be put to use.