Elastic-job分布式任务调度框架

本文将从以下几个方面展开介绍elastic-job

简介

Elastic-job是由当当网架构师张亮,曹昊和江树建基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-job-lite:
定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
仅关注点分布式调度、协调以及分片等核心功能

Elastic-Job-Cloud :
提供一体化私有云服务,将分布式调度、作业部署、资源分配、监控、日志处理等提供完善的解决方案。
功能更加完善,但使用复杂度较高。使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务

功能特点:

功能 Elastic-job-lite Elastic-Job-Cloud
分布式调度协调
弹性扩容缩容
失效转移
错过执行作业重触发
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
支持并行调度
支持作业生命周期操作
丰富的作业类型
Spring整合以及命名空间提供
运维平台
自诊断并修复分布式不稳定造成的问题
应用自动分发
基于Fenzo的弹性资源分配
基于Docker的进程隔离(TBD)

开发指南

基于目前流行的springboot框架来讲解如何在springboot中集成elastic-job,所以在进行下面的操作之前我们先得创建一个springboot项目,至于如何创建springboot项目这里就不做介绍了,访问这个网站:https://start.spring.io/

  1. 引入maven依赖

    <dependency> 
        <groupId>com.dangdang</groupId> 
        <artifactId>elastic-job-lite-core</artifactId> 
        <version>2.1.5</version> 
    </dependency>
    
  2. 作业开发

    Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

    2.1.Simple类型作业

    意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

    public class CourseSimpleJob implements SimpleJob {
        
        @Override
        public void execute(ShardingContext context) {
            switch (context.getShardingItem()) {
                case 0: 
                    // do something by sharding item 0
                    break;
                case 1: 
                    // do something by sharding item 1
                    break;
                case 2: 
                    // do something by sharding item 2
                    break;
                // case n: ...
            }
        }
    }
    

    这里的context.getShardingItem()为当前分片序号,最大值为分片总数减1(n-1)。例如:分片总数为10,那么context.getShardingItem()的最大值则为9。

    2.2.Dataflow类型作业

    Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

    public class SpringDataflowJob implements DataflowJob<Foo> {
    
        @Override
        public List<Foo> fetchData(ShardingContext context) {
            switch (context.getShardingItem()) {
                case 0: 
                    List<Foo> data = // get data from database by sharding item 0
                    return data;
                case 1: 
                    List<Foo> data = // get data from database by sharding item 1
                    return data;
                case 2: 
                    List<Foo> data = // get data from database by sharding item 2
                    return data;
                // case n: ...
            }
        }
        
        @Override
        public void processData(ShardingContext shardingContext, List<Foo> data) {
            // process data
            // ...
        }
    }
    

    流式处理

    可通过DataflowJobConfiguration配置是否流式处理。

    流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

    如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

    2.3.Script类型作业

    Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

    #!/bin/bash
    echo sharding execution context is $*
    
  3. 作业配置

    3.1.zookeeper配置

    新建RegistryCenterConfig类

    @Configuration
    @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
    public class RegistryCenterConfig {
    
        private final Logger log = LoggerFactory.getLogger(RegistryCenterConfig.class);
    
        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
            log.info("---------regCenter.serverList: {}---------", serverList);
            return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
        }
    }
    

    在配置文件application.yml中添加这两个属性

    regCenter:
        serverList: 127.0.0.1:2181 #zookeeper地址
        namespace: elastic-job-lite-springboot #命名空间
    

    3.2.配置事件追踪

    关于事件追踪这里只做代码展示,具体详细请参考:http://elasticjob.io/docs/elastic-job-lite/02-guide/event-trace/

    新建JobEventConfig类:

    @Configuration
    public class JobEventConfig {
    
        @Resource
        private DataSource dataSource;
    
        @Bean
        public JobEventConfiguration jobEventConfiguration() {
            return new JobEventRdbConfiguration(dataSource);
        }
    }
    

    在配置文件application.yml中添加数据源相关的配置

    spring:
        datasource:
            url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
            driver-class-name: com.mysql.jdbc.Driver
            username: root
            password: 123456
            tomcat:
                max-wait: 10000
                min-idle: 0
                initial-size: 25
                validation-query: SELECT 1
                test-on-borrow: false
                test-while-idle: true
                time-between-eviction-runs-millis: 18800
                remove-abandoned: true
                remove-abandoned-timeout: 180
    

    这里是在demo中这么配置数据源,一般在现成的项目中都会有相应的数据源配置,无需更改,只需直接将DataSource注入JobEventConfig中即可。

    官方配置

    这里我们先看看官方给的通用配置:

    // 定义作业核心配置
    JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
    // 定义SIMPLE类型配置
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
    // 定义Lite作业根配置
    JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    
    // 定义作业核心配置
    JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build();
    // 定义DATAFLOW类型配置
    DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class.getCanonicalName(), true);
    // 定义Lite作业根配置
    JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
    
    // 定义作业核心配置配置
    JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob", "0/45 * * * * ?", 10).build();
    // 定义SCRIPT类型配置
    ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh");
    // 定义Lite作业根配置
    JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();
    

    这里官方只是告诉你他的api是这么用的,但是实际工作中我们肯定不能生搬硬套,不然每一个job都要写一个配置,那多麻烦,有时候还会忘记。因为在实际工作中肯定是有很多个job,其实我们完全可以写一个通用的配置类,然后将job配置的相关参数提取到配置文件中或者直接在job类中添加自定义注解,然后在注解中添加相关配置,最后在配置类中去扫描这些job的相关配置并遍历它生成对应的job配置实例,这样作业的配置代码只需要编写一次即可。

    3.3.Simple类型作业配置

    这里以Simple类型作业为例来简单的编写一个Simple类型通用的作业配置

    @Configuration
    public class SimpleJobConfig {
        private static final String      JOB_TYPE                 = "simpleJob";
        public static final  String      CRON                     = "cron";
        public static final  String      SHARDING_TOTAL_COUNT     = "shardingTotalCount";
        public static final  String      SHARDING_ITEM_PARAMETERS = "shardingItemParameters";
        private final        Environment env;
    
        @Resource
        private ZookeeperRegistryCenter regCenter;
    
        @Resource
        private JobEventConfiguration jobEventConfiguration;
    
        @Autowired
        public SimpleJobConfig(Environment env) {
            this.env = env;
        }
    
        @Bean
        public List<JobScheduler> simpleJobScheduler() {
            List<SimpleJob>    simpleJobs    = schedulers();
            List<JobScheduler> jobSchedulers = new ArrayList<>();
            Joiner             joiner        = Joiner.on(".");
            JobScheduler       scheduler;
            for (SimpleJob job : simpleJobs) {
                String propertiesPrefix       = joiner.join(JOB_TYPE, job.getClass().getCanonicalName());
                String cron                   = env.getProperty(joiner.join(propertiesPrefix, CRON));
                int    shardingTotalCount     = Integer.valueOf(env.getProperty(joiner.join(propertiesPrefix, SHARDING_TOTAL_COUNT)));
                String shardingItemParameters = env.getProperty(joiner.join(propertiesPrefix, SHARDING_ITEM_PARAMETERS));
    
                if (shardingTotalCount > 0
                        && StringUtils.isNoneBlank(cron)
                        && StringUtils.isNoneBlank(shardingItemParameters)
                ) {
                    scheduler = new SpringJobScheduler(job, regCenter, getLiteJobConfiguration(job.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
                    scheduler.init();
                    jobSchedulers.add(scheduler);
                }
            }
            return jobSchedulers;
        }
    
        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
            return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                    jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
        }
    
        private List<SimpleJob> schedulers() {
            List<SimpleJob> jobList = new ArrayList<>(ApplicationContextProvider.getBeansOfType(SimpleJob.class).values());
            jobList.forEach(job -> System.out.println(job.getClass().getCanonicalName()));
            return jobList;
        }
    }
    

    在application.yml配置文件中添加以下配置:

    simpleJob:
        com.dangdang.ddframe.job.example.job.simple: #job的包名
            CourseSimpleJob: #job的类名
                cron: 0/30 * * * * ?
                shardingTotalCount: 1
                shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
            SpringSimpleJob: #job的类名
                cron: 0/25 * * * * ?
                shardingTotalCount: 3
                shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
    

    很显然这里的做法是通过将job的相关配置参数集中到一个文件里面,好处是配置参数可以统一起来集中管理。

    3.4.Dataflow类型作业配置

    @Configuration
    public class DataflowJobConfig {
        
        @Resource
        private ZookeeperRegistryCenter regCenter;
        
        @Resource
        private JobEventConfiguration jobEventConfiguration;
        
        @Bean
        public DataflowJob dataflowJob() {
            return new SpringDataflowJob(); 
        }
        
        @Bean(initMethod = "init")
        public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron, @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
                                            @Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
            return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
        }
        
        private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
            return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(
                    jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)).overwrite(true).build();
        }
    }
    

    这里在dataflowJobScheduler方法上的@Bean注解中有一个initMethod = "init",这表示在创建JobScheduler的实例之后会调用JobSchedulerinit方法。而在前面2.3.3.Simple类型作业配置中我们是直接显示的调用init方法的。

    3.5.Script类型作业配置

    脚本类型的作业配置这里就不做展开了,直接参考上面的官方配置即可,或者根据上面的经验进行简单的修改。

  4. 作业运行

    正常地启动springboot项目即可,作业会根据cron参数的配置自动执行。

部署

正常部署应用即可

  1. 启动Elastic-Job-Lite指定注册中心的Zookeeper。
  2. 运行包含Elastic-Job-Lite和业务代码的jar文件。不限与jar或war的启动方式。

运维平台(可选)

  1. 解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh。

  2. 打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。

    elastic-job-lite-console-${version}.tar.gz可通过mvn install编译获取。

最后附上官方文档大全地址:http://elasticjob.io/docs/elastic-job-lite/00-overview/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容