elastic job服务启动时会通过失效转移监听管理器io.elasticjob.lite.internal.failover.FailoverListenerManager
启动
实例下线监听器和失效转移设置改变监听器,它们会对满足的事件通知做失效转移处理。
public void start() {
addDataListener(new JobCrashedJobListener());
addDataListener(new FailoverSettingsChangedJobListener());
}
实例下线监听器
实例下线监听器io.elasticjob.lite.internal.failover.FailoverListenerManager.JobCrashedJobListener
会监听作业运行实例节点instances
的子节点的变化(NODE_REMOVED)。
// 满足该条件时后续逻辑才会被执行
isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)
作业运行实例子节点为临时节点,实例丢失(下线)时,临时节点会自动被清除。此时,其他实例可以接收到通知。
1、忽略处理自己的下线通知
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
2、处理下线实例分配的失效转移分片
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
当监听器收到有实例下线时(可以出现意外,也可能主动下线),会获取下线实例已分配的失效转移分片项集合(可能接管了其他实例的失效转移分片),如果失效转移分片集合不为空,则在新增节点leader/failover/items/分片项
。
leader/failover/items/分片项
一旦有作业崩溃,则会向此节点记录。当有空闲作业服务器时,会从此节点抓取需失效转移的作业项
3、处理下线实例分配的分片项
for (int each : shardingService.getShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
如果下线的运行实例未接管失效转移分片,则获取该作业实例的运行分片项。针对每一个分片项新增节点leader/failover/items/分片项
。
上述每一个分片项在新增表示需要失效转移的节点时,当前实例会立即发起选举,确定由谁(存活的实例)立即执行该分片。
leader/failover/items/latch 分配失效转移分片项时占用的分布式锁,为curator的分布式锁使用。
选举结束后,获胜的实例会在节点sharding/分片项/failover
中设置值未当前实例ID,然后删除leader/failover/items/分片项
(表示已有实例负责该失效转移分片,不再需要继续处理),并立即执行该分片的任务调度。
失效转移设置改变监听器
zk任务配置修改为非失效转移时,执行删除作业失效转移信息。即删除节点sharding/分片项/failover
(所有分片项下的子节点failover均会被删除).
failoverService.removeFailoverInfo();