leaderLatch 选主

创建临时有序节点,并进行排序,获取当前节点在排序中的下标index。
如果index小于0,表示节点不在排序列表中,则进行重启
如果index等于0,表示当前节点为主节点,则调用LeaderLatchListener.isLeader()
如果index大于0,表示当前节点不是主节点,则监控前一个节点的删除事件,并调用LeaderLatchListener.notLeader()方法。当前监控到前一个节点的删除事件时,重新排序,比较,获取index,此时index为0,则为新master


实例

public class ZkSchedulerFactoryBean extends SchedulerFactoryBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZkSchedulerFactoryBean.class);

    private LeaderLatch leaderLatch;
    @Autowired
    private ZkClient zkClient;

    private static final String LEAD_PATH = "/lead";

    public ZkSchedulerFactoryBean() throws Exception {

        this.setAutoStartup(false);
        //初始化LeaderLatch
        leaderLatch = new LeaderLatch(zkClient.getClient(),LEAD_PATH,zkClient.getIp());
        leaderLatch.addListener(new MyLeaderLatchListener(this,zkClient.getIp()));
        leaderLatch.start();
    }

    @Override
    public void startScheduler(final Scheduler scheduler, final int startupDelay) throws SchedulerException {

        if (this.isAutoStartup()) {
            super.startScheduler(scheduler, startupDelay);
        }

    }

    @Override
    public void destroy() throws SchedulerException {
        CloseableUtils.closeQuietly(leaderLatch);
        super.destroy();
    }

    class MyLeaderLatchListener implements LeaderLatchListener{

        private SchedulerFactoryBean schedulerFactoryBean;

        private String ip;

        public MyLeaderLatchListener(SchedulerFactoryBean schedulerFactoryBean, String ip) {
            this.schedulerFactoryBean = schedulerFactoryBean;
            this.ip = ip;
        }

        @Override
        public void isLeader() {
            LOGGER.info("ip:{},成为leader,执行Scheduler~!");
            schedulerFactoryBean.setAutoStartup(true);
            schedulerFactoryBean.start();
        }

        @Override
        public void notLeader() {
            LOGGER.info("ip:{},不是leader,停止Scheduler~!");
            schedulerFactoryBean.setAutoStartup(false);
            schedulerFactoryBean.stop();
        }
    }
}


leaderLatch 分析

public void start() throws Exception
    {
         //通过AtomicReference原子操作 判断是否已经启动
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //AfterConnectionEstablished.execute开启线程,执行run方法
        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    //
                    internalStart();
                }
                finally
                {
                    startTask.set(null);
                }
            }
        }));
    }

 private synchronized void internalStart()
    {
        if ( state.get() == State.STARTED )
        {
            //添加状态变化监听器,此处会通过调用setLeadership方法,调用isleader(),notleader()
            //将listener交给 connectionStateManager管理
            client.getConnectionStateListenable().addListener(listener);
            try
            {
                reset();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("An error occurred checking resetting leadership.", e);
            }
        }
    }
//internalStart中client添加的监听器
private final ConnectionStateListener listener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            //状态变化处理
            handleStateChange(newState);
        }
    };

private void handleStateChange(ConnectionState newState)
    {
        switch ( newState )
        {
            default:
            {
                // NOP
                break;
            }

            case RECONNECTED:
            {
                try
                {    //重连时,不是主节点则重启
                    //hasLeadership(AtomicBoolean  类型)  保存是否是 主节点
                    if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
                    {
                        reset();
                    }
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("Could not reset leader latch", e);
                    setLeadership(false);
                }
                break;
            }

            case SUSPENDED:
            {  //连接暂停,则设置 主节点false
                if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
                {
                    setLeadership(false);
                }
                break;
            }

            case LOST:
            {  //连接丢失,设置主节点false
                setLeadership(false);
                break;
            }
        }
    }

private synchronized void setLeadership(boolean newValue)
    {  //获取旧状态,设置新状态
        boolean oldValue = hasLeadership.getAndSet(newValue);

        if ( oldValue && !newValue )
        { // 之前是master,现在不是master 则调用我们 leaderLatch.addListener(new MyLeaderLatchListener(this,zkClient.getIp()));传入的MyLeaderLatchListener的notLeader()方法
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener listener)
                    {
                        listener.notLeader();
                        return null;
                    }
                });
        }
        else if ( !oldValue && newValue )
        { // 之前不是master 现在是master,则调用isLeader()方法
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener input)
                    {
                        input.isLeader();
                        return null;
                    }
                });
        }
        notifyAll();
    }

@VisibleForTesting
    void reset() throws Exception
    {  //设置当前不是master
        setLeadership(false);
        //重启时,先删除老节点
        setNode(null);
        //在forPath方法里,后台执行该任务
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }

                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
//创建一个临时有序的节点   
     client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }

//将老路径删除
private void setNode(String newValue) throws Exception
    {
        String oldPath = ourPath.getAndSet(newValue);
        if ( oldPath != null )
        {
            client.delete().guaranteed().inBackground().forPath(oldPath);
        }
    }

private void getChildren() throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    checkLeadership(event.getChildren());
                }
            }
        };
        //查看子节点,执行callback
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

 //校验当前节点是不是主节点
private void checkLeadership(List<String> children) throws Exception
    {
        if ( debugCheckLeaderShipLatch != null )
        {
            debugCheckLeaderShipLatch.await();
        }
        //从ourPath获取当前节点
        final String localOurPath = ourPath.get();
        //排序所有节点
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
//获取当前节点的下标       
 int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if ( ourIndex < 0 )
        {  //下标小于0,过去节点,重启
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 )
        {  //下标为0 表示当前节点为主节点
            setLeadership(true);
        }
        else
        {  //监控前一个节点的删除事件
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // 过期节点,重启
                        reset();
                    }
                }
            };
//监控当前节点的前一个节点,如果监控到前一个节点删除事件,则重新调用getChildren(),判断当前节点是否为主节点。
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容