介绍
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
- 分布式调度协调
- 弹性扩容缩容
- 失效转移
- 错过执行作业重触发
- 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
- 自诊断并修复分布式不稳定造成的问题
- 支持并行调度
- 支持作业生命周期操作
- 丰富的作业类型
- Spring整合以及命名空间提供
- 运维平台
可以参考文档
https://github.com/elasticjob/elastic-job-lite
整体架构图
快速开始
1.添加 mavne依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
2.初始化zookeeper注册中心
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConditionalOnExpression("'${elastic-job.server.list}'.length() > 0")
public class ElasticJobCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${elastic-job.server.list}") final String serverList,
@Value("${elastic-job.server.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
3.添加 定时任务执行程序
import com.chinaway.fms.constant.RedisKeyConstant;
import com.chinaway.fms.dbsafety.model.AuthorOrg2team;
import com.chinaway.fms.dbsafety.model.ClientTruck;
import com.chinaway.fms.service.AuthorOrg2teamService;
import com.chinaway.fms.service.ClientTruckService;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class XxxxJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
if(shardingContext.getShardingParameter().equals("1")){
return;
}
log.info("ClientTruckJob execute.");
yourBusiness();
}
}
4.配置XxxxJob的执行策略
import com.chinaway.fms.job.XxxxJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticJobConfig {
@Value("${elastic-job.client.sharding-count}")
Integer shardingTotalCount;// 分片数量
//分片执行规则,与 shardingTotalCount配合使用,有几个shardingTotalCount ,就写几个等值,
// A= B , A 是分片数值, B是机器编号。
// 比如,你有两台机器A和B,配置四个分片0/1/2/3,设置0和1分片的数据在A机器上执行,2和3分片的数据在B机器上执行。那么 shardingTotalCount = 4,shardingItemParameters = "0=A,1=A,2=B,3=B".
//再比如,有两台机器A和B,配置1个分片0,设置0分片的数据再A机器上执行,那么shardingTotalCount=1,shardingItemParameters="0=A".
private final String shardingItemParameters = "0=0,1=1";
private final String jobParameters = null;
@Autowired
private ZookeeperRegistryCenter regCenter;
@Bean(initMethod = "init")
public JobScheduler createXxxxJobShedule(final ClientTruckJob job) {
final String jobName = "[your_business_name]XxxxJob";
final String cron = "0 */1 * * * ?";//每分钟执行一次,cron表达式也可以放在yml中配置。
LiteJobConfiguration jobConfiguration = createJobConfiguration(XxxxJob.class, jobName,
cron, shardingTotalCount, shardingItemParameters, jobParameters);
return new SpringJobScheduler(job, regCenter, jobConfiguration, new FmsElasticJobListener());
}
///每个bean代表了一个定时任务执行策略的添加
@Bean(initMethod = "init")
public JobScheduler createBJobShedule(final BJob job) {
...
}
public LiteJobConfiguration createJobConfiguration(final Class<?> jobClass,
final String jobName,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters,
final String jobParameters) {
// 定义作业核心配置
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).
shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
return simpleJobRootConfig;
}
}
5.添加监听器监听任务执行时间或者完善日志
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.g7.fms.base.mdc.MdcConst;
import com.g7.fms.base.mdc.MdcHelper;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
@Slf4j
public class FmsElasticJobListener implements ElasticJobListener {
private long beginTime = 0;
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
MdcHelper.put(MdcConst.Log.LOG_TRACE_ID, UUID.randomUUID().toString().replace("-", ""));
beginTime = System.currentTimeMillis();
log.info("before job @{}", shardingContexts.getJobName());
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
log.info("end job @{},cost:{}ms", shardingContexts.getJobName(), System.currentTimeMillis() - beginTime);
}
}
6.yml配置文件中添加相关配置
elastic-job:
server:
namespace: elastic-job-develop-env-xxx
list: your_zookeeper_ip:zk_port, your_zookeeper_ip2:zk_port ...
test-task:
crontab: 0 0 */1 * * ?
client:
sharding-count: 1
至此 就 集成完毕。