Tags:定时作业调度
分布式定时任务调度
Quartz
TBSchedule
Elastic-job
基于给定时间点,给定时间间隔或者给定执行次数自动执行任务。
Timer
对于简单的有固定间隔(period)的任务,使用JAVA内置的Timer即可解决问题。
public static void main(String[] args){
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
System.out.println("do sth...");
}
}, 1000, 2000);
}
特点:in JDK
,简洁
,单线程
对于简单的定时任务,Timer是非常实用的类,做一些常规的简单任务,如在线程池中用Timer扫描出空闲线程。
ScheduledExecutor
多线程的固定间隔简单调度,JDK也提供了工具类
public static class ScheduledExecutorTest implements Runnable {
private String jobName = "";
public ScheduledExecutorTest(String jobName) {
super();
this.jobName = jobName;
}
@Override
public void run() {
System.out.println("execute " + jobName);
}
public static void main(String[] args) {
//执行线程池大小
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
long initialDelay1 = 1;
long period1 = 1;
// 从现在开始1秒钟之后,每隔1秒钟执行一次job1
service.scheduleAtFixedRate(
new ScheduledExecutorTest("job1"), initialDelay1,
period1, TimeUnit.SECONDS);
long initialDelay2 = 1;
long delay2 = 1;
// 从现在开始2秒钟之后,每隔2秒钟执行一次job2
service.scheduleWithFixedDelay(
new ScheduledExecutorTest("job2"), initialDelay2,
delay2, TimeUnit.SECONDS);
}
}
特点:in JDK
多线程
线程池
Unix Crontab
相比较Timer这种固定间隔调度,crontab的可以使用cron表达式表达更复杂调度策略:
每1分钟执行一次myCommand
* * * * * myCommand
实例2:每小时的第3和第15分钟执行
3,15 * * * * myCommand
实例3:在上午8点到11点的第3和第15分钟执行
3,15 8-11 * * * myCommand
实例4:每隔两天的上午8点到11点的第3和第15分钟执行
3,15 8-11 */2 * * myCommand
crontab往往和脚本搭配完成更复杂的任务,意味着当需要和主系统进行复杂交互时多有不便。
特点: Linux内置
Crontab表达式
Quartz##
Quartz是个开源JAVA库,可以简单看做以上三种的结合的扩展。
Scheduler:调度容器
Job:Job接口类
JobDetail :Job的描述类,job执行时的依据此对象的信息反射实例化出Job的具体执行对象。
Trigger:存放Job执行的时间策略
JobStore: 存储作业和调度期间的状态
Calendar:指定排除的时间点(如排除法定节假日)
Quartz的主要线程有两类,负责调度的线程和负责Misfire(指错过了执行时间的作业)的线程,其中负责调度的线程RegularSchedulerThread是基于线程池的,而Misfire只有一个线程。 两类线程都会访问抽象为JobStore的层来获取作业策略或写入调度状态。
JobStore也分持久化(JobStoreSupport)和非持久化(RAMJobStore)两种,使用场景大大不同,后面有叙述。
注意上图左边部分是调度器的守护线程QuartzScheduleThread的主要流程,也就是:QuartzScheduleThread会在RegularThread池有空闲时(否则block),从JobStore中取出N个(将来30秒内要触发的)Trigger,并交给RegularThread线程池来运行job。
Quartz的功能非常丰富,结构也比上述的复杂的多,本文只是简要介绍抽象层的概念,详解请参考更多资料。
对于单机调度Quartz基本能完全满足我们的需求,但多个机器怎么办呢?
Quartz集群##
为了分担单点压力,往往需要多个节点运行定时任务,他们之间有协作又不能冲突。
Quartz用了一个比较取巧的方式支持集群定时调度。
首先,JobStore要选用数据库持久化存储:JDBCJobStore,且自己管理事务:JobStoreTX。
依附于本身的trigger存取策略,Quartz利用数据库行级锁来实现多节点的通讯(间接通讯)。
0.调度器线程run()
1.获取待触发trigger
1.1数据库LOCKS表TRIGGER_ACCESS行加锁
1.2读取JobDetail信息
1.3读取trigger表中触发器信息并标记为"已获取"
1.4commit事务,释放锁
2.触发trigger
2.1数据库LOCKS表STATE_ACCESS行加锁
2.2确认trigger的状态
2.3读取trigger的JobDetail信息
2.4读取trigger的Calendar信息
2.3更新trigger信息
2.3commit事务,释放锁
3实例化并执行Job
3.1从线程池获取线程执行JobRunShell的run方法
读取之前获取锁,写入之后释放锁,这是Quartz集群解决集群同步的核心思想。
Quartz集群是用工具拼凑起来的一个方案,巧妙的运用了数据库锁解决同步问题,这在一些场景中是非常work的,但问题也依旧明显:
解决了节点同步问题,但没有解决分布式问题。
官方也做出说明,集群特性对于高cpu使用率的任务效果很好,但是对于大量的短任务,各个节点都会抢占数据库锁,这样就出现大量的线程等待资源.这种情况随着节点的增加会越来越严重.
有没有解决分布式问题的方案?
TBSchedule##
类比Quartz集群用数据库做存储,TBSchedule则使用更符合分布式场景的zookeeper来做任务状态。
zookeeper有永久节点存储作业的配置信息,使用临时节点存储调度时的状态,当其中一个调度端和zookeeper断开链接时,回话消失临时节点数据被抹除,所有在线调度端会感知到改变化并做出相应的动作。
来看几个重要概念:
- 任务项
即分片。分布式机制是通过分片实现:
如:TaskItem: 0,1,2,3
可以用数据的ID取模对应TaskItem,一个TaskItem就代表了一部分 数据。
如上线了机器[A,B,C], TBScher会做如下分配:
[A=1,0,B=2,C=3]
如上线了机器[A,B,C,D,E], TBScher会做如下分配:
[A=0,B=1,C=2,D=3],E空闲。
分片操作由是leader节点执行,leader是最早上线的节点(编号最小)。
节点感知
调度端会启动一个刷新zookeeper的timer,如果有变动则回触发leader的重新分配资源,
如:
新上线或下线了机器,会给各个调度端重新分配TaskItem。
暂停或重新启动某个策略,调度端会停止之前的负责这个策略的线程组。触发
TBScheduler依旧支持Crontab表达式,并进一步支持执行的时间段(超过时间段则暂停),
但其内里实现有异于Quartz:
对于一个策略,在首次启动时会计算出该策略的下次执行开始时间和执行结束,然后分别启动一个负责启动和暂停的Timer,Timer内的操作就是对调度器的暂停和恢复,以及下一批Timer的创建。
TBScher的流式Job###
相对于Quartz的job只有execute,Tbscher的Job主要多了selectTasks()方法。
/**
* 单个任务处理实现
*
* @author xuannan
*
*/
public class DemoTaskBean implements IScheduleTaskDealSingle<Long> {
public List<Long> selectTasks(String taskParameter,String ownSign, int taskItemNum,
List<TaskItemDefine> queryCondition, int fetchNum) throws Exception {
List<Long> result = new ArrayList<Long>();
String message = "获取数据...[ownSign=" + ownSign + ",taskParameter=\"" + taskParameter +"\"]:";
return result;
}
public boolean execute(Long task, String ownSign) throws Exception {
Thread.sleep(50);
log.info("处理任务["+ownSign+"]:" + task);
return true;
}
}
selectTasks返回的结果会被带入execute中执行,当execute时task为空时会再次selectTasks,
一次调度中,selectTasks可能会被调用多次,直到返回空,结束本次调度。
TBSchedule的出现最大的进步之处在于从关注作业到关注数据。在此概念上造就了高性能,也真正解决了集群分布式问题。
缺点:
- 对zookeeper的操作都是原生客户端的直接操作,维护起来易出错外,zookeeper的高可用也没有良好支持。zookeeper挂掉要重启所有调度端。
- 文档缺失,四年内没有任何更新(2016),缺少开源社区的维护。
Elastic-job##
原理基本和TBSchedule一致。
一些重要概念:
leader选举
调度端机器上线后会检查有没有leader,如果没有则提议自己做leader,两个同时上线引发冲突是由zookeeper的内部解决的,总之它可以保证只有一个主。
leader如果下线会触发重新选举,在选出下个leader前所有任务会被阻塞。分片
leader选举后,leader以『协调者』角色负责分片,同时依赖zookeeper的临时节点和监听器的主动检查和通知功能,对机器上、下线、任务配置更改、分片修改等事件做出响应。
任务的设计###
因为借助Quartz做实际调度工作,所以Elatic-job的任务都是Quartz的Job的实现,但做了更多的细分扩展:
简单任务:
AbstractSimpleElasticJob
类似Quartz的Job,在Elastic-job的意义则多了高可用。流式任务:
AbstractDataFlowElasticJob
类似TBSchedule的任务,又再次基础细分重视顺序的AbstractSequenceDataFlowElasticJob和重视性能的AbstractThroughputDataFlowElasticJob。用户扩展任务
elatic-job是向着插件化看齐的,希望用户以插件形式贡献代码,编写更多有用的任务。
一些亮点###
Sharding Offset
框架提供了记录当前处理位移的方式,这往往用于大批量的任务处理中机器挂掉,这时候别的机器接手了挂掉的机器的任务时,需要知道哪些任务处理过了哪些还没处理。在TBSchedule中需要自己在自己的系统中做持久化标记,而在Elatic-job中则可以使用Sharding Offset,这为failover提供了便利。Misfire开关
本次作业开启后上次作业因为某种原因还没有结束,框架把这次作业标记为Misfire,上次作业执行完后会弥补标记了Misfire的作业。
Quartz中原本也有Misfire,但在分布式环境中使用Misfire需要另外的支持,Elatic-job引入了它。
Elastic-job是2015年当当网发布的开源项目,它出现的意义是对TBSchedule在各方面的优化,这体现在它借鉴了TBSchedule的流式任务概念,但基本的调度功能还是交给这方面的资深专家:Quartz,而对zookeeper的操作使用crutor封装,以及文档比较全面,这一点对于维护者来说是心头好。
唯一的缺点是太新,缺少线上环境的考验。但当当的开发者在推广方面很给力,赞一个。
总结##
本文从浅至深的介绍了任务调度技术,但没有使用说明和结构详解,因为本文旨在对比的基础上做原理介绍,可以在技术选型上给出参考。
参考资料##
https://www.ibm.com/developerworks/cn/java/j-lo-taskschedule/
http://tech.meituan.com/mt-crm-quartz.html
http://www.cnblogs.com/davidwang456/p/4205237.html
http://code.taobao.org/p/tbschedule/wiki/index/
https://github.com/dangdangdotcom/elastic-job