Schedule 调度系统设计(单机版)

  • 鉴于对Spring实现的@Scheduled的调度和SchedulerFactoryBean的研究发现,基于Spring的调度封装虽满足了大多需求,但其为了简化使用方式,过度封装使得Job并不容易控制和运维,导致开发对Job的控制和运维成本上升;下面是本人基于Quartz和Spring及Annotation开发的单机版调度配置DEMO,满足单机调度的大部分需求和管理、运维操作并解放对配置文件的繁琐操作;
    下面对Spring的@Scheduled注解和SchedulerFactoryBean配置及自定义@SchedulerJob做对比;

源码地址


功能点描述

功能点 Spring @Scheduled 自定义@SchedulerJob
可控制
可运维
可页面化
可统一跟踪业务状态
可统一跟踪调度状态
支持cron表达式
支持类似ScheduledExecutorService的定时调度

代码演示

  • 未改造前的作业配置
    对于图示调度配置,再实现对Job的可控制和可运维的前提下保证代码的简洁和程序性能,需要细读Spring源码,将targetObject和targetMethod从代理中剥离出来,统一放到容器里做统一调度控制(很多时候基于最小改动、系统稳定性、资源利用的原则,不得不采取这种剥离方式);
 /**
  * @author baiyunpeng
  * 抽取Job控制信息
  */
 MethodInvokingJobDetailFactoryBean methodInvoker = (MethodInvokingJobDetailFactoryBean) jobDetail.getJobDataMap().get("methodInvoker");
                String jobName = jobDetail.getKey().getName();
                String group = jobDetail.getKey().getGroup();
                String className = methodInvoker.getTargetClass().getName();
image
  • 基于注解进行作业配置
@Slf4j(topic = "dynamic-datasource")
@Component
public class DetectJob {
   /**
     * 作业配置 value=作业名,group=作业所属组,init=true为容器创建完毕时立即触发
     */
    @SchedulerJob(value = "detectDataSource",cron = "${cron.detect.data.source}",group = "dynamic-datasource",
            descrption="动态数据源切换",init = true)
    public void detectDataSource(){
        log.info(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"dynamic-datasource","detectDataSource"));
    }
}
##cron表达式
cron.detect.data.source=1 * * * * ? 
  • 代码执行效果

image

页面演示

通过页面可对作业进行统一的监控和管理(触发、暂停、恢复、动态添加、参数下发)及报警等操作;

简要列出以下功能点:

  • 作业展示


    image
  • 作业运维报警


    image
  • 作业参数下发


    image
  • 作业事件跟踪
image

设计思路

  • 应当满足什么业务场景
  • 如何简化操作、降低开发成本
  • 如何对业务、系统功能进行监控、控制、运维
  • 如何设计才能便于后期业务和功能的扩展

功能设计

  • 设计思路
    • 如何获取方法上的注解及配置
    • 如何实现通过Quartz定时执行注解方法
    • 如何对每个方法上的注解进行统一的资源管理和监控、控制、运维
    • 如何对调度进行性能的优化
  • 功能点分析
    • 基本调度
      • 初始化立即调度
      • 人工或系统控制调度(任务创建后不执行调度,控制权交给外部)
      • 定时执行调度(及按照指定cron配置周期调度)
      • 是否可并发执行
    • 资源管理
      • 统一管理系统内全部的配置资源(作业所属组、描述、cron表达式、是否开启报警、是否开启监控等)
    • 调度管理
      • 调度状态管理(系统状态、业务状态)
      • 调度行为管理
      • 作业业务参数下发(弥补业务过失)
      • 调度跟踪、业务跟踪
      • 调度报警、业务报警
  • 基本功能点实现
    • 注解配置
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJob {
  /**
   * 作业名
   * @return
   */
  String value();

  /**
   * 表达式
   * @return
   */
  String cron();

  /**
   * 是否初始化时立即执行
   * @return
   */
  boolean init() default false;

  /**
   * 是否人为控制
   * @return
   */
  boolean control() default false;

  /**
   * 所属组
   * @return
   */
  String group() default "default";

  /**
   * 作业描述
   * @return
   */
  String descrption() default "";

  /**
   * 作业执行器
   * @return
   */
  Class jobClass() default SimpleJob.class;
}

@Target({ElementType.METHOD,ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface SchedulerJobs {
    /**
     * 注解集
     * @return
     */
    SchedulerJob[] value();
}
  • 调度创建
@Slf4j
@Configuration
public class SchedulerBean implements InitializingBean, DisposableBean {

    private Scheduler scheduler;

   @Value("#{schdulerProperties['quartz.thread.count']}")
    private String threadCount;

    @Override
    public void destroy() throws Exception {
        scheduler.shutdown();
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        createScheduler();
    }

    /**
     * 创建调度
     * @throws SchedulerException
     */
    public void createScheduler() throws SchedulerException {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        this.scheduler = factory.getScheduler();
    }

    /**
     * 作业配置
     * @return
     */
    private Properties getBaseQuartzProperties() {
        Properties result = new Properties();
        result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        result.put("org.quartz.threadPool.threadCount", threadCount);
        result.put("org.quartz.scheduler.threadName", "baiyunpeng-scheduler");
        result.put("org.quartz.scheduler.instanceName", "baiyunpeng-scheduler");
        result.put("org.quartz.jobStore.misfireThreshold", "1");
        return result;
    }

    /**
     * 创建作业
     * @param jobParam
     * @throws SchedulerException
     */
    public void createJob(JobParam jobParam) throws SchedulerException {
        SchedulerJob schedulerJob = jobParam.getSchedulerJob();
        JobDetail jobDetail = JobBuilder.newJob(schedulerJob.jobClass())
                .withIdentity(jobParam.getJobKey())
                .withDescription(jobParam.getJobKey().getName())
                .build();
        addJobDataMap(jobDetail,jobParam.getTarget(),jobParam.getTargetMethod());
        this.scheduler.scheduleJob(jobDetail,createTrigger(jobParam.getJobKey(),jobParam.getCron()));
    }

    /**
     * 创建触发器
     * @param jobKey
     * @param cron
     * @return
     */
    private Trigger createTrigger(JobKey jobKey, String cron) {
        return TriggerBuilder.newTrigger().withIdentity(jobKey.getName(),jobKey.getGroup())
                .withSchedule(CronScheduleBuilder.cronSchedule(cron)
                .withMisfireHandlingInstructionDoNothing()).build();
    }

    /**
     * 添加作业map
     * @param jobDetail
     * @param target
     * @param targetMethod
     */
    private void addJobDataMap(JobDetail jobDetail, Object target, Method targetMethod) {
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        jobDataMap.put("executeJob",target);
        jobDataMap.put("executeMethod",targetMethod);
    }

    public Scheduler getScheduler() {
        return scheduler;
    }

    public void start() throws SchedulerException {
        this.scheduler.start();
    }
}
  • 简单的作业执行器创建
/**
 * 作业抽象类
 * @author baiyunpeng
 */
public abstract class ExecuteJob implements Job {

    protected Object executeJob;

    protected Method executeMethod;

    protected void setExecuteJob(Object executeJob) {
        this.executeJob = executeJob;
    }

    protected void setExecuteMethod(Method executeMethod) {
        this.executeMethod = executeMethod;
    }
}  

/**
 * 非并发执行
 * @author baiyunpeng
 */
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SimpleJob extends ExecuteJob {
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            executeMethod.invoke(executeJob);
        } catch (IllegalAccessException | InvocationTargetException e) {
            log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
        }
    }
}

/**
 * 可并发执行
 * @author baiyunpeng
 */
@Slf4j
public class ConcurrentJob extends ExecuteJob{
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        try {
            executeMethod.invoke(executeJob);
        } catch (IllegalAccessException | InvocationTargetException e) {
            log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,this.getClass().getName(), ExceptionUtils.getRootCauseMessage(e)));
        }
    }
}
  • 作业创建
     /**
       * 作业配置解析
       * @param scheduled
       * @param method
       * @param bean
      */
    protected void processScheduled(SchedulerJob scheduled, Method method, Object bean) {
            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            String cron = scheduled.cron();
            if(StringUtils.hasText(cron)){
                if(Objects.nonNull(this.embeddedValueResolver)){
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                }
                jobParams.add(new JobParam(scheduled,bean,invocableMethod,new JobKey(scheduled.value(),scheduled.group()),cron));
            }
        }
    
    /**
     * 作业初始化
     */
    private void finishRegister() {
        if(Objects.isNull(this.schedulerBean)){
            SchedulerBean schedulerBean = beanFactory.getBean(SCHEDULER_BEAN, SchedulerBean.class);
            AssertUtil.assertNull(schedulerBean, SystemErrorCode.NS000000,"the scheduler bean init error");
            this.schedulerBean = schedulerBean;
            try {
                jobParams.parallelStream().forEach(jobParam -> {
                    try {
                        this.schedulerBean.createJob(jobParam);
                        SchedulerJob schedulerJob = jobParam.getSchedulerJob();
                        if(!schedulerJob.control()){
                            if (schedulerJob.init()){
                                this.schedulerBean.getScheduler().triggerJob(jobParam.getJobKey());
                            }
                        }else {
                            this.schedulerBean.getScheduler().pauseJob(jobParam.getJobKey());
                        }
                    } catch (SchedulerException e) {
                        log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
                       System.exit(1);
                    }
                });
                schedulerBean.start();
            }catch (Exception e){
                log.error(VariableUtils.join(SystemConstants.Symbol.DELIMITER,"the scheduler job init error", ExceptionUtils.getRootCauseMessage(e)));
                System.exit(1);
            }
        }
    }
    

总结

  • 如何异步执行方法,首先得获取该方法的实例
  • 如何定时执行,首先创建并获取定时器
  • 如何基于Quarzt监控作业执行,需获Schedule和Jobkey等
  • 如何对作业调度状态做监控,应当抽象出统一的JobWrapper来实现对调度的记录
  • 如何对作业的业务状态做监控,定义接口返回的Result,对返回值和状态进行封装;并借鉴上条思路做统一健康;
  • 如何采集所有作业信息,在上诉功能点描述中的作业创建过程中对作业进行内存存储或持久存储;
  • 如何将调度行为可控,获取JobKey即可对调度作业控制;
  • 如何动态为作业添加报警,同样基于内存或DB实现对作业的报警的开启和关闭;
  • 如何进行调度或业务的状态上报,建议使用事件机制做异步上报(分布式下有采用Http、MQ等);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,599评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,748评论 6 342
  • 专业考题类型管理运行工作负责人一般作业考题内容选项A选项B选项C选项D选项E选项F正确答案 变电单选GYSZ本规程...
    小白兔去钓鱼阅读 8,975评论 0 13
  • ①问题: 如何从一个大项目中,迅速的定位执行速度慢的语句( show status; 查询MySQL已经启动的时间...
    笑Skr人啊阅读 283评论 0 2