SpringBoot整合ElasticJob

介绍

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

可以参考文档
https://github.com/elasticjob/elastic-job-lite

整体架构图

m_e6fc4173ea47b5cba5a0b3141d5c4220_r.png

快速开始

1.添加 mavne依赖

 <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
 </dependency>

2.初始化zookeeper注册中心


import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
@ConditionalOnExpression("'${elastic-job.server.list}'.length() > 0")
public class ElasticJobCenterConfig {
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${elastic-job.server.list}") final String serverList,
                                             @Value("${elastic-job.server.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }
}

3.添加 定时任务执行程序

import com.chinaway.fms.constant.RedisKeyConstant;
import com.chinaway.fms.dbsafety.model.AuthorOrg2team;
import com.chinaway.fms.dbsafety.model.ClientTruck;
import com.chinaway.fms.service.AuthorOrg2teamService;
import com.chinaway.fms.service.ClientTruckService;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


@Slf4j
@Component
public class XxxxJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        if(shardingContext.getShardingParameter().equals("1")){
            return;
        }
        log.info("ClientTruckJob execute.");
        yourBusiness();
    }


}

4.配置XxxxJob的执行策略


import com.chinaway.fms.job.XxxxJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ElasticJobConfig {

    @Value("${elastic-job.client.sharding-count}")
    Integer shardingTotalCount;//  分片数量

    //分片执行规则,与 shardingTotalCount配合使用,有几个shardingTotalCount ,就写几个等值,
    // A= B , A 是分片数值, B是机器编号。
    // 比如,你有两台机器A和B,配置四个分片0/1/2/3,设置0和1分片的数据在A机器上执行,2和3分片的数据在B机器上执行。那么 shardingTotalCount = 4,shardingItemParameters = "0=A,1=A,2=B,3=B".
    //再比如,有两台机器A和B,配置1个分片0,设置0分片的数据再A机器上执行,那么shardingTotalCount=1,shardingItemParameters="0=A".
    private final String shardingItemParameters = "0=0,1=1";
    private final String jobParameters = null;


    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Bean(initMethod = "init")
    public JobScheduler createXxxxJobShedule(final ClientTruckJob job) {
        final String jobName = "[your_business_name]XxxxJob";
        final String cron = "0 */1 * * * ?";//每分钟执行一次,cron表达式也可以放在yml中配置。
        LiteJobConfiguration jobConfiguration = createJobConfiguration(XxxxJob.class, jobName,
                cron, shardingTotalCount, shardingItemParameters, jobParameters);
        return new SpringJobScheduler(job, regCenter, jobConfiguration, new FmsElasticJobListener());
    }
     ///每个bean代表了一个定时任务执行策略的添加
     @Bean(initMethod = "init")
      public JobScheduler createBJobShedule(final BJob job) {
         ...
      }



    public LiteJobConfiguration createJobConfiguration(final Class<?> jobClass,
                                                       final String jobName,
                                                       final String cron,
                                                       final int shardingTotalCount,
                                                       final String shardingItemParameters,
                                                       final String jobParameters) {
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).
                shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
        // 定义Lite作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
        return simpleJobRootConfig;
    }

}

5.添加监听器监听任务执行时间或者完善日志


import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
import com.g7.fms.base.mdc.MdcConst;
import com.g7.fms.base.mdc.MdcHelper;
import lombok.extern.slf4j.Slf4j;

import java.util.UUID;

@Slf4j
public class FmsElasticJobListener implements ElasticJobListener {

    private long beginTime = 0;

    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        MdcHelper.put(MdcConst.Log.LOG_TRACE_ID, UUID.randomUUID().toString().replace("-", ""));
        beginTime = System.currentTimeMillis();
        log.info("before job @{}", shardingContexts.getJobName());
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        log.info("end job @{},cost:{}ms", shardingContexts.getJobName(), System.currentTimeMillis() - beginTime);
    }
}

6.yml配置文件中添加相关配置

elastic-job:
  server:
    namespace: elastic-job-develop-env-xxx
    list: your_zookeeper_ip:zk_port,  your_zookeeper_ip2:zk_port ...
  test-task:
    crontab: 0 0 */1 * * ?
  client:
    sharding-count: 1

至此 就 集成完毕。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容