创建临时有序节点,并进行排序,获取当前节点在排序中的下标index。
如果index小于0,表示当前节点不在排序后的子节点列表中,则调用reset()进行重置(删除旧节点并重新创建临时有序节点)
如果index等于0,表示当前节点为主节点,则调用LeaderLatchListener.isLeader()
如果index大于0,表示当前节点不是主节点,则监控前一个节点的删除事件(该分支本身不会调用LeaderLatchListener.notLeader();notLeader()只在节点由主节点变为非主节点时,由setLeadership(false)触发)。当监控到前一个节点的删除事件时,重新获取子节点并排序、比较,得到新的index;若此时index为0,则当前节点成为新的master
实例
/**
 * A {@link SchedulerFactoryBean} whose scheduler only runs on the node that currently
 * holds leadership of a Curator {@link LeaderLatch}.
 *
 * <p>Auto-startup is disabled at construction time. The scheduler is started when the
 * latch reports {@code isLeader()} and stopped when it reports {@code notLeader()}.
 */
public class ZkSchedulerFactoryBean extends SchedulerFactoryBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZkSchedulerFactoryBean.class);
    private static final String LEAD_PATH = "/lead";

    private LeaderLatch leaderLatch;

    @Autowired
    private ZkClient zkClient;

    /**
     * Creates the factory bean with auto-startup disabled.
     *
     * <p>The leader latch is deliberately NOT created here: {@code zkClient} is injected
     * by field injection, which happens only after the constructor has returned, so
     * touching it in the constructor would throw a {@link NullPointerException}.
     *
     * @throws Exception kept for backward compatibility with existing callers
     */
    public ZkSchedulerFactoryBean() throws Exception {
        this.setAutoStartup(false);
    }

    /**
     * Lets the parent finish its own initialization first, then creates and starts the
     * leader latch. Running after the parent guarantees that a leadership callback
     * cannot arrive before the parent has been initialized.
     *
     * @throws Exception if the parent initialization or the latch start-up fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        super.afterPropertiesSet();
        final String ip = zkClient.getIp();
        final LeaderLatch latch = new LeaderLatch(zkClient.getClient(), LEAD_PATH, ip);
        latch.addListener(new MyLeaderLatchListener(this, ip));
        latch.start();
        // Publish the latch only once it has been started, so destroy() never tries to
        // close a latch that was created but never started.
        leaderLatch = latch;
    }

    /**
     * Starts the scheduler only while auto-startup is enabled, i.e. only while this
     * node is the leader (see {@link MyLeaderLatchListener}).
     */
    @Override
    public void startScheduler(final Scheduler scheduler, final int startupDelay) throws SchedulerException {
        if (this.isAutoStartup()) {
            super.startScheduler(scheduler, startupDelay);
        }
    }

    /**
     * Closes the leader latch before destroying the scheduler.
     */
    @Override
    public void destroy() throws SchedulerException {
        CloseableUtils.closeQuietly(leaderLatch);
        super.destroy();
    }

    /**
     * Starts or stops the enclosing scheduler factory as leadership is gained or lost.
     */
    class MyLeaderLatchListener implements LeaderLatchListener {
        private final SchedulerFactoryBean schedulerFactoryBean;
        private final String ip;

        public MyLeaderLatchListener(SchedulerFactoryBean schedulerFactoryBean, String ip) {
            this.schedulerFactoryBean = schedulerFactoryBean;
            this.ip = ip;
        }

        /** Leadership gained: enable auto-startup and start the scheduler. */
        @Override
        public void isLeader() {
            // The message has a {} placeholder; the ip must be passed to fill it.
            LOGGER.info("ip:{},成为leader,执行Scheduler~!", ip);
            schedulerFactoryBean.setAutoStartup(true);
            schedulerFactoryBean.start();
        }

        /** Leadership lost: disable auto-startup and stop the scheduler. */
        @Override
        public void notLeader() {
            LOGGER.info("ip:{},不是leader,停止Scheduler~!", ip);
            schedulerFactoryBean.setAutoStartup(false);
            schedulerFactoryBean.stop();
        }
    }
}
leaderLatch 分析
/**
 * Starts this latch. Only the first call succeeds; the state must move from
 * LATENT to STARTED.
 *
 * @throws Exception declared by the signature; a repeated call fails the state precondition
 */
public void start() throws Exception
{
    // Atomic LATENT -> STARTED transition guards against a second start.
    final boolean firstStart = state.compareAndSet(State.LATENT, State.STARTED);
    Preconditions.checkState(firstStart, "Cannot be started more than once");

    final Runnable startup = new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                internalStart();
            }
            finally
            {
                // The task reference is cleared whether or not internalStart() succeeded.
                startTask.set(null);
            }
        }
    };

    // Hand the task to AfterConnectionEstablished and remember the returned handle.
    startTask.set(AfterConnectionEstablished.execute(client, startup));
}
/**
 * Registers the connection-state listener and performs the first reset().
 * Does nothing unless the latch is in the STARTED state.
 */
private synchronized void internalStart()
{
    if ( state.get() != State.STARTED )
    {
        return;
    }

    // From here on, connection-state changes are routed to handleStateChange(),
    // which in turn may call setLeadership() and so notify the latch listeners.
    client.getConnectionStateListenable().addListener(listener);

    try
    {
        reset();
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        log.error("An error occurred checking resetting leadership.", e);
    }
}
// Connection-state listener that internalStart() registers on the client.
// It forwards every state change to handleStateChange().
private final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework curator, ConnectionState connectionState)
    {
        handleStateChange(connectionState);
    }
};
/**
 * Reacts to a connection-state change.
 * LOST drops leadership. SUSPENDED drops leadership only when the error policy
 * treats SUSPENDED as an error state. RECONNECTED resets the latch when SUSPENDED
 * is an error state or leadership is not currently held. Other states are ignored.
 *
 * @param newState the new connection state
 */
private void handleStateChange(ConnectionState newState)
{
    switch ( newState )
    {
        case LOST:
        {
            // Connection lost: we can no longer claim leadership.
            setLeadership(false);
            break;
        }

        case SUSPENDED:
        {
            // Give up leadership only if the policy treats SUSPENDED as an error.
            if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
            {
                setLeadership(false);
            }
            break;
        }

        case RECONNECTED:
        {
            try
            {
                // hasLeadership (an AtomicBoolean) records whether we are the leader.
                if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
                {
                    reset();
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("Could not reset leader latch", e);
                setLeadership(false);
            }
            break;
        }

        default:
        {
            // NOP
            break;
        }
    }
}
/**
 * Records the new leadership flag and, if it changed, notifies every registered
 * LeaderLatchListener (for example the MyLeaderLatchListener added through
 * leaderLatch.addListener(...)). Always ends with notifyAll() on this object.
 *
 * @param newValue true if this instance is now the leader
 */
private synchronized void setLeadership(boolean newValue)
{
    // Swap in the new flag and remember what it was before.
    final boolean wasLeader = hasLeadership.getAndSet(newValue);

    if ( wasLeader != newValue )
    {
        if ( newValue )
        {
            // Was not leader, now is: fire isLeader() on every listener.
            listeners.forEach(new Function<LeaderLatchListener, Void>()
            {
                @Override
                public Void apply(LeaderLatchListener input)
                {
                    input.isLeader();
                    return null;
                }
            });
        }
        else
        {
            // Was leader, now is not: fire notLeader() on every listener.
            listeners.forEach(new Function<LeaderLatchListener, Void>()
            {
                @Override
                public Void apply(LeaderLatchListener listener)
                {
                    listener.notLeader();
                    return null;
                }
            });
        }
    }

    notifyAll();
}
/**
 * Gives up leadership, deletes our previous node (if any) and creates a fresh
 * EPHEMERAL_SEQUENTIAL node in the background. When the create succeeds, the new
 * path is recorded and the children are re-read to determine leadership.
 *
 * @throws Exception declared by the signature; propagated from the calls made here
 */
@VisibleForTesting
void reset() throws Exception
{
    // We are not leader while resetting.
    setLeadership(false);
    // Drop the old node before creating the new one.
    setNode(null);

    // Runs in the background once the create issued below completes.
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( debugResetWaitLatch != null )
            {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }

            if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
            {
                // NOTE(review): this callback belongs to the create() call, yet the
                // message mentions getChildren() - confirm whether that is intended.
                log.error("getChildren() failed. rc = " + event.getResultCode());
                return;
            }

            setNode(event.getName());
            if ( state.get() == State.CLOSED )
            {
                // Closed in the meantime: remove the node we just created.
                setNode(null);
            }
            else
            {
                getChildren();
            }
        }
    };

    // Create an ephemeral sequential node under the latch path.
    client.create()
        .creatingParentContainersIfNeeded()
        .withProtection()
        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
        .inBackground(callback)
        .forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
/**
 * Replaces the stored path of our node and, if a previous path existed, deletes
 * that old node in the background.
 *
 * @param newValue the new node path, or null to clear it
 * @throws Exception declared by the signature; propagated from the delete call
 */
private void setNode(String newValue) throws Exception
{
    final String previousPath = ourPath.getAndSet(newValue);
    if ( previousPath == null )
    {
        return;
    }
    client.delete().guaranteed().inBackground().forPath(previousPath);
}
/**
 * Reads the children of the latch path in the background and, on success, passes
 * them to checkLeadership().
 *
 * @throws Exception declared by the signature; propagated from the getChildren call
 */
private void getChildren() throws Exception
{
    BackgroundCallback onChildren = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            // Any result code other than OK is ignored here.
            if ( event.getResultCode() != KeeperException.Code.OK.intValue() )
            {
                return;
            }
            checkLeadership(event.getChildren());
        }
    };

    // List the child nodes; the callback runs when the result arrives.
    client.getChildren().inBackground(onChildren).forPath(ZKPaths.makePath(latchPath, null));
}
// Decides whether this instance is the leader, given the current child nodes.
// The children are sorted and our own node's position is looked up:
//   index < 0  -> our node is not in the list: reset()
//   index == 0 -> we are the leader: setLeadership(true)
//   index > 0  -> watch the node directly ahead of ours and re-check once it is deleted
private void checkLeadership(List<String> children) throws Exception
{
if ( debugCheckLeaderShipLatch != null )
{
debugCheckLeaderShipLatch.await();
}
// Snapshot our node path from ourPath
final String localOurPath = ourPath.get();
// Sort all child nodes
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
// Position of our node in the sorted list; -1 when we have no path
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
if ( ourIndex < 0 )
{ // Index below 0: our node was not found among the children, so reset
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
}
else if ( ourIndex == 0 )
{ // Index 0: this node is the leader
setLeadership(true);
}
else
{ // Otherwise watch the node immediately ahead of ours for deletion
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
// Re-read the children only while still STARTED, only for a NodeDeleted event,
// and only if we had a node path when the watch was set
if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
{
try
{
getChildren();
}
catch ( Exception ex )
{
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
}
}
}
};
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
// The watched node no longer exists: reset
reset();
}
}
};
// Watch the node ahead of ours; when its deletion is observed the watcher calls getChildren() again to re-check whether we are now the leader.
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}