- 鉴于对Spring实现的@Scheduled的调度和SchedulerFactoryBean的研究发现,基于Spring的调度封装虽满足了大多需求,但其为了简化使用方式,过度封装使得Job并不容易控制和运维,导致开发对Job的控制和运维成本上升;下面是本人基于Quartz和Spring及Annotation开发的单机版调度配置DEMO,满足单机调度的大部分需求和管理、运维操作并解放对配置文件的繁琐操作;
下面对Spring的@Scheduled注解和SchedulerFactoryBean配置及自定义@SchedulerJob做对比;
源码地址
功能点描述
功能点 | Spring @Scheduled | 自定义@SchedulerJob |
---|---|---|
可控制 | 否 | 是 |
可运维 | 否 | 是 |
可页面化 | 否 | 是 |
可统一跟踪业务状态 | 否 | 是 |
可统一跟踪调度状态 | 否 | 是 |
支持cron表达式 | 是 | 是 |
支持类似ScheduledExecutorService的定时调度 | 是 | 否 |
代码演示
- 未改造前的作业配置
对于图示调度配置,再实现对Job的可控制和可运维的前提下保证代码的简洁和程序性能,需要细读Spring源码,将targetObject和targetMethod从代理中剥离出来,统一放到容器里做统一调度控制(很多时候基于最小改动、系统稳定性、资源利用的原则,不得不采取这种剥离方式);
/**
* @author baiyunpeng
* 抽取Job控制信息
*/
MethodInvokingJobDetailFactoryBean methodInvoker = (MethodInvokingJobDetailFactoryBean) jobDetail.getJobDataMap().get("methodInvoker");
String jobName = jobDetail.getKey().getName();
String group = jobDetail.getKey().getGroup();
String className = methodInvoker.getTargetClass().getName();
- 基于注解进行作业配置
@Slf4j(topic = "dynamic-datasource")
@Component
public class DetectJob {
/**
* 作业配置 value=作业名,group=作业所属组,init=true为容器创建完毕时立即触发
*/
@SchedulerJob(value = "detectDataSource",cron = "${cron.detect.data.source}",group = "dynamic-datasource",
descrption="动态数据源切换",init = true)
public void detectDataSource(){
log.info(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"dynamic-datasource","detectDataSource"));
}
}
##cron表达式
cron.detect.data.source=1 * * * * ?
- 代码执行效果
页面演示
通过页面可对作业进行统一的监控和管理(触发、暂停、恢复、动态添加、参数下发)及报警等操作;
简要列出以下功能点:
-
作业展示
-
作业运维报警
-
作业参数下发
- 作业事件跟踪
设计思路
- 应当满足什么业务场景
- 如何简化操作、降低开发成本
- 如何对业务、系统功能进行监控、控制、运维
- 如何设计才能便于后期业务和功能的扩展
功能设计
- 设计思路
- 如何获取方法上的注解及配置
- 如何实现通过Quartz定时执行注解方法
- 如何对每个方法上的注解进行统一的资源管理和监控、控制、运维
- 如何对调度进行性能的优化
- 功能点分析
- 基本调度
- 初始化立即调度
- 人工或系统控制调度(任务创建后不执行调度,控制权交给外部)
- 定时执行调度(及按照指定cron配置周期调度)
- 是否可并发执行
- 资源管理
- 统一管理系统内全部的配置资源(作业所属组、描述、cron表达式、是否开启报警、是否开启监控等)
- 调度管理
- 调度状态管理(系统状态、业务状态)
- 调度行为管理
- 作业业务参数下发(弥补业务过失)
- 调度跟踪、业务跟踪
- 调度报警、业务报警
- 基本调度
- 基本功能点实现
- 注解配置
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJob {
/**
* 作业名
* @return
*/
String value();
/**
* 表达式
* @return
*/
String cron();
/**
* 是否初始化时立即执行
* @return
*/
boolean init() default false;
/**
* 是否人为控制
* @return
*/
boolean control() default false;
/**
* 所属组
* @return
*/
String group() default "default";
/**
* 作业描述
* @return
*/
String descrption() default "";
/**
* 作业执行器
* @return
*/
Class jobClass() default SimpleJob.class;
}
@Target({ElementType.METHOD,ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJobs {
/**
* 注解集
* @return
*/
SchedulerJob[] value();
}
- 调度创建
@Slf4j
@Configuration
public class SchedulerBean implements InitializingBean, DisposableBean {
private Scheduler scheduler;
@Value("#{schdulerProperties['quartz.thread.count']}")
private String threadCount;
@Override
public void destroy() throws Exception {
scheduler.shutdown();
}
@Override
public void afterPropertiesSet() throws Exception {
createScheduler();
}
/**
* 创建调度
* @throws SchedulerException
*/
public void createScheduler() throws SchedulerException {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
this.scheduler = factory.getScheduler();
}
/**
* 作业配置
* @return
*/
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", threadCount);
result.put("org.quartz.scheduler.threadName", "baiyunpeng-scheduler");
result.put("org.quartz.scheduler.instanceName", "baiyunpeng-scheduler");
result.put("org.quartz.jobStore.misfireThreshold", "1");
return result;
}
/**
* 创建作业
* @param jobParam
* @throws SchedulerException
*/
public void createJob(JobParam jobParam) throws SchedulerException {
SchedulerJob schedulerJob = jobParam.getSchedulerJob();
JobDetail jobDetail = JobBuilder.newJob(schedulerJob.jobClass())
.withIdentity(jobParam.getJobKey())
.withDescription(jobParam.getJobKey().getName())
.build();
addJobDataMap(jobDetail,jobParam.getTarget(),jobParam.getTargetMethod());
this.scheduler.scheduleJob(jobDetail,createTrigger(jobParam.getJobKey(),jobParam.getCron()));
}
/**
* 创建触发器
* @param jobKey
* @param cron
* @return
*/
private Trigger createTrigger(JobKey jobKey, String cron) {
return TriggerBuilder.newTrigger().withIdentity(jobKey.getName(),jobKey.getGroup())
.withSchedule(CronScheduleBuilder.cronSchedule(cron)
.withMisfireHandlingInstructionDoNothing()).build();
}
/**
* 添加作业map
* @param jobDetail
* @param target
* @param targetMethod
*/
private void addJobDataMap(JobDetail jobDetail, Object target, Method targetMethod) {
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap.put("executeJob",target);
jobDataMap.put("executeMethod",targetMethod);
}
public Scheduler getScheduler() {
return scheduler;
}
public void start() throws SchedulerException {
this.scheduler.start();
}
}
- 简单的作业执行器创建
/**
* 作业抽象类
* @author baiyunpeng
*/
public abstract class ExecuteJob implements Job {
protected Object executeJob;
protected Method executeMethod;
protected void setExecuteJob(Object executeJob) {
this.executeJob = executeJob;
}
protected void setExecuteMethod(Method executeMethod) {
this.executeMethod = executeMethod;
}
}
/**
* 非并发执行
* @author baiyunpeng
*/
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SimpleJob extends ExecuteJob {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
executeMethod.invoke(executeJob);
} catch (IllegalAccessException | InvocationTargetException e) {
log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
}
}
}
/**
* 可并发执行
* @author baiyunpeng
*/
@Slf4j
public class ConcurrentJob extends ExecuteJob{
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
executeMethod.invoke(executeJob);
} catch (IllegalAccessException | InvocationTargetException e) {
log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
}
}
}
- 作业创建
/**
* 作业配置解析
* @param scheduled
* @param method
* @param bean
*/
protected void processScheduled(SchedulerJob scheduled, Method method, Object bean) {
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
String cron = scheduled.cron();
if(StringUtils.hasText(cron)){
if(Objects.nonNull(this.embeddedValueResolver)){
cron = this.embeddedValueResolver.resolveStringValue(cron);
}
jobParams.add(new JobParam(scheduled,bean,invocableMethod,new JobKey(scheduled.value(),scheduled.group()),cron));
}
}
/**
* 作业初始化
*/
private void finishRegister() {
if(Objects.isNull(this.schedulerBean)){
SchedulerBean schedulerBean = beanFactory.getBean(SCHEDULER_BEAN, SchedulerBean.class);
AssertUtil.assertNull(schedulerBean, SystemErrorCode.NS000000,"the scheduler bean init error");
this.schedulerBean = schedulerBean;
try {
jobParams.parallelStream().forEach(jobParam -> {
try {
this.schedulerBean.createJob(jobParam);
SchedulerJob schedulerJob = jobParam.getSchedulerJob();
if(!schedulerJob.control()){
if (schedulerJob.init()){
this.schedulerBean.getScheduler().triggerJob(jobParam.getJobKey());
}
}else {
this.schedulerBean.getScheduler().pauseJob(jobParam.getJobKey());
}
} catch (SchedulerException e) {
log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
System.exit(1);
}
});
schedulerBean.start();
}catch (Exception e){
log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
System.exit(1);
}
}
}
总结
- 如何异步执行方法,首先得获取该方法的实例
- 如何定时执行,首先创建并获取定时器
- 如何基于Quarzt监控作业执行,需获Schedule和Jobkey等
- 如何对作业调度状态做监控,应当抽象出统一的JobWrapper来实现对调度的记录
- 如何对作业的业务状态做监控,定义接口返回的Result,对返回值和状态进行封装;并借鉴上条思路做统一健康;
- 如何采集所有作业信息,在上诉功能点描述中的作业创建过程中对作业进行内存存储或持久存储;
- 如何将调度行为可控,获取JobKey即可对调度作业控制;
- 如何动态为作业添加报警,同样基于内存或DB实现对作业的报警的开启和关闭;
- 如何进行调度或业务的状态上报,建议使用事件机制做异步上报(分布式下有采用Http、MQ等);