目前常用的几种任务调度
- Timer,简单无门槛,一般也没人用。
- spring @Scheduled注解,一般集成于项目中,小任务很方便。
- 开源工具 Quartz,分布式集群开源工具,以下两个分布式任务应该都是基于Quartz实现的,可以说是中小型公司必选,当然也视自身需求而定。
- 分布式任务 XXL-JOB,是一个轻量级分布式任务调度框架,支持通过 Web 页面对任务进行 CRUD 操作,支持动态修改任务状态、暂停/恢复任务,以及终止运行中任务,支持在线配置调度任务入参和在线查看调度结果。
- 分布式任务 Elastic-Job,是一个分布式调度解决方案,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。支持分布式调度协调、弹性扩容缩容、失效转移、错过执行作业重触发、并行调度、自诊。
- 分布式任务 Saturn,Saturn是唯品会在github开源的一款分布式任务调度产品。它是基于当当elastic-job来开发的,其上完善了一些功能和添加了一些新的feature。目前在github上开源大半年,470个star。Saturn的任务可以用多种语言开发比如python、Go、Shell、Java、Php。其在唯品会内部已经发部署350+个节点,每天任务调度4000多万次。同时,管理和统计也是它的亮点。
Quartz是一个广泛使用的开源任务调度框架,用于在Java应用程序中执行定时任务和周期性任务。它提供了强大的调度功能,允许您计划、管理和执行各种任务,从简单的任务到复杂的任务。
Quartz 核心元素:
Quartz任务调度的核心元素为:Scheduler——任务调度器、Trigger——触发器、Job——任务。其中trigger和job是任务调度的元数据,scheduler是实际执行调度的控制器。
Trigger是用于定义调度时间的元素,即按照什么时间规则去执行任务。Quartz中主要提供了四种类型的trigger:SimpleTrigger,CronTirgger,DateIntervalTrigger,和NthIncludedDayTrigger。这四种trigger可以满足企业应用中的绝大部分需求。
Job用于表示被调度的任务。主要有两种类型的job:无状态的(stateless)和有状态的(stateful)。对于同一个trigger来说,有状态的job不能被并行执行,只有上一次触发的任务被执行完之后,才能触发下一次执行。Job主要有两种属性:volatility和durability,其中volatility表示任务是否被持久化到数据库存储,而durability表示在没有trigger关联的时候任务是否被保留。两者都是在值为true的时候任务被持久化或保留。一个job可以被多个trigger关联,但是一个trigger只能关联一个job。
Scheduler由scheduler工厂创建
以下是Quartz的一些关键特点和功能:
- 灵活的调度器:Quartz提供了一个高度可配置的调度器,允许您根据不同的时间表执行任务,包括固定的时间、每日、每周、每月、每秒等。您可以设置任务执行的时间和频率。
- 多任务支持:Quartz支持同时管理和执行多个任务。您可以定义多个作业和触发器,并将它们添加到调度器中。
- 集群和分布式调度:Quartz支持集群模式,可以在多台机器上协调任务的执行。这使得Quartz非常适合大规模和分布式应用,以确保任务的高可用性和负载均衡。
- 持久化:Quartz可以将任务和调度信息持久化到数据库中,以便在应用程序重启时不会丢失任务信息。这对于可靠性和数据保持非常重要。
- 错过任务处理:Quartz可以配置在任务错过执行时如何处理,例如,是否立即执行、延迟执行或丢弃任务。
- 监听器:Quartz提供了各种监听器,可以用来监视任务的执行,以及在任务执行前后执行自定义操作。
-
多种作业类型:Quartz支持不同类型的作业,包括无状态作业(Stateless Job)和有状态作业(Stateful
Job)。这允许您选择最适合您需求的作业类型。 - 插件机制:Quartz具有灵活的插件机制,可以扩展其功能。您可以创建自定义插件,以满足特定需求。
- 丰富的API:Quartz提供了丰富的Java API,使任务调度的配置和管理非常方便。
背景
各个服务需要改造支持集群,现在的授权、日程使用的是基于内存的spring scheduler定时任务,如果部署多个节点,那么到了时间点,多个节点都会开始执行定时任务从而可能引起业务和性能上的问题。
服务中的定时任务比较轻量,为了避免引入redis、zookeeper、单独的定时任务程序,所以建议选用quartz这种基于数据库的分布式定时任务调度框架,无需引用多余中间件。
简单设计
原则上是尽量与quartz的耦合降至最低,针对我们的业务场景并不需要太多的调度操作(即图上的controller),只需要程序启动的时候初始化好指定的定时任务就行了,所以先这样搞,如果有更好的设计欢迎一起交流。
SpringBoot集成quartz
1.在官网下载quartz
-
下载之后解压,进入src\org\quartz\impl\jdbcjobstore找到22种数据库11张表的初始化sql文件,根据不同的数据库选择不同的文件,(达梦为Oracle系,需要使用tables_oracle.sql)
11张表的功能说明:
表名 | 功能 |
---|---|
qrtz_job_details | 存储每一个已配置的 Job 的详细信息 |
qrtz_triggers | 存储已配置的 Trigger 的信息 |
qrtz_simple_triggers | 存储简单的 Trigger,包括重复次数,间隔,以及已触的次数 |
qrtz_cron_triggers | 存储 Cron Trigger,包括 Cron 表达式和时区信息 |
qrtz_simprop_triggers | 存储简单的 Trigger,包括重复次数,间隔,以及已触的次数 |
qrtz_blob_triggers | Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候) |
qrtz_calendars | 以 Blob 类型存储 Quartz 的 Calendar 信息 |
qrtz_paused_trigger_grps | 存储已暂停的 Trigger 组的信息 |
qrtz_fired_triggers | 存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息 |
qrtz_scheduler_state | 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中) |
qrtz_locks | 存储程序的悲观锁的信息(假如使用了悲观锁) |
2.引入依赖
<!--quartz依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
3.config配置
- application.properties
#===================================================================
# quartz基础配置
#===================================================================
# 存储方式,可选值:MEMORY(内存方式,不推荐)、JDBC(持久化存储,推荐)
spring.quartz.jobStoreType=JDBC
# 可选值:ALWAYS(每次都生成、注意只有druid数据库连接池才会自动生成表)、EMBEDDED(仅初始化嵌入式数据源)、NEVER(不初始化数据源)。
spring.quartz.jdbc.initializeSchema=ALWAYS
# quartz自动建表的库类型
spring.quartz.jdbc.platform=mysql_innodb
# quartz自动建表sql的指定
spring.quartz.jdbc.schema=classpath:org/quartz/impl/jdbcjobstore/tables_@@platform@@.sql
# 随着容器启动,启动定时任务(默认值ture)
spring.quartz.autoStartup=true
# 定时任务延时启动的时间 (默认值0s)
spring.quartz.startupDelay=5
# 是否可以覆盖定时任务,true 是 (默认值false)
spring.quartz.overwriteExistingJobs=true
# 在容器关闭时,任务执行后关闭容 (默认值false)
spring.quartz.waitForJobsToCompleteOnShutdown=true
- quartz.properties
#===================================================================
# 配置JobStore
#===================================================================
# 数据保存方式为数据库持久化
org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
# 数据库代理类,一般org.quartz.impl.jdbcjobstore.StdJDBCDelegate 可以满足大部分数据库,建议pg系使用 org.quartz.impl.jdbcjobstore.PostgreSQLDelegate,oracle系使用OracleDelegate
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 表的前缀,默认QRTZ_
org.quartz.jobStore.tablePrefix=QRTZ_
# 是否加入集群
org.quartz.jobStore.isClustered=true
# 信息保存时间 默认值60秒 单位:ms
org.quartz.jobStore.misfireThreshold=25000
# 调度实例失效的检查时间间隔 ms
org.quartz.jobStore.clusterCheckinInterval=5000
# JobDataMaps是否都为String类型,默认false
org.quartz.jobStore.useProperties=true
# 当设置为“true”时,此属性告诉Quartz 在非托管JDBC连接上调用setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)。
org.quartz.jobStore.txIsolationLevelReadCommitted=true
#===================================================================
# Scheduler 调度器属性配置
#===================================================================
# 调度标识名 集群中每一个实例都必须使用相同的名称
org.quartz.scheduler.instanceName=DiServerClusterScheduler
# ID设置为自动获取 每一个必须不同
org.quartz.scheduler.instanceId=AUTO
# 是否开启守护线程
org.quartz.scheduler.makeSchedulerThreadDaemon=true
#===================================================================
# 配置ThreadPool
#===================================================================
# 线程池的实现类(一般使用SimpleThreadPool即可满足需求)
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
# 指定在线程池里面创建的线程是否是守护线程
org.quartz.threadPool.makeThreadsDaemons=true
# 指定线程数,至少为1(无默认值),一般设置为1-100直接的整数,根据系统资源配置
org.quartz.threadPool.threadCount=10
# 设置线程的优先级(最大为java.lang.Thread.MAX_PRIORITY 10,最小为Thread.MIN_PRIORITY 1,默认为5)
org.quartz.threadPool.threadPriority=5
3.config配置
springboot集成quartz核心配置类
- 初始化quartz:QuartzSchedulerConfig .java
import org.quartz.Scheduler;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.Properties;
@Configuration
public class QuartzSchedulerConfig {
/**
* JobFactory与schedulerFactoryBean中的JobFactory相互依赖,注意bean的名称
* 在这里为JobFactory注入了Spring上下文
*
* @param applicationContext
* @return
*/
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
QuartzJobFactory quartzJobFactory = new QuartzJobFactory();
quartzJobFactory.setApplicationContext(applicationContext);
return quartzJobFactory;
}
/**
* 从quartz.properties文件中读取Quartz配置属性
*
* @return
* @throws IOException
*/
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz/quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
@Bean
@DependsOn(value = {"jobFactory"})
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws IOException {
// 创建SchedulerFactoryBean
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(quartzProperties());
// 支持在JOB实例中注入其他的业务对象
factory.setJobFactory(jobFactory);
factory.setApplicationContextSchedulerContextKey("applicationContextKey");
// 这样当spring关闭时,会等待所有已经启动的quartz job结束后spring才能完全shutdown。
factory.setWaitForJobsToCompleteOnShutdown(true);
// 是否覆盖己存在的Job
factory.setOverwriteExistingJobs(true);
// QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动
factory.setStartupDelay(10);
// 注入spring维护的DataSource
factory.setDataSource(dataSource);
return factory;
}
/**
* 通过SchedulerFactoryBean获取Scheduler的实例
*
* @return
* @throws IOException
*/
@Bean(name = "scheduler")
public Scheduler scheduler(JobFactory jobFactory, DataSource dataSource) throws IOException {
return schedulerFactoryBean(jobFactory, dataSource).getScheduler();
}
}
- 注入SpringBean:QuartzJobFactory.java
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
/**
* 为JobFactory注入SpringBean,否则Job无法使用Spring创建的bean
*/
public final class QuartzJobFactory extends SpringBeanJobFactory
implements ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
// 调用父类的方法
final Object job = super.createJobInstance(bundle);
// 进行注入
beanFactory.autowireBean(job);
return job;
}
@Override
public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException {
beanFactory = applicationContext.getAutowireCapableBeanFactory();
}
}
- 具体的定时任务:TaskGroupJob .java
/**
* @DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
* 注org.quartz.threadPool.threadCount的数量有多个的情况,@DisallowConcurrentExecution才生效
*/
@Component
@Slf4j
@DisallowConcurrentExecution
public class TaskGroupJob implements Job {
/**
* 核心方法,Quartz Job真正的执行逻辑.
* @param context 中封装有Quartz运行所需要的所有信息
* @throws JobExecutionException execute()方法只允许抛出JobExecutionException异常
*/
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap jdMap = context.getJobDetail().getJobDataMap();
String jobId = (String) jdMap.get("jobId");
log.info("任务组定时任务执行start, jobId:{}", jobId);
//具体定时任务逻辑处理
log.info("任务组定时任务执行SUCCESS, jobId:{}", jobId);
}
}
定时任务动态操作
import com.alibaba.fastjson.JSON;
import com.landray.data.dto.arrange.TaskJobDto;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import javax.annotation.Resource;
/**
* @Author: huangyibo
* @Date: 2024/5/11 14:29
* @Description: Quartz定时任务动态API Handler
*/
@Component
@Slf4j
public class QuartzTaskGroupHandler {
@Resource
private Scheduler scheduler;
/**
* 新增定时任务
* @param taskDto
*/
public void addCronJob(TaskJobDto taskDto) {
try {
//构建TriggerKey, 封装Job的name和group
TriggerKey triggerKey = TriggerKey.triggerKey(taskDto.getJobId(), taskDto.getGroupName());
//构建job信息, 用于描叙Job实现类及其他的一些静态信息, 构建一个作业实例
JobDetail job = JobBuilder.newJob(taskDto.getClazz()).withIdentity(taskDto.getJobId(), taskDto.getGroupName()).build();
JobDataMap jobDataMap = job.getJobDataMap();
jobDataMap.put("jobId", taskDto.getJobId());
// 构建一个触发器,规定触发的规则
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule(taskDto.getCron()).withMisfireHandlingInstructionDoNothing())
.build();
//SimpleScheduleBuilder.simpleSchedule().withRepeatCount(0).withIntervalInSeconds(20)//每隔多少秒执行一次; withRepeatCount 设置重复的次数
//.startNow().withSchedule(cronScheduleBuilder)
//交由Scheduler安排触发
scheduler.scheduleJob(job, trigger);
if(!scheduler.isStarted()){
scheduler.start();
}
log.info("添加定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
} catch (SchedulerException e) {
log.error("添加定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
}
}
/**
* 更新定时任务, 定时任务不存在直接新增,定时任务存在删除后新增
* @param taskDto
*/
public void addCronJobNotExists(TaskJobDto taskDto) {
try {
boolean taskExist = scheduler.checkExists(JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName()));
if(!taskExist){
addCronJob(taskDto);
}
} catch (Exception e) {
log.error("更新定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
}
}
/**
* 更新定时任务, 定时任务不存在直接新增,定时任务存在则更新
* @param taskDto
*/
public void updateCronJob(TaskJobDto taskDto) {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(taskDto.getJobId(), taskDto.getGroupName());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
//判断执行周期表达式是否一致
if(trigger.getCronExpression().equals(taskDto.getCron())){
return;
}
// 修改map
trigger.getJobDataMap().put("jobId", taskDto.getJobId());
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(taskDto.getCron()).withMisfireHandlingInstructionDoNothing();
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).startNow().withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (Exception e) {
log.error("更新定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
}
}
/**
* 立即运行一次
* @param taskDto
*/
public void runOnce(TaskJobDto taskDto) {
try {
scheduler.triggerJob(JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName()));
} catch (SchedulerException e) {
log.error("立即运行一次定时任务失败", e);
}
}
/**
* 删除任务
* @param taskDto
* @return
*/
public void removeCronJob(TaskJobDto taskDto) {
try {
// TriggerKey 定义了trigger的名称和组别 ,通过任务名和任务组名获取TriggerKey
TriggerKey triggerKey = TriggerKey.triggerKey(taskDto.getJobId(), taskDto.getGroupName());
// 停止触发器
scheduler.resumeTrigger(triggerKey);
// 移除触发器
scheduler.unscheduleJob(triggerKey);
// 移除任务
scheduler.deleteJob(JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName()));
log.info("删除定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
} catch (SchedulerException e) {
log.error("删除定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
}
}
/**
* 暂停定时任务
* @param taskDto
*/
public void pauseJob(TaskJobDto taskDto) {
try {
JobKey jobKey = JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName());
// 暂停任务
scheduler.pauseJob(jobKey);
log.info("暂停定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
} catch (SchedulerException e) {
log.error("暂停定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
}
}
/**
* 继续定时任务
* @param taskDto
*/
@GetMapping("/resumeJob")
public void resumeJob(TaskJobDto taskDto) {
try {
// 通过任务名和任务组名获取jobKey
JobKey jobKey = JobKey.jobKey(taskDto.getJobId(), taskDto.getGroupName());
// 继续任务
scheduler.resumeJob(jobKey);
log.info("继续定时任务成功, taskDto:{}", JSON.toJSONString(taskDto));
} catch (SchedulerException e) {
log.error("继续定时任务异常, taskDto:{}", JSON.toJSONString(taskDto), e);
}
}
}
@ApiModel("任务管理Dto")
@Data
public class TaskJobDto implements Serializable {
@ApiModelProperty(value = "任务id")
private String jobId;
@ApiModelProperty(value = "任务组名称")
private String groupName = "TaskGroup";
@ApiModelProperty(value = "执行周期")
private String cron;
@ApiModelProperty(value = "任务执行类")
private Class<? extends Job> clazz;
}
- 任务预热,预先加载已经启用的任务组任务
/**
* @Author: huangyibo
* @Date: 2024/5/10 18:26
* @Description: 任务预热,预先加载已经启用的任务组任务
*/
@Component
@Slf4j
public class TaskGroupJobRunner {
@Resource
private TaskGroupHandler taskGroupHandler;
@PostConstruct
public void init() {
log.info("任务预热,预先加载已经启用的任务组任务start");
LambdaQueryWrapper<ArrangeTaskGroup> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(ArrangeTaskGroup::getFdTaskStatus, TaskStatusEnum.ENABLE);
List<ArrangeTaskGroup> taskGroupList = XxxMapper.selectList(queryWrapper);
if(CollectionUtils.isEmpty(taskGroupList)){
log.info("任务组管理——任务预热,预先加载已经启用的任务组为空");
return;
}
taskGroupList.forEach(taskGroup -> {
TaskJobDto taskDto = new TaskJobDto();
taskDto.setJobId(taskGroup.getFdJobId());
taskDto.setCron(taskGroup.getFdCron());
taskDto.setClazz(TaskGroupJob.class);
taskGroupHandler.addCronJob(taskDto);
});
log.info("任务预热,预先加载已经启用的任务组SUCCESS, taskGroupList:{}", JSON.toJSONString(taskGroupList));
log.info("任务预热,手动构造数据start");
TaskJobDto taskDto = new TaskJobDto();
taskDto.setJobId("DataAccess_06684bfdadb543f5982778dd663320a7");
taskDto.setCron("0 0 1 * * ?");
taskDto.setGroupName("DataAccessTask");
taskDto.setClazz(DataAccessJob.class);
taskGroupHandler.addCronJob(taskDto);
log.info("任务预热,手动构造数据SUCCESS, taskDto:{}", JSON.toJSONString(taskDto));
}
}
@Service
@Slf4j
public class TaskGroupServiceImpl extends ServiceImpl<TaskGroupMapper, ArrangeTaskGroup> implements TaskGroupService {
@Resource
private TaskGroupMapper taskGroupMapper;
@Resource
private TaskGroupHandler taskGroupHandler;
@Transactional(rollbackFor = Exception.class)
@Override
public Boolean insert(TaskGroupInsert insert) {
log.info("新增任务组start, insert:{}", JSON.toJSONString(insert));
TaskGroup taskGroup = new TaskGroup();
BeanUtils.copyProperties(insert, taskGroup);
taskGroup.setFdId(IDUtil.getUuId());
if(Objects.isNull(insert.getFdTaskStatus())){
taskGroup.setFdTaskStatus(TaskStatusEnum.DISABLE);
}
taskGroup.setFdRunStatus(TaskRunStatusEnum.NOT_RUNNING);
taskGroup.setFdJobId(IDUtil.getUuId());
taskGroupMapper.insert(taskGroup);
//任务组启用状态下添加任务组任务
if(TaskStatusEnum.ENABLE.equals(taskGroup.getFdTaskStatus())){
TaskJobDto taskDto = new TaskJobDto();
taskDto.setJobId(taskGroup.getFdJobId());
taskDto.setCron(taskGroup.getFdCron());
taskDto.setClazz(TaskGroupJob.class);
taskGroupHandler.addCronJob(taskDto);
}
log.info("新增任务组SUCCESS, insert:{}", JSON.toJSONString(insert));
return Boolean.TRUE;
}
@Transactional(rollbackFor = Exception.class)
@Override
public Boolean update(TaskGroupUpdate update) {
log.info("更新任务组start, update:{}", JSON.toJSONString(update));
TaskGroup taskGroup = new TaskGroup();
BeanUtils.copyProperties(update, taskGroup);
if(Objects.isNull(update.getFdTaskStatus())){
taskGroup.setFdTaskStatus(TaskStatusEnum.DISABLE);
}
taskGroup.setFdRunStatus(TaskRunStatusEnum.NOT_RUNNING);
taskGroupMapper.updateById(taskGroup);
taskGroup = arrangeTaskGroupMapper.selectById(update.getFdId());
//任务组启用状态下添加任务组任务
if(TaskStatusEnum.ENABLE.equals(taskGroup.getFdTaskStatus())){
TaskJobDto taskDto = new TaskJobDto();
taskDto.setJobId(taskGroup.getFdJobId());
taskDto.setCron(taskGroup.getFdCron());
taskDto.setClazz(TaskGroupJob.class);
taskGroupHandler.updateCronJob(taskDto);
}
log.info("更新任务组SUCCESS, update:{}", JSON.toJSONString(update));
return Boolean.TRUE;
}
@Transactional(rollbackFor = Exception.class)
@Override
public Boolean deleteById(FdIdForm idForm) {
log.info("删除任务组start, idForm:{}", JSON.toJSONString(idForm));
taskGroupMapper.deleteById(idForm.getFdId());
//删除已经启动的定时任务
TaskJobDto taskDto = new TaskJobDto();
taskDto.setJobId(taskGroup.getFdJobId());
taskDto.setCron(taskGroup.getFdCron());
taskGroupHandler.removeCronJob(taskDto);
log.info("删除任务组SUCCESS, idForm:{}", JSON.toJSONString(idForm));
return Boolean.TRUE;
}
@Transactional(rollbackFor = Exception.class)
@Override
public Boolean batchUpdate(ArrangeTaskUpdate update) {
log.info("任务组批量启停start, update:{}", JSON.toJSONString(update));
LambdaQueryWrapper<TaskGroup> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.in(TaskGroup::getFdId, update.getGroupIdList());
List<TaskGroup> taskGroupList = taskGroupMapper.selectList(queryWrapper);
if(CollectionUtils.isEmpty(taskGroupList)){
throw new BusinessException("批量启停操作任务组不存在!");
}
LambdaUpdateWrapper<TaskGroup> lambdaWrapper = new LambdaUpdateWrapper<>();
lambdaWrapper.set(TaskGroup::getFdTaskStatus, update.getFdTaskStatus()).in(TaskGroup::getFdId, update.getGroupIdList());
taskGroupMapper.update(null, lambdaWrapper);
taskGroupList.forEach(taskGroup -> {
TaskJobDto taskDto = new TaskJobDto();
taskDto.setJobId(taskGroup.getFdJobId());
taskDto.setCron(taskGroup.getFdCron());
taskDto.setClazz(TaskGroupJob.class);
taskGroupHandler.removeCronJob(taskDto);
if(TaskStatusEnum.ENABLE.equals(update.getFdTaskStatus())){
taskGroupHandler.addCronJob(taskDto);
}
});
log.info("任务组批量启停SUCCESS, update:{}", JSON.toJSONString(update));
return Boolean.TRUE;
}
}
参考:
https://www.jb51.net/article/261554.htm
https://www.cnblogs.com/Alida/p/12986967.html
https://blog.csdn.net/lc1025082182/article/details/123656328