背景
开发Java应用经常会需要用到单机定时任务,这个时候一般我们会采用分布式任务调度中间件来解决问题。典型的分布式任务调度中间件,比如淘宝的tbschedule,当当的elastic-job,唯品会的saturn。京东没做开源,大概率是用了tbschedule。
但是,分布式任务调度中间件往往是基于一个zookeeper集群来做任务调度的。�如果zookeeper集群出了问题,任务调度就挂街了。
这个时候其实可以直接基于quartz来做调度,这也是本文要说的事情。
QuartzTask
首先定义一个任务接口,我们希望一个任务实现下面这些方法:
public interface QuartzTask {
/**
* 任务执行的代码
*/
void executeTask();
/**
* 指定执行任务的机器ip
*
* @return String
*/
String getTargetIP();
/**
* crond表达式
*
* @return String
*/
String getCrond();
/**
* 任务唯一名称
*
* @return String
*/
String getJobName();
}
EmptyTask
为了解释清楚如何使用quartz来做单机任务调度,我们新建一个空白的任务EmptyTask并实现QuartzTask定义的方法:
@Service("emptyTask")
public class EmptyTask implements QuartzTask {
private static final Logger logger = LoggerFactory.getLogger(EmptyTask.class);
public void executeTask() {
logger.warn("emptyTask running");
}
@Override
public String getTargetIP() {
return "127.0.0.1";
}
@Override
public String getCrond() {
return "0 0/1 * * * ?";
}
@Override
public String getJobName() {
return "emptyTask";
}
}
targetIP即可以写死成一个字符串,也可以从配置服务、第三方接口,数据库等数据源动态的获取。
crond表达式可以在 http://cron.qqe2.com/ 动态生成。
BeanPostProcessor
BeanPostProcessor是Spring中一个很实用的接口。BeanPostProcessor提供了两个方法:
public interface BeanPostProcessor {
Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;
Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}
postProcessBeforeInitialization:Bean 实例化之前进行的处理
postProcessAfterInitialization: Bean 实例化之后进行的处理
使用BeanPostProcessor也很简单,只要实现这个接口,并重载这两个方法就可以了。
SchedulerManager
利用BeanPostProcessor提供的特性,我们可以在所有bean初始化完成之后做一些事情。于是就有了SchedulerManager类:
public class SchedulerManager extends SchedulerFactoryBean implements BeanPostProcessor, InitializingBean {
private Scheduler scheduler;
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
this.scheduler = this.getScheduler();
}
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
try {
if (QuartzTask.class.isInstance(bean)) {
QuartzTask quartzTask = QuartzTask.class.cast(bean);
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzTask.getCrond());
Trigger newTrigger = TriggerBuilder.newTrigger()
.withIdentity(quartzTask.getJobName(), "DEFAULT").withSchedule(scheduleBuilder).build();
//需要一个中间job类来处理,中间类会去实际执行任务
JobDetail jobDetail = JobBuilder.newJob(ScheduleTaskExecutor.class)
.withIdentity(quartzTask.getJobName(), "DEFAULT").build();
jobDetail.getJobDataMap().put("taskContext", quartzTask);
scheduler.scheduleJob(jobDetail, newTrigger);
}
} catch (Exception e) {
logger.error("scheduleManager init error.", e);
}
return bean;
}
}
ScheduleTaskExecutor
由于JobDetail自己管理了任务执行类的生命周期,所以只能使用一个中间任务类ScheduleTaskExecutor,中转到要实际执行的任务。
DisallowConcurrentExecution
注解能使得ScheduleTaskExecutor的同一个实例,不会并发执行execute方法。也就是说同一个JobDetail不会并发执行。但是如果是不同的JobDetail,是可以并发执行的。
@DisallowConcurrentExecution
public class ScheduleTaskExecutor implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
QuartzTask quartzTask = (QuartzTask)context.getMergedJobDataMap().get("taskContext");
if (!HttpUtil.getServerIp().equals(quartzTask.getTargetIP())) {
return;
}
quartzTask.executeTask();
}
}
存在的问题
上面的实现方式存在一个待优化的问题:线上应用往往是多机器分布式的,虽然指定了某个IP执行任务,但是其实每台机器都开启了一个定时任务去执行中间任务类的的方法,只不过在IP判断的时候提前结束了任务。
更好的实现方式,是监控targetIP的变化,然后暂停or开启任务。