Spring Boot集成quartz实现定时任务并支持切换任务数据源

org.quartz实现定时任务并自定义切换任务数据源

在工作中经常会需要使用到定时任务处理各种周期性的任务,org.quartz是处理此类定时任务的一个优秀框架。随着项目一点点推进,此时我们并不满足于任务仅仅是定时执行,我们还想要对任务进行更多的控制,随时能对任务进行人为干预,就需要对quartz有更深入的了解。而随着微服务的流行,项目中多数据源的情况也越来越常见,在定时任务中集成多数据源切换的功能也需要集成进来。

集成quartz实现定时任务

集成quartz实现定时任务

quartz中实现定时任务需要了解的基本概念

Job

通过实现Job类,在实现方法中写我们具体想要定时任务完成的工作,然后交给quartz管理。

JobDetail

Job只负责实现具体任务,所以还需要借助JobDetail来存储一些描述Job的基本信息。

Quartz JobBuilder

为构造JobDetail实体提供的builder-style API。你可以这样使用它来构建一个JobDetail

@Bean
public JobDetail jobDetail() {
 return JobBuilder.newJob().ofType(SampleJob.class)
 .storeDurably()
 .withIdentity("Qrtz_Job_Detail")
 .withDescription("Invoke Sample Job service...")
 .build();
}

Spring JobDetailFactoryBean

Spring中配置JobDetail的方式:

@Bean
public JobDetailFactoryBean jobDetail() {
 JobDetailFactoryBean jobDetailFactory = new JobDetailFactoryBean();
 jobDetailFactory.setJobClass(SampleJob.class);
 jobDetailFactory.setDescription("Invoke Sample Job service...");
 jobDetailFactory.setDurability(true);
 return jobDetailFactory;
}

Trigger

触发器,代表一个调度参数的配置,什么时候去调度:

@Bean
public Trigger trigger(JobDetail job) {
 return TriggerBuilder.newTrigger().forJob(job)
 .withIdentity("Qrtz_Trigger")
 .withDescription("Sample trigger")
 .withSchedule(simpleSchedule().repeatForever().withIntervalInHours(1))
 .build();
}

Scheduler

调度器,通过JobTrigger来注册一个调度器:

@Bean
public Scheduler scheduler(Trigger trigger, JobDetail job) {
 StdSchedulerFactory factory = new StdSchedulerFactory();
 factory.initialize(new ClassPathResource("quartz.properties").getInputStream());

 Scheduler scheduler = factory.getScheduler();
 scheduler.setJobFactory(springBeanJobFactory());
 scheduler.scheduleJob(job, trigger);

 scheduler.start();
 return scheduler;
}

给系统添加一个Job

quartzJob就是我们需要去执行的任务,由Scheduler调度器负责调度任务们依靠制定好的Trigger来定时执行任务。

因此首先我们需要结合以上基础给系统添加一个Job。

addJob

    public void addJob(BaseJob job) throws SchedulerException {
        /** 创建JobDetail实例,绑定Job实现类
        * JobDetail 表示一个具体的可执行的调度程序,job是这个可执行调度程序所要执行的内容
        * 另外JobDetail还包含了这个任务调度的方案和策略**/
        // 指明job的名称,所在组的名称,以及绑定job类
        JobDetail jobDetail = JobBuilder.newJob(job.getBeanClass())
                .withIdentity(job.getJobKey())
                .withDescription(job.getDescription())
                .usingJobData(job.getDataMap())
                .build();

        /**
         * Trigger代表一个调度参数的配置,什么时候去调度
         */
        //定义调度触发规则, 使用cronTrigger规则
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(job.getJobName(),job.getJobGroup())
                .withSchedule(CronScheduleBuilder.cronSchedule(job.getCronExpression()))
                .startNow()
                .build();
        //将任务和触发器注册到任务调度中去
        scheduler.scheduleJob(jobDetail,trigger);
        //判断调度器是否启动
        if(!scheduler.isStarted()){
            scheduler.start();
        }
        log.info(String.format("定时任务:%s.%s-已添加到调度器!", job.getJobGroup(),job.getJobName()));
    }

首先需要定义好我们的Job,之后通过Job初始化JobDetailTrigger,最后将JobDetailTrigger注册到调度器中。

BaseJob

Job的结构如下:

public abstract class BaseJob implements Job,Serializable {
    private static final long serialVersionUID = 1L;
    private static final String JOB_MAP_KEY = "self";
    /**
     * 任务名称
     */
    private String jobName;
    /**
     * 任务分组
     */
    private String jobGroup;
    /**
     * 任务状态 是否启动任务
     */
    private String jobStatus;
    /**
     * cron表达式
     */
    private String cronExpression;
    /**
     * 描述
     */
    private String description;
    /**
     * 任务执行时调用哪个类的方法 包名+类名
     */
    private Class beanClass = this.getClass();
    /**
     * 任务是否有状态
     */
    private String isConcurrent;

    /**
     * Spring bean
     */
    private String springBean;

    /**
     * 任务调用的方法名
     */
    private String methodName;

     /**
     * 该任务所使用的数据源
     */
    private String dataSource = DataSourceEnum.DB1.getName();

    /**
     * 为了将执行后的任务持久化到数据库中
     */
    @JsonIgnore
    private JobDataMap dataMap = new JobDataMap();

    public JobKey getJobKey(){
        return JobKey.jobKey(jobName, jobGroup);// 任务名称和组构成任务key
    }
    ...
}

可以看到Job中定义了任务的一些基本信息,重点关注其中的dataSourcedataMap属性。其中dataSource是任务所使用的数据源,并给了一个默认值;由于任务在添加后会持久化到数据库中,之后解析任务就会用到dataMap

SchedulerConfig

在添加Job的时候,JobDetailTrigger都是通过关键字new生成的,而调度器Scheduler则需要放在容器中维护。

@Configuration
@Order
public class SchedulerConfig {
    @Autowired
    private MyJobFactory myJobFactory;

    @Value("${spring.profiles.active}")
    private String profile;

    /*
     * 通过SchedulerFactoryBean获取Scheduler的实例
     */
    @Bean(name = "scheduler")
    public Scheduler scheduler() throws Exception {
        return schedulerFactoryBean().getScheduler();
    }
    
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();

        factory.setOverwriteExistingJobs(true);

        // 延时启动
        factory.setStartupDelay(20);

        // 加载quartz数据源配置
        factory.setQuartzProperties(quartzProperties());

        // 自定义Job Factory,用于Spring注入
        factory.setJobFactory(myJobFactory);
        /*********全局监听器配置************/
        JobListener myJobListener = new SchedulerListener();
        factory.setGlobalJobListeners(myJobListener);//直接添加为全局监听器
        return factory;
    }

    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        if (Util.PRODUCT.equals(profile)) {//正式环境
            System.out.println("正式环境quartz配置");
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz-prod.properties"));
        } else {
            System.out.println("测试环境quartz配置");
            propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
        }
        //在quartz.properties中的属性被读取并注入后再初始化对象
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /*
     * quartz初始化监听器
     */
    @Bean
    public QuartzInitializerListener executorListener() {
        return new QuartzInitializerListener();
    }
}

上述代码中,将scheduler加入到Spring容器中。scheduler是由SchedulerFactoryBean进行维护的,在SchedulerFactoryBean中对调度器工厂做了一些基本设置并从配置文件中加载了quartz数据源配置(配置文件的读取会根据运行环境profile来进行自动切换),配置了一个全局监听器用以监听任务的执行过程。

MyJobFactory

使用Spring提供的JobFactory

@Component
public class MyJobFactory extends AdaptableJobFactory {

    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        // 调用父类的方法
        Object jobInstance = super.createJobInstance(bundle);
        // 进行注入
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

quartz.properties

quartz.properties中是quartz连接数据库的一些配置信息。

# \u56FA\u5B9A\u524D\u7F00org.quartz
# \u4E3B\u8981\u5206\u4E3Ascheduler\u3001threadPool\u3001jobStore\u3001plugin\u7B49\u90E8\u5206
#
#
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

# \u5B9E\u4F8B\u5316ThreadPool\u65F6\uFF0C\u4F7F\u7528\u7684\u7EBF\u7A0B\u7C7B\u4E3ASimpleThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

# threadCount\u548CthreadPriority\u5C06\u4EE5setter\u7684\u5F62\u5F0F\u6CE8\u5165ThreadPool\u5B9E\u4F8B
# \u5E76\u53D1\u4E2A\u6570
org.quartz.threadPool.threadCount = 5
# \u4F18\u5148\u7EA7
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

org.quartz.jobStore.misfireThreshold = 5000

# \u9ED8\u8BA4\u5B58\u50A8\u5728\u5185\u5B58\u4E2D
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

#\u6301\u4E45\u5316
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

#org.quartz.jobStore.useProperties=false

org.quartz.jobStore.tablePrefix = QRTZ_

org.quartz.jobStore.dataSource = qzDS

org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL=jdbc:mysql://127.0.0.1:3306/quartz?characterEncoding=UTF-8&useSSL=false&testOnBorrow=true&testWhileIdle=true
org.quartz.dataSource.qzDS.user=quartz
org.quartz.dataSource.qzDS.password=123456

org.quartz.dataSource.qzDS.maxConnections = 30

org.quartz.dataSource.qzDS.validationQuery = SELECT 1 FROM DUAL

org.quartz.dataSource.qzDS.validateOnCheckout = true
org.quartz.dataSource.qzDS.idleConnectionValidationSeconds = 40


#org.quartz.dataSource.qzDS.discardIdleConnectionsSeconds = 60

quartz会根据这个配置文件将Job持久化到数据库中,也因此quartz会需要初始化一些数据库表,表结构文件在文末。

SchedulerListener

调度器监听器用以监听任务的执行状态。

public class SchedulerListener implements JobListener {

    private final Logger LOG = LoggerFactory.getLogger(SchedulerListener.class);

    public static final String LISTENER_NAME = "QuartSchedulerListener";

    @Override
    public String getName() {
        return LISTENER_NAME; //must return a name
    }

    //任务被调度前
    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        String dataSource = context.getJobDetail().getJobDataMap().getString("dataSource");
        // 切换任务的数据源
        DataSourceContextHolder.setDB(dataSource);
        String jobName = context.getJobDetail().getKey().toString();
        LOG.info("Job {} is going to start,switch dataSource to {},Thread name {}", jobName, dataSource, Thread.currentThread().getName());
    }

    //任务调度被拒了
    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().toString();
        LOG.error("job {} is jobExecutionVetoed", jobName);
        //可以做一些日志记录原因

    }

    //任务被调度后
    @Override
    public void jobWasExecuted(JobExecutionContext context,
                               JobExecutionException jobException) {
        // 清空存储的数据源
        String jobName = context.getJobDetail().getKey().toString();
        DataSourceContextHolder.clearDB();
        LOG.info("Job : {} is finished", jobName);
        if (jobException != null && !jobException.getMessage().equals("")) {
            LOG.error("Exception thrown by: " + jobName
                    + " Exception: " + jobException.getMessage());
        }

    }
}

SchedulerListener监听任务被调度前、调度后和调度被拒绝时的状态,在任务被调度之前和之后对任务所使用的数据源进行了处理。如果项目中不需要数据源切换的话,这个监听器是不需要的,到此已经完成了quartz的集成。

多数据源切换

多数据源切换

通过自定义DynamicDataSource来覆盖Spring Boot中原有的数据源。

DataSourceConfig

通过读取配置文件中不同的数据源,初始化项目中可能用到的数据源用以切换。

/**
 * 多数据源配置类
 */
@Configuration
public class DataSourceConfig {
    //数据源1
    @Bean(name = "datasource1")
    @ConfigurationProperties(prefix = "spring.datasource.db1") // application.properteis中对应属性的前缀
    public DataSource dataSource1() {
        return DataSourceBuilder.create().build();
    }

    //数据源2
    @Bean(name = "datasource2")
    @ConfigurationProperties(prefix = "spring.datasource.db2") // application.properteis中对应属性的前缀
    public DataSource dataSource2() {
        return DataSourceBuilder.create().build();
    }

    /**
     * 动态数据源: 通过AOP在不同数据源之间动态切换
     *
     * @return
     */
    @Primary
    @Bean(name = "dynamicDataSource")
    public DataSource dynamicDataSource() {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        // 默认数据源
        dynamicDataSource.setDefaultTargetDataSource(dataSource1());
        // 配置多数据源
        Map<Object, Object> dsMap = new HashMap();
        dsMap.put(DataSourceEnum.DB1.getName(), dataSource1());
        dsMap.put(DataSourceEnum.DB2.getName(), dataSource2());

        dynamicDataSource.setTargetDataSources(dsMap);
        return dynamicDataSource;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        //设置数据源
        sqlSessionFactoryBean.setDataSource(dataSource);
        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 配置@Transactional注解事物
     *
     * @return
     */
    @Bean
    public PlatformTransactionManager transactionManager() {
        return new DataSourceTransactionManager(dynamicDataSource());
    }
}

数据源配置

spring:
  datasource:
    db1:
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: doctor
      password: 123456
      type: com.zaxxer.hikari.HikariDataSource
      jdbc-url: jdbc:mysql://127.0.0.1:3306/doctor?useSSL=false&testOnBorrow=true&testWhileIdle=true
    db2:
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: quartz
      password: 123456
      type: com.zaxxer.hikari.HikariDataSource
      jdbc-url: jdbc:mysql://127.0.0.1:3307/quartz?useSSL=false&testOnBorrow=true&testWhileIdle=true

DataSourceContextHolder

由于quartz在执行过程中是通过不同的线程来执行Job的,因此此处通过ThreadLocal来保存线程所使用的数据源情况。

/**
 * 保存本地数据源
 */
public class DataSourceContextHolder {
    private static final Logger LOG = LoggerFactory.getLogger(DataSourceContextHolder.class);
    /**
     * 默认数据源
     */
    public static final String DEFAULT_DS = DataSourceEnum.DB1.getName();
    /**
     * ThreadLocal之后会进行讲解
     */
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    // 设置数据源名
    public static void setDB(String dbType) {
        LOG.info("切换到{}数据源", dbType);
        contextHolder.set(dbType);
    }

    // 获取数据源名
    public static String getDB() {
        return (contextHolder.get());
    }

    // 清除数据源名
    public static void clearDB() {
        contextHolder.remove();
    }
}

DynamicDataSource

获取执行中所使用的数据源。由于数据源被保存在了DataSourceContextHolder中的ThreadLocal中,所以直接获取就行了。

/**
 * 获取本地数据源
 */
public class DynamicDataSource extends AbstractRoutingDataSource {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicDataSource.class);

    @Override
    protected Object determineCurrentLookupKey() {
        LOG.info("数据源为{}", DataSourceContextHolder.getDB());
        return DataSourceContextHolder.getDB();
    }
}

至此就完成了集成quartz及数据源切换的功能。然后就是具体的任务了。

执行任务

具体的任务需要继承BaseJob并在execute方法中重写具体需要执行的任务。

execute

@Slf4j
@Service
public class ReadNumJob extends BaseJob {

    @Autowired
    private RedisService redisService;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private final Logger LOG = LoggerFactory.getLogger(ReadNumJob.class);

    @Override
    public void execute(JobExecutionContext context) {
       doSomething();
    }
}

指定数据源

然后在添加任务时指定任务所使用的数据源

ReadNumJob job = new ReadNumJob();
job.setJobName("test");
job.setJobGroup("hys");
job.setDescription("test");
// 指定数据源
job.getDataMap().put("dataSource", DataSourceEnum.DB1.getName());
job.setCronExpression(
"0 */1 * * * ?"
);
try {
jobAndTriggerService.addJob(job);
} catch (SchedulerException e) {
e.printStackTrace();
}

源码

转评赞就是最大的鼓励

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容