1.Elastic-Job简介
如果要在分布式集群环境下去安全的执行一个调度任务,常见的做法就是保证在集群环境下,只有集群中的一台机器能够获取执行任务的权限。而Elastic-Job在实现分布式Job时是将集群中所有的机器都利用起来,通过多进程多线程执行作业任务。也就是说如果本机的数据分片分到了多个分片(即一个JVM进程分到了多个分片),Elastic-Job会为每一个分片去启动一个线程来执行分片任务(在AbstractElasticJobExecutor的process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource)
)。
Elastic-Job相当于quartz+zk的加强版,它允许对定时任务分片,可以集群部署(每个job的"分片"会分散到各个节点上),如果某个节点挂了,该节点上的分片,会调度到其它节点上。
Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
2.Elastic-Job-Lite分布调度特点
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
3.Elastic-Job-Lite整体架构
elastic-job-lite是以zookeeper作为注册中心的,console作为控制台和服务端解耦,通过restful api操作zk改变job的配置信息。
服务端启动时会进行:连接zk→注册job→初始化Scheduler→进行leader选举→由选举出的主作业节点进行分片,然后所有节点按照job配置信息调度作业。
4.分片场景
(1)分片总数为1,并使用多于1台的服务器执行作业
作业将会以1主n从的方式执行。一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
在同一个zookepper和jobname情况下,多台机器部署了Elastic job时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行。
(2)分片项总数大于服务器的数量
作业将会合理的利用分布式资源,动态的分配分片项。
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
5.Elastic-Job-Lite注册中心数据结构
注册中心在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改名称将视为新的作业。作业名称节点下又包含5个数据子节点,分别是config, instances, sharding, servers和leader。
(1)config节点
config节点保存作业配置信息,以JSON格式存储
(2)instances节点
作业运行实例信息,子节点是当前作业运行实例的主键。作业运行实例主键由作业运行服务器的IP地址和PID构成。作业运行实例主键均为临时节点,当作业实例上线时注册,下线时自动清理。注册中心监控这些节点的变化来协调分布式作业的分片以及高可用。 可在作业运行实例节点写入TRIGGER表示该实例立即执行一次。
(3)sharding节点
作业分片信息,子节点是分片项序号,从零开始,至分片总数减一
(4)servers节点
作业服务器信息,子节点是作业服务器的IP地址
(5)leader节点
作业服务器主节点信息。
分为election,sharding和failover三个子节点,分别用于主节点选举,分片和失效转移处理。
失效转移:在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
6.作业启动流程
Elastic-Job-Lite初始化的入口是JobSchedule,应用服务器启动时,会调用JobSchedule的init方法,开启作业启动流程。首先添加或更新作业配置信息,并将配置信息持久化到zk上;接着创建quartz调度器,作业的调度执行依赖quartz技术;然后启动所有的监听器,包括leader选举监听、失效转移监听、分片监听等,并发起主节点选举,将leader节点信息set到leader/election/instance节点下;然后将服务器信息、实例信息注册到zk上,并且在leader/sharding下创建necessary节点,作为重新分片的标记;最后由quartz调度器根据cron表达式调度执行。
7.作业执行流程
Elastic-Job-Lite执行器的入口是实现了Job接口的LiteJob类,当任务调度执行时,进入LiteJob类的execute方法。在这里完成一系列的操作,包括获取失效转移分片项,如果没有分配的失效转移项,则判断是否需要重新分片,然后获取分配给自己的分片项,然后判断当前分片项是否正在running,如果否,则执行任务项;如果是,则在sharding/[item]下添加misfire节点,标示该分片项错过执行,等待分片项执行结束后,再触发misfire的分片项执行。
8.示例
elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。
可以使用JAVA、Spring启动,下面例子使用JAVA启动
(1)Simple类型作业
①添加Maven依赖
<!-- 引入elastic-job-lite核心模块 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
②创建Job
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int key = shardingContext.getShardingItem();
switch (key) {
case 0:
System.out.println("---------------->0");
break;
case 1:
System.out.println("---------------->1");
break;
default:
System.out.println("---------------->default");
break;
}
}
}
③配置启动调度器
public class StartJob {
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createSimpleJobConfiguration()).init();//创建作业调度器并初始化作业
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job-demo"));//创建指定命名空间为elastic-job-demo
regCenter.init();//初始化注册中心.
return regCenter;
}
/**
* 创建Job配置
* @return
*/
private static LiteJobConfiguration createSimpleJobConfiguration() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
}
(2)Dataflow类型作业
①创建Job
public class MyDataFlowJob implements DataflowJob<Foo> {
/**
* 获取待处理数据.
*
* @param context
* @return
*/
@Override
public List<Foo> fetchData(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
List<Foo> data1 = new ArrayList<>();// get data from database by sharding item 0
data1.add(new Foo("sharding item 0"));
return data1;
case 1:
List<Foo> data2 = new ArrayList<>();// get data from database by sharding item 1
data2.add(new Foo("sharding item 1"));
return data2;
case 2:
List<Foo> data3 = new ArrayList<>();// get data from database by sharding item 2
data3.add(new Foo("sharding item 2"));
return data3;
// case n: ...
}
return null;
}
/**
* 处理数据
*
* @param shardingContext
* @param data
*/
@Override
public void processData(ShardingContext shardingContext, List<Foo> data) {
// process data
for (Foo foo : data) {
System.out.println(foo);
}
}
}
②创建Foo
public class Foo {
private String name;
public Foo(String name) {
this.name = name;
}
public String getName() {
return name;
}
@Override
public String toString() {
return "Foo{" +
"name='" + name + '\'' +
'}';
}
}
③将配置修改成如下
/**
* 创建DataflowJob配置
* @return
*/
private static LiteJobConfiguration createDataflowJobConfiguration() {
// 定义作业核心配置
JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("DataflowJob", "0/30 * * * * ?", 10).build();
// 定义DATAFLOW类型配置
DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), true);
// 定义Lite作业根配置
JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
return (LiteJobConfiguration) dataflowJobRootConfig;
}
ps:记得先启动zookeeper
参考:
[1]Elastic-Job原理分析(version:2.1.4) - 快鸟 - 博客园
[2]elastic-job-lite入门以及架构原理分析 - 云+社区 - 腾讯云
[3]Elastic-Job/Elastic-Job-Lite简介
[4]分布式定时任务调度平台Elastic-Job技术详解_慕课手记
[5]Elastic-Job-Lite实现原理