在半个月之前,有幸看了xxl-job源码,原本打算写一篇源码分析文章。结果由于琐碎的事情干扰了,搁浅了。本篇文章先预热一下,讲下xxl-job中关于quartz知识。(本文内容参考自xxl-job官网)
xxl-job设计思想和调度模块剖析
xxl-job将调度行为抽象形成"调度中心"公共平台,而平台自身并不承担业务逻辑,"调度中心"负责发起调度请求。将任务抽象成分散的JobHandler,交由"执行器"统一管理,"执行器"负责接收调度请求并执行对应的JobHandler中业务逻辑。因此,"调度"和"任务"两部分可以相互解耦,提高系统整体稳定性和扩展性。
quartz的不足
Quartz作为开源作业调度中的佼佼者,是作业调度的首选。集群环境中Quartz采用API的方式对任务进行管理,但是会存在以下问题:
- 调用API的方式操作任务,不人性化。
- 需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
- 调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐渐增多,同时调度任务逻辑逐渐加重的情况下,此时调度系统的性能将大大受限于业务。
- Quartz底层以"抢占式"获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非常大,而xxl-job通过执行器实现"协同分配式"运行任务,充分发挥集群优势,负载各节点均衡。
RemoteHttpJobBean
常规Quartz的开发,任务逻辑一般维护在QuartzJobBean中,耦合很严重。xxl-job中的调度模块和任务模块完全解耦,调度模块中的所有调度任务都使用的是同一个QuartzJobBean(也就是RemoteHttpJobBean)。不同的调度任务将各自参数维护在各自扩展表数据中,当触发RemoteHttpJobBean执行时,将会解析不同的任务参数发起远程调用,调用各自的远程执行器服务。这种调用模型类似RPC调用,RemoteHttpJobBean提供调用代理的功能,而执行器提供远程服务的功能。
调度中心HA(集群)
基于Quartz的集群方案,数据库选用Mysql;集群分布式并发环境中使用QUARTZ定时任务调度,会在各个节点会上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
# 基于Quartz的集群方案,数据库选用Mysql;
# 集群分布式并发环境中使用QUARTZ定时任务调度,会在各个节点会上报任务,存到数据库中。
# 执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
# for cluster
org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval: 5000
调度线程池
调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟
# 调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
@DisallowConcurrentExecution
xxl-job调度模块的"调度中心"默认不使用该注解,因为RemoteHttpJobBean为公共QuartzJobBean,这样在多线程调度的情况下,调度模块被阻塞的几率很低,大大提高了调度系统的承载量。xxl-job的每个调度任务虽然在调度模块时并行调度执行的,但是任务调度传递到任务模块的执行器确实是串行执行的,同时支持任务终止。
话外音:Quartz定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行, 如果定时任执行太长,会长时间占用资源,导致其它任务堵塞。
misfire策略
misfire用于Trigger触发时,线程池中没有可用的线程或者调度器被关闭了,此时这个Trigger就变成了misfire。当下次调度器启动或者有线程可用时,会检查处于misfire状态的Trigger。而misfire的状态值决定了调度器如何处理这个Trigger。
quartz有个全局的参数misfireThreadshold可以设置允许的超时时间,单位为毫秒。如果超过了就不执行,未超过就执行。
org.quartz.jobStore.misfireThreshold: 60000
造成misfire的可能原因:服务重启;调度线程被QuartzJobBean阻塞,线程被耗尽;某个任务启用了@DisallowConcurrentExecution,上次调度持续阻塞,下次调度被错过。
Misfire规则,假设任务是从上午9点到下午17点时间范围执行:
withMisfireHandlingInstructionDoNothing:不触发立即执行,等待下次调度
withMisfireHandlingInstructionIgnoreMisfires:以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期后,当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行。如果9点misfire了,在10:15系统恢复之后。9点,10点的misfire会马上执行。
withMisfireHandlingInstructionFireAndProceed:以当前时间为触发频率立刻触发一次执行,然后按照Cron频率依次执行。假设9点,10点的任务都misfire了,系统在10:15恢复后,只会执行一次misfire,下次正点执行。
xxl-job默认misfire规则为:withMisfireHandlingInstructionDoNothing
// 3、corn trigger
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing(); // withMisfireHandlingInstructionDoNothing 忽略掉调度终止过程中忽略的调度
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
xxl-job和quartz数据库表讲解
XXL-JOB调度模块基于Quartz集群实现,其"调度数据库"是在Quartz的11张集群mysql表基础上扩展而成。
xxl-job的5张扩展表
XXL_JOB_QRTZ_TRIGGER_GROUP
:执行器信息表,维护任务执行器信息
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_GROUP` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
`title` varchar(12) NOT NULL COMMENT '执行器名称',
`order` tinyint(4) NOT NULL DEFAULT '0' COMMENT '排序',
`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',
`address_list` varchar(512) DEFAULT NULL COMMENT '执行器地址列表,多地址逗号分隔',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_INFO
:调度扩展信息表, 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_INFO` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_cron` varchar(128) NOT NULL COMMENT '任务执行CRON',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_LOG
:调度日志表,用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOG` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
`trigger_code` int(11) NOT NULL COMMENT '调度-结果',
`trigger_msg` text COMMENT '调度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
`handle_code` int(11) NOT NULL COMMENT '执行-状态',
`handle_msg` text COMMENT '执行-日志',
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_LOGGLUE
:任务GLUE日志,用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_LOGGLUE` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
`add_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
XXL_JOB_QRTZ_TRIGGER_REGISTRY
:执行器注册表,维护在线的执行器和调度中心机器地址信息;
CREATE TABLE XXL_JOB_QRTZ_TRIGGER_REGISTRY (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(255) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
quartz的11张系统表(只会介绍常见的,后续用到了其他的再补充)
XXL_JOB_QRTZ_JOB_DETAILS
:存储的是job的详细信息,包括:[DESCRIPTION]
描述,[IS_DURABLE]
是否持久化,[JOB_DATA]
持久化对象等基本信息。
CREATE TABLE XXL_JOB_QRTZ_JOB_DETAILS
(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
);
XXL_JOB_QRTZ_CRON_TRIGGERS
:存储CronTrigger
相关信息,这也是我们使用最多的触发器。
CREATE TABLE XXL_JOB_QRTZ_CRON_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
CRON_EXPRESSION VARCHAR(200) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES XXL_JOB_QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
XXL_JOB_QRTZ_PAUSED_TRIGGER_GRPS
:存放暂停掉的触发器
CREATE TABLE XXL_JOB_QRTZ_PAUSED_TRIGGER_GRPS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP)
);
XXL_JOB_QRTZ_FIRED_TRIGGERS
:存储已经触发的trigger相关信息,trigger随着时间的推移状态发生变化,直到最后trigger执行完成,从表中被删除。相同的trigger和task,每触发一次都会创建一个实例;从刚被创建的ACQUIRED状态,到EXECUTING状态,最后执行完从数据库中删除;
CREATE TABLE XXL_JOB_QRTZ_FIRED_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(200) NULL,
JOB_GROUP VARCHAR(200) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID)
);
XXL_JOB_QRTZ_SIMPLE_TRIGGERS
:存储SimpleTrigger信息,timesTriggered默认值为0,当timesTriggered > repeatCount停止trigger,当执行完毕之后此记录会被删除
CREATE TABLE XXL_JOB_QRTZ_SIMPLE_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES XXL_JOB_QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
);
XXL_JOB_QRTZ_TRIGGERS
: 和XXL_JOB_QRTZ_FIRED_TRIGGERS
存放的不一样,不管trigger
触发了多少次都只有一条记录,TRIGGER_STATE
用来标识当前trigger
的状态;假如cronTrigger每小时执行一次,执行完之后一直是WAITING状态;假如cronTrigger每6秒执行一次状态是ACQUIRED状态;假如simpleTrigger设置的执行次数为5,那么重复执行5次后状态为COMPLETE,并且会被删除。
CREATE TABLE XXL_JOB_QRTZ_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(200) NOT NULL,
TRIGGER_GROUP VARCHAR(200) NOT NULL,
JOB_NAME VARCHAR(200) NOT NULL,
JOB_GROUP VARCHAR(200) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(200) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES XXL_JOB_QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP)
);
XXL_JOB_QRTZ_SCHEDULER_STATE
:存储所有节点的scheduler
,会定期检查scheduler
是否失效。
CREATE TABLE XXL_JOB_QRTZ_SCHEDULER_STATE
(
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(200) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME)
);
XXL_JOB_QRTZ_LOCKS
:Quartz提供的锁表,为多个节点调度提供分布式锁,实现分布式调度,默认有2个锁,STATE_ACCESS
主要用在scheduler定期检查是否失效的时候,保证只有一个节点去处理已经失效的scheduler
。TRIGGER_ACCESS
主要用在TRIGGER
被调度的时候,保证只有一个节点去执行调度。
CREATE TABLE XXL_JOB_QRTZ_LOCKS
(
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME)
);
关于单机版的quartz与Spring XML集成
1.创建Job类,无须继承父类,直接配置MethodInvokingJobDetailFactoryBean
即可。但需要指定一下两个属性:
- targetObject:指定包含任务执行体的Bean实例。
- targetMethod:指定将指定Bean实例的该方法包装成任务的执行体
<!-- 配置Job类 -->
<bean id="myJob" class="com.cmazxiaoma.quartz.xml.MyJob"></bean>
<!-- 配置JobDetail -->
<bean id="myJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
<!-- 执行目标job -->
<property name="targetObject" ref="myJob"></property>
<!-- 要执行的方法 -->
<property name="targetMethod" value="execute"></property>
</bean>
<!-- 配置tirgger触发器 -->
<bean id="cronTriggerFactoryBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<!-- jobDetail -->
<property name="jobDetail" ref="myJobDetail"></property>
<!-- cron表达式,执行时间 每5秒执行一次 -->
<property name="cronExpression" value="0/5 * * * * ?"></property>
</bean>
<!-- 配置调度工厂 -->
<bean id="springJobSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTriggerFactoryBean"></ref>
</list>
</property>
</bean>
根据jdi.getJobDataMap().put("methodInvoker", this);
来看,如果Quartz是集群的话,这里会抛出Couldn't store job: Unable to serialize JobDataMap for insertion into database because the value of property 'methodInvoker' is not serializable: org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean
因为methodInvoker
不能进行序列化,没有实现serializable
接口。集群环境下,还是推荐用继承QuartzJobBean创建Job。MethodInvokingJobDetailFactoryBean中的targetObject和targetMethod参数无法持久化到底层数据库表。
最终还是根据concurrent策略选择MethodInvokingJob或者StatefulMethodInvokingJob去反射调用targetObject中的targetMethod。
public static class MethodInvokingJob extends QuartzJobBean {
protected static final Log logger = LogFactory.getLog(MethodInvokingJob.class);
private MethodInvoker methodInvoker;
/**
* Set the MethodInvoker to use.
*/
public void setMethodInvoker(MethodInvoker methodInvoker) {
this.methodInvoker = methodInvoker;
}
/**
* Invoke the method via the MethodInvoker.
*/
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
try {
context.setResult(this.methodInvoker.invoke());
}
catch (InvocationTargetException ex) {
if (ex.getTargetException() instanceof JobExecutionException) {
// -> JobExecutionException, to be logged at info level by Quartz
throw (JobExecutionException) ex.getTargetException();
}
else {
// -> "unhandled exception", to be logged at error level by Quartz
throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
}
}
catch (Exception ex) {
// -> "unhandled exception", to be logged at error level by Quartz
throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
}
}
}
/**
* Extension of the MethodInvokingJob, implementing the StatefulJob interface.
* Quartz checks whether or not jobs are stateful and if so,
* won't let jobs interfere with each other.
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public static class StatefulMethodInvokingJob extends MethodInvokingJob {
// No implementation, just an addition of the tag interface StatefulJob
// in order to allow stateful method invoking jobs.
}
2.创建Job
类,myJob
类继承QuartzJobBean
,实现 executeInternal(JobExecutionContext jobexecutioncontext)
方法。然后再通过配置JobDetailFactoryBean
创建jobDetail
。
<!-- 配置JobDetail -->
<bean id="myJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="jobClass" value="com.cmazxiaoma.quartz.xml.MyJob"></property>
<property name="durability" value="true"></property>
</bean>
<!-- 配置tirgger触发器 -->
<bean id="cronTriggerFactoryBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<!-- jobDetail -->
<property name="jobDetail" ref="myJobDetail"></property>
<!-- cron表达式,执行时间 每5秒执行一次 -->
<property name="cronExpression" value="0/5 * * * * ?"></property>
</bean>
<!-- 配置调度工厂 如果将lazy-init='false'那么容器启动就会执行调度程序 -->
<bean id="springJobSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTriggerFactoryBean"></ref>
</list>
</property>
</bean>
requestsRecovery属性设置为 true时,当Quartz服务被中止后,再次启动或集群中其他机器接手任务时会尝试恢复执行之前未完成的所有任务。
集群Quartz与SpringBoot集成
@Configuration
public class XxlJobDynamicSchedulerConfig {
@Bean
public SchedulerFactoryBean getSchedulerFactoryBean(DataSource dataSource){
SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
schedulerFactory.setDataSource(dataSource);
// 自动启动
schedulerFactory.setAutoStartup(true);
// 延时启动,应用启动成功后在启动
schedulerFactory.setStartupDelay(20);
// 覆盖DB中JOB:true、以数据库中已经存在的为准:false
schedulerFactory.setOverwriteExistingJobs(true);
schedulerFactory.setApplicationContextSchedulerContextKey("applicationContext");
schedulerFactory.setConfigLocation(new ClassPathResource("quartz.properties"));
return schedulerFactory;
}
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobDynamicScheduler getXxlJobDynamicScheduler(SchedulerFactoryBean schedulerFactory){
Scheduler scheduler = schedulerFactory.getScheduler();
XxlJobDynamicScheduler xxlJobDynamicScheduler = new XxlJobDynamicScheduler();
xxlJobDynamicScheduler.setScheduler(scheduler);
return xxlJobDynamicScheduler;
}
}
quartz.properties
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.instanceId: AUTO
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
# 调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
# misfile:错过了触发时间,来处理规则。
# 可能原因:
# 1.服务重启 2.调度线程被QuartzJobBean阻塞 3.线程被耗尽
# 4.某个任务启动了@DisallowConcurrentExecution,上次调度持续阻塞,下次调度被错过。
# 假设任务是从上午9点到下午17点
# Misfire规则:
# withMisfireHandlingInstructionDoNothing:不触发立即执行,等待下次调度
#——以错过的第一个频率时间立刻开始执行
#——重做错过的所有频率周期后
#——当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行
# 如果9点misfire了,在10:15系统恢复之后。9点,10点的misfire会马上执行
# withMisfireHandlingInstructionIgnoreMisfires:以错过的第一个频率时间立刻开始执行;
#——以当前时间为触发频率立刻触发一次执行
# ——然后按照Cron频率依次执行
# withMisfireHandlingInstructionFireAndProceed:以当前时间为触发频率立刻触发一次执行;
# 假设9点,10点的任务都misfire了,系统在10:15恢复后,只会执行一次misfire,
# 下次正点执行。
# XXL-JOB默认misfire规则为:withMisfireHandlingInstructionDoNothing
### 单位毫秒
org.quartz.jobStore.misfireThreshold: 60000
org.quartz.jobStore.maxMisfiresToHandleAtATime: 1
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
# 基于Quartz的集群方案,数据库选用Mysql;
# 集群分布式并发环境中使用QUARTZ定时任务调度,会在各个节点会上报任务,存到数据库中。
# 执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
# for cluster
org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
# 默认是内存
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval: 5000
在QuartzJobBean无法注入Service
以下测试都基于MySQL的Quartz集群环境下
前面说到基于MethodInvokingJobDetailFactoryBean创建Job,无法将targetObject和targetMethod参数持久化到数据库表,因此我们要想办法将这2个参数存储到JobDataMap中。
我们利用MethodInvokingJobDetailFactoryBean动态抽象构造Job的思想,将其改造一番。
public class MyMethodInvokingJobBean extends QuartzJobBean implements ApplicationContextAware {
private String targetObject;
private String targetMethod;
private ApplicationContext applicationContext;
public void setTargetObject(String targetObject) {
this.targetObject = targetObject;
}
public void setTargetMethod(String targetMethod) {
this.targetMethod = targetMethod;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
Object object = applicationContext.getBean(this.targetObject);
System.out.println("targetObject:" + targetObject);
System.out.println("targetMethod:" + targetMethod);
try {
Method method = object.getClass().getMethod(this.targetMethod, new Class[] {});
method.invoke(object, new Object[]{});
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
@Configuration
public class MyQuartzConfiguration {
@Autowired
private MyQuartzTask myQuartzTask;
@Bean(name = "myQuartzJobBeanJobDetail")
public JobDetailFactoryBean myQuartzJobBeanJobDetail() {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
jobDetailFactoryBean.setJobClass(MyQuartzJobBean.class);
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.setRequestsRecovery(true);
return jobDetailFactoryBean;
}
@Bean(name = "myQuartzTaskJobDetail")
public JobDetailFactoryBean myQuartzTaskJobDetail() {
JobDetailFactoryBean jobDetailFactoryBean = new JobDetailFactoryBean();
jobDetailFactoryBean.setRequestsRecovery(true);
jobDetailFactoryBean.setDurability(true);
jobDetailFactoryBean.getJobDataMap().put("targetObject", "MyQuartzTask");
jobDetailFactoryBean.getJobDataMap().put("targetMethod", "execute");
jobDetailFactoryBean.setJobClass(MyMethodInvokingJobBean.class);
return jobDetailFactoryBean;
}
}
public class MyQuartzJobBean extends QuartzJobBean {
@Autowired
private MyQuartzService myQuartzService;
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
System.out.println("MyQuartzJobBean executeInternal");
myQuartzService.print();
}
}
@Component(value = "MyQuartzTask")
public class MyQuartzTask {
@Autowired
private MyQuartzService myQuartzService;
public void execute() {
System.out.println("MyQuartzTask");
myQuartzService.print();
}
}
@Service
public class MyQuartzService {
public void print() {
System.out.println("MyQuartzService");
}
}
我们运行测试用例中的testMethodInvokingJobBean()
方法, 发现运行没问题,已经成功的注入了MyQuartzService
。大致的思想是:抽象出一个MyMethodInvokingJobBean
,注入targetObject和targetMethod参数,利用反射去执行目标类的目标方法,达到动态执行任务的目的,大大降低代码的耦合度。
public class QuartzTest extends AbstractSpringMvcTest {
@Autowired
private Scheduler scheduler;
@Resource(name = "myQuartzJobBeanJobDetail")
private JobDetail myQuartzJobBeanJobDetail;
@Resource(name = "myQuartzTaskJobDetail")
private JobDetail myQuartzTaskJobDetail;
@Test
public void testMethodInvokingJobBean() throws SchedulerException, InterruptedException {
TriggerKey triggerKey = TriggerKey.triggerKey("simpleTrigger", "simpleTriggerGroup");
SimpleScheduleBuilder simpleScheduleBuilder =
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(1)
.withRepeatCount(1);
SimpleTrigger simpleTrigger = (SimpleTrigger) TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startNow()
.withSchedule(simpleScheduleBuilder)
.build();
scheduler.scheduleJob(myQuartzTaskJobDetail, simpleTrigger);
TimeUnit.MINUTES.sleep(10);
}
@Test
public void testJobDetailFactoryBean() throws InterruptedException, SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey("simpleTrigger1", "simpleTriggerGroup");
SimpleScheduleBuilder simpleScheduleBuilder =
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(1)
.withRepeatCount(1);
SimpleTrigger simpleTrigger = (SimpleTrigger) TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(simpleScheduleBuilder)
.build();
scheduler.scheduleJob(myQuartzJobBeanJobDetail, simpleTrigger);
TimeUnit.MINUTES.sleep(10);
}
}
当我们运行testJobDetailFactoryBean()
时,此时我们的JobClass是MyQuartzJobBean
。当我们的job类中方法要被执行的时候,Quartz会根据JobClass重新实例化一个对象,这里对象中的属性都会为空,所以会抛出NPE异常。
看到这里有没有觉得这个问题好熟悉,在上一篇文章中为什么我的HibernateDaoSupport没有注入SessionFactory我们有提到过。当Quartz重新实例化对象后,肯定是没有调用populate()方法。我们此时应该找出当前对象中的所有属性,然后一一注入。(这里会涉及到AutowiredAnnotationBeanPostProcessor中的postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName)
和postProcessPropertyValues( PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException
方法获取对象中需要被注入的属性和在Spring容器中获取相应的属性值)
我们在SpringBeanJobFactory
中的createJobInstance()中可以看到,就是从这里开始实例化Job类,并且把JobDataMap中的键值对填充到实例化后的Job对象中。
我们要做的是重写SpringBeanJobFactory
中的createJobInstance()
方法,为实例化后的Job对象注入依赖对象。
@Component
public class AutowireSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private transient AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.autowireCapableBeanFactory = applicationContext.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object job = super.createJobInstance(bundle);
autowireCapableBeanFactory.autowireBean(job);
return job;
}
}
AutowireCapableBeanFactory中的autowireBean()方法中是调用populateBean()
完成属性的注入。
重新配置SchedulerFactoryBean
最后重新运行testJobDetailFactoryBean()
,发现在MyQuartzJobBean
中成功注入了MyQuartzService
。
尾言
饭要慢慢吃,路要慢慢走!