目录
分布式定时任务系列
配置
客户端核心配置SpringJobScheduler
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
@Value("${simpleJob.cron}") final String cron,
@Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters,
@Value("${simpleJob.jobParameter}") final String jobParameter,
@Value("${simpleJob.failover}") final boolean failover,
@Value("${simpleJob.monitorExecution}") final boolean monitorExecution,
@Value("${simpleJob.monitorPort}") final int monitorPort,
@Value("${simpleJob.maxTimeDiffSeconds}") final int maxTimeDiffSeconds,
@Value("${simpleJob.jobShardingStrategyClass}") final String jobShardingStrategyClass) {
// SpringJobScheduler配置,分析初始化从这里开始
return new SpringJobScheduler(simpleJob,
registryCenter,
getLiteJobConfiguration(simpleJob.getClass(),
cron,
shardingTotalCount,
shardingItemParameters,
jobParameter,
failover,
monitorExecution,
monitorPort,
maxTimeDiffSeconds,
jobShardingStrategyClass),
jobEventConfiguration,
new SimpleJobListener());
}
private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {
//定义作业核心配置
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
.newBuilder(jobClass.getName(), cron, shardingTotalCount)
.misfire(true)
.failover(failover)
.jobParameter(jobParameter)
.shardingItemParameters(shardingItemParameters)
.jobProperties("job_exception_handler", "com.seeger.demo.config.MyJobExceptionHandler")
.build();
//定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName());
//定义Lite作业根配置
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration)
.jobShardingStrategyClass(jobShardingStrategyClass)
.monitorExecution(monitorExecution)
.monitorPort(monitorPort)
.maxTimeDiffSeconds(maxTimeDiffSeconds)
.overwrite(true)
.build();
return liteJobConfiguration;
}
SpringJobScheduler
- 初始化代码
public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
final ElasticJobListener... elasticJobListeners) {
this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
this.liteJobConfig = liteJobConfig;
this.regCenter = regCenter;
List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
-
可以看到父类JobScheduler有几个属性LiteJobConfiguration(Lite作业配置),CoordinatorRegistryCenter(注册中心),SchedulerFacade,JobFacade
LiteJobConfiguration
-
作业配置类
-
typeConfig: 作业配置类型,分为simple,dataflow,script不同类型不同配置类
- monitorExecution监控作业运行状态,如果执行时间与执行间隔都短的情况下不建议开启
- maxTimeDiffSeconds设置最大容忍的本机与注册中心的时间误差秒数
- monitorPort作业监控端口,可以dump文件,echo “dump” | nc 127.0.0.1 9888
- jobShardingStrategyClass策略模式实现分片的不同策略
- reconcileIntervalMinutes修复作业服务器不一致状态服务调度间隔时间
- disabled是否禁用
- overwrite使用本地作业配置覆盖注册中心的作业配置
-
JobTypeConfiguration,以最常用的SimpleJobConfiguration举例
- jobName作业名称
- cron core表达式
- shardingTotalCount作业分片总数
- shardingItemParameters 分片项目参数,给分片上别名,本来分片是0,1,2,3这种的,可以起别名增加可读性
- jobParameter自定义作业参数
- failover失效转移是否启动,集群上别的集群如果有分片失效了比如这台服务宕机,可以让本机执行失效的分片,这里并不是master节点(可能每个任务都不同的master节点,因为主节点如果存在说明zk上有/${JOB_NAME}/leader/electron/instance,不同job就有可能存在不同的master节点)执行,而是任务争夺到分布式锁的机器执行,这里能够利用集群能力
- misfire错过执行 如果本机的任务时间间隔比执行时间短,那就错过了,elastic-job提供错过重执行的功能
- description描述
- jobProperties job属性,可以自定义异常处理器和线程池是SPI接口
CoordinatorRegistryCenter
-
注册中心,用于协调分布式服务,提供了默认实现ZookeeperRegistryCenter。ZookeeperRegistryCenter定义了持久节点、临时节点、持久顺序节点、临时顺序节点等目录服务接口方法,隐性的要求提供事务、分布式锁、数据订阅等特。ZookeeperRegistryCenter封装了apaceh /curator(zk连接设置节点等,使用curator很方便)方法。elastic job使用 Quartz 作为调度内核。
- ZookeeperConfiguration,初始化方法, 根据设置的zk配置初始化curator框架的client。其中ExponentialBackoffRetry动态计算时间间隔
public void init() {
log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getServerLists())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
.namespace(zkConfig.getNamespace());
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
}
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
}
if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
client = builder.build();
client.start();
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
// 动态计算时间间隔
baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
- 缓存是通过 Curator TreeCache 实现监控整个树( Zookeeper目录 )的数据订阅和缓存
- 对zk的crud比较基础细节可以深入源码看
SchedulerFacade
- 通过调用构造器
public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
final ElasticJobListener... elasticJobListeners) {
this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
// 省去部分代码
schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
}
// 调用SchedulerFacade
public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) {
this.jobName = jobName;
configService = new ConfigurationService(regCenter, jobName);
leaderService = new LeaderService(regCenter, jobName);
serverService = new ServerService(regCenter, jobName);
instanceService = new InstanceService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
monitorService = new MonitorService(regCenter, jobName);
reconcileService = new ReconcileService(regCenter, jobName);
listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
}
-
为调度器提供内部服务的门面类,包含各个service
- ElasticJobListeners监听器是放到ListenerManager做管理
- 可以看到各个service都是基于regCenter, jobName做初始化
JobFacade
- 通过调用构造器初始化
public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
final ElasticJobListener... elasticJobListeners) {
this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
// 省略部分代码
jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
}
// 调用LiteJobFacade构造器
public LiteJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners, final JobEventBus jobEventBus) {
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
executionContextService = new ExecutionContextService(regCenter, jobName);
executionService = new ExecutionService(regCenter, jobName);
failoverService = new FailoverService(regCenter, jobName);
this.elasticJobListeners = elasticJobListeners;
this.jobEventBus = jobEventBus;
}
-
为Job提供内部服务的门面类, 包含各个service, 以及event bus
- 初始化时会传设置好的JobEventConfiguration,通过new JobEventBus(jobEventConfig)设置JobEventBus。
- 监听器是LiteJobFacade自个管理
- 可以看到各个service都是基于regCenter, jobName做初始化
事件追踪配置JobEventConfiguration
@Autowired
private DataSource dataSource;
@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
- 初始化的类比较简单
public final class JobEventRdbConfiguration extends JobEventRdbIdentity implements JobEventConfiguration, Serializable {
private static final long serialVersionUID = 3344410699286435226L;
private final transient DataSource dataSource;
public JobEventListener createJobEventListener() throws JobEventListenerConfigurationException {
try {
return new JobEventRdbListener(this.dataSource);
} catch (SQLException var2) {
throw new JobEventListenerConfigurationException(var2);
}
}
// 初始化时调用
@ConstructorProperties({"dataSource"})
public JobEventRdbConfiguration(DataSource dataSource) {
this.dataSource = dataSource;
}
public DataSource getDataSource() {
return this.dataSource;
}
}
JobRegistry作业注册表
- 调用构造函数构造SpringJobScheduler时,会往JobRegistry注册
public JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventConfiguration jobEventConfig,
final ElasticJobListener... elasticJobListeners) {
this(regCenter, liteJobConfig, new JobEventBus(jobEventConfig), elasticJobListeners);
}
private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
// 省略部分代码
}
// getInstance单例
public static JobRegistry getInstance() {
if (null == instance) {
synchronized (JobRegistry.class) {
if (null == instance) {
instance = new JobRegistry();
}
}
}
return instance;
}
// addJobInstance
public void addJobInstance(final String jobName, final JobInstance jobInstance) {
jobInstanceMap.put(jobName, jobInstance);
}
-
JobRegistry是单例,设计思想有点类似spring ioc容器
- instance: 单例 private static volatile JobRegistry 修饰
- 各个map缓存数据,ConcurrentHashMap保证单个put并发安全
初始化
- 调用完构造函数后,会调用init方法
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(//省略参数) {
return new SpringJobScheduler(省略参数);
}
- 调用init方法
public void init() {
//更新作业配置
LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
// 设置分片信息
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
// 创建作业调度控制器
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
// 添加作业调度控制器
JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
// 注册启动信息
schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
// 调度作业,基于quartz
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
更新作业配置
- SchedulerFacade门面里面ConfigService new出来的, ConfigService依赖JobNodeStorage,JobNodeStorage使用curator对zk操作
public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
// 持久化
configService.persist(liteJobConfig);
// 读取配置
return configService.load(false);
}
// ConfigService
public void persist(final LiteJobConfiguration liteJobConfig) {
checkConflictJob(liteJobConfig);
if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
}
}
设置作业分片
- SchedulerFacade门面里面JobRegistry.getInstance().setCurrentShardingTotalCount
private Map<String, Integer> currentShardingTotalCountMap = new ConcurrentHashMap<>();
public void setCurrentShardingTotalCount(final String jobName, final int currentShardingTotalCount) {
currentShardingTotalCountMap.put(jobName, currentShardingTotalCount);
}
创建作业调度控制器
- JobScheduleController作业调度控制器,提供对Quartz的封装,
JobScheduleController jobScheduleController = new JobScheduleController(
createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
createScheduler
- org.quartz.threadPool.threadCount这里配置1,Quartz 执行作业线程数量为 1, 一个ElasticJob对应1个JobScheduler作业调度器,失效转移时会调用quartz的trigger,有可能多次调用,这个时候因为Quartz 执行作业线程数设置为1,所以是串行执行,不会有并发问题。
// 创建quartz调度器
private Scheduler createScheduler() {
Scheduler result;
try {
// quartz提供的能力创建Scheduler
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
result = factory.getScheduler();
result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
return result;
}
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
// Quartz 执行作业线程数量为 1, 一个ElasticJob对应1个JobScheduler作业调度器
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", liteJobConfig.getJobName());
result.put("org.quartz.jobStore.misfireThreshold", "1");
result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}
createJobDetail
- 创建JobDetail时使用newJob(LiteJob.class),每次任务都会new LiteJob,Jodetail.jobDataMap为LiteJob添加属性,这样虽然每次都是new的LiteJob, 但是每次属性是一样的。
private JobDetail createJobDetail(final String jobClass) {
// quartz的能力,创建JobDetail
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
// 创建ElasticJob对象
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
注册作业启动信息
- SchedulerFacade里面调用注册作业启动信息
public void registerStartUpInfo(final boolean enabled) {
// 开启所有监听器
listenerManager.startAllListeners();
// 选主
leaderService.electLeader();
// zk持久化作业服务信息
serverService.persistOnline(enabled);
// zk持久化作业运行实例上线相关信息
instanceService.persistOnline();
// 在zk上设置需要重新分片的标注,新上线服务,后面会分析分片
shardingService.setReshardingFlag();
// 初始化作业监听服务,这个在后面自诊断修复会涉及
monitorService.listen();
// 初始化调解作业不一致状态服务
if (!reconcileService.isRunning()) {
reconcileService.startAsync();
}
}
作业调度
public void init() {
// 进行作业调度 利用quartz
jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}
初始化与执行联系
quartz简单介绍
- 核心元素有四个
- Trigger: 触发器用于定义时间规则,比如core表达式的触发
- Job: 任务执行的逻辑
- JobDetail: 任务详情
- Scheduler: 调度控制器
-
借用参考文章6的图
- quartz有两种线程类型,一种是常规执行任务线程,一种是错过执行线程,当任务触发时间小于任务执行时间时,就会标记错过的任务,这个时候elastic-job将quartz错过执行的逻辑转移到elastic-job框架处理
初始化与执行的联系
- 在初始化时创建JobDetail,JobBuilder.newJob(LiteJob.class),每次执行会new LiteJob,并且设置属性进行,属性里面的值每次都是同一实例
private JobDetail createJobDetail(final String jobClass) {
// quartz的能力,创建JobDetail
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
}
- 任务触发之后会执行LiteJob.execute方法
public final class LiteJob implements Job {
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
// JobExecutorFactory
// 这里是策略模式的Context,根据不同输入,获取不同实例,有脚本job simple job和data flow job
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
if (null == elasticJob) {
return new ScriptJobExecutor(jobFacade);
}
if (elasticJob instanceof SimpleJob) {
return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
}
if (elasticJob instanceof DataflowJob) {
return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
}
throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
}
-
到AbstractElasticJobExecutor.executor是几个Job Executor的父类,executor这块代码也是elastic-job核心的代码