Elastic-Job整理

1.1RegistryCenter

注册中心
<bean id="regCenter" class="com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter" init-method="init">
        <constructor-arg>
            <bean class="com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration">
                <property name="serverLists" value="${elasticjob.zk.url}" />
                <property name="namespace" value="${elasticjob.namespace}" />
                <property name="baseSleepTimeMilliseconds" value="${elasticjob.baseSleepTimeMilliseconds}" />
                <property name="maxSleepTimeMilliseconds" value="${elasticjob.maxSleepTimeMilliseconds}" />
                <property name="maxRetries" value="${elasticjob.maxRetries}" />
            </bean>
        </constructor-arg>
    </bean>

ZookeeperRegistryCenter内部通过ZookeeperConfiguration的配置信息,使用curator来连接zookeeper服务器,并向外提供操作节点的方法。
注册中心的作用是让所有的job都注册到这里,然后统一管理job的配置信息,当前的运行节点信息,然后可以分片以及弹性扩容等。目前只支持zookeeper,未来会增加。
每一个Job节点的结构如下:


下面讲每一个service时会讲到对应的节点。

1.2Job

 <job:bean id="simpleTask" class="packageName.SimpleTask" regCenter="regCenter" cron="0 0 0 1 * ?" shardingTotalCount="1" overwrite="false" />

上面的配置是1.0.8及之前的版本,目前2.X版本的Spring标签以及改为simple等

ElasticJob通过自定义的标签来配置job,每一个job都会注入上面配置的注册中心。
具体使用的类如下:

SpringJobScheduler

每一个<job:bean>都会构造一个SpringJobScheduler类型的bean,然后构造JobScheduler,包括了如下的属性和方法:


至此,所有的SpringJobScheduler都已经被注册为bean。与此同时这个Scheduler对应的Task也被扫描装配为bean。

当SpringJobScheduler根据crontab表达式到达执行的时候,也是会通过SpringJobFactory来找到与自己对应的Task bean,这个Task bean也就是quartz的Job类型,然后通过quartz来执行。

1.3JobScheduler

看一下关键的JobScheduler的初始化

 public void init() {
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

做了如下的事情:

  1. 到注册中心更新当前Job的配置信息。
  2. 在JobRegistry中注册该Job,包括了JobScheduleController以及Job相关连的RegCenter以及Job的分片信息。
  3. 注册Job启动信息
  4. 根据Cron表达式调度Job

值得一提的是,这里从JobScheduler到facade到facade中的所有service都是和每一个Job相关的,同时也都关联到了注册这个Job的注册中心。

1.4SchedulerFacade

SchedulerFacade是为调度类提供内部服务的门面类,包括如下属性


主要方法有更新Job的config信息,以及注册Job启动信息。

public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
        configService.persist(liteJobConfig);
        return configService.load(false);
    }
public void registerStartUpInfo(final boolean enabled) {
        listenerManager.startAllListeners();
        leaderService.electLeader();
        serverService.persistOnline(enabled);
        instanceService.persistOnline();
        shardingService.setReshardingFlag();
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }

1.4.1ConfigService

节点 :config
位置:/namespace/jobName/config
LiteJobConfiguration类的json字符串。(具体的配置信息见这里

public void persist(final LiteJobConfiguration liteJobConfig) {
        checkConflictJob(liteJobConfig);
        if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
            jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
        }
    }

使用
持久化config信息的时候,要看config节点是否存在,以及是否可以覆盖,以防启动时配置的默认config覆盖了正在使用的配置信息。而在取出config信息的时候可以选择从本地缓存(Curator的TreeCache)中或者直接从注册中心中取出config值。(如果获取之前的配置的时候,可以从缓存获取,如要分片的时候判断当前是否还有正在执行的分片;而要获取最新分片的配置的时候,就要从注册中心取)
监听
RescheduleListenerManager注册一个监听config节点的TreeCacheListener,如果收到config节点内容更新的事件通知,然后就根据新的config节点内容重新调度Job。

1.4.2ListenerManager

统一注册上文以及后面各个节点相关的listener。

1.4.3LeaderService

节点:leader
位置

选举路径 /namespace/jobName/leader/election/latch
instance路径 /namespace/jobName/leader/election/instance


选举路径没有值,选举时存在临时的子节点;instance节点为当前leader的instanceId(ip@-@pid),这是一个临时节点,当断开连接以后节点会清除。
使用
使用curator的LeaderLatch来进行leader选举,成为leader后会创建instance节点的值写为自己的instanceId;对于leader节点的判断或者删除都是对instance节点的操作。

这里值得注意的是对LeaderLatch的使用,当一个节点被选为leader以后,他会创建instance节点并写为自己,然后退出LeaderLatch,那么剩下参与竞选的节点就会成为leader,但当新leader发现instance节点已经存在之后,他什么都不会做,然后退出LeaderLatch,接着所有剩下的节点会依次成为leader,同样什么都不会做,只有第一个成为leader的会创建instance节点。
所以Ejob中只有在竞选的时候,选举路径才会有临时节点,当leader选定之后,选举路径为空,instance节点存在。

监听
ElectionListenerManager会注册两个Listener

  1. LeaderAbdicationJobListener leader禅让监听
    监听server节点下的自己的ip,如果自己是leader,并且servers节点下的自己ip被disable,那么就删除leader/instance,然后下次调度会重新选举。
  2. LeaderElectionJobListener leader选举监听
    如果当前Job仍在调度中,监听到servers节点下自己的ip没有被disable,且当前没有leader,那么开始选举;
    或者监听到leader节点被移除,且自己是可用的服务器,那么开始选举。

1.4.4ServerService

节点:servers
位置: /namespace/jobName/servers/ip
:ip节点的值为空时表示可用,为disable时表示不可用(可以配置,用于统一开启调度)
使用
创建ip节点,ip节点是永久节点,每次开始运行时会更新节点里的值;
判断是否存在可用的服务器,包括ip节点没有被disable且有instance子节点。

1.4.5InstanceService

节点:instances
位置: /namespace/jobName/instances/instanceId
:空
使用
创建instance节点,临时节点;删除作业是移除节点。

1.4.6ShardingService

节点:sharding
位置: /namespace/jobName/sharding
使用:建立/leader/sharding/necessary子节点,表示这个Job需要重新分片;
AbstractElasticJobExecutor在每次执行Task前的准备工作中就包括了如下的分片流程。
分片流程

  1. 如果当前节点是leader节点,并且这个Job需要分片,等待这个Job所有正在RUNNING的分片执行完成(/sharding/index/running),然后进入下一步;如果不是leader,那么等待分片完成。
  2. 从中心获取最新分片配置,写/leader/sharding/processing临时节点,清空原来的/sharding/index/instance节点(包括多余的/sharding/index),写最新的/sharding/index节点;
  3. 根据配置的策略进行分片,获取JobInstance到分片List的映射
  4. 通过curator transaction来写/sharding/index/instance节点(值为instanceId),删除necessary和processing节点
    监听
    1.ShardingTotalCountChangedJobListener监听到config节点有shardingtotalCount配置修改,和之前不同的话,写necessary节点
  5. ListenServersChangedJobListener监听到instances子节点中有变动(删除或者新增),servers子节点中有新增\删除\更新,写necessary节点。

JobSchdulerController

由quartz的StdSchduler来根据cron调度,当开始执行的任务的时候,触发LiteJob->AbstractElasticJobExecutor来execute,其中会使用的jobFacade来在这个基类中执行
jobFacade.registerJobBegin(shardingContexts);
jobFacade.registerJobCompleted(shardingContexts);
这里处理RUNNING节点的新增以及remove

AbstractElasticJobExecutor

它代表了任务执行的基本流程,其中做的工作非常多,包括了上面提到的分片,leader来分片,非leader等待分片完成;然后无论是leader还是非leader都会获取分配给本实例的分片列表;
然后包装成本实例的shardingContexts,执行前注册listener。
如果没有获得分片,那么不执行;如果有分片,处理本实例相关的分片的/sharding/index/running临时节点;
当分片为1时,直接调用用户实现的Task开始执行;如果分片大于1,那么使用一个线程池开始所有分片的task执行,然后开启一个CountDownLatch开始等待所有分片完成。所有分片完成后会有一些清理工作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容