【Zookeeper】 Server的启动流程

一:前言

当服务通过选举算法进行选举完之后,各个服务器就需要设置自己的角色,并启动相对应的服务(也就是服务的初始化),之后就等待客户端的请求,处理响应的请求。

二:流程分析

2.1、 LEADER

功能:接收客户端的请求, 事务请求的提议者。

首先我们查看启动代码:

    // makeLeader(logFactory) 新建一个Leader实体,打开Leader服务器的交换信息的接口,等待与Learner通信
    setLeader(makeLeader(logFactory));
    // Leader服务启动的主方法
    leader.lead();

Leader服务启动的主方法leader.lead();流程分析:

  void lead() throws IOException, InterruptedException {
       ...
            //1> 加载FileTxnSnapLog中的数据, 并把每一个事务数据封装成一个Proposal,放入committedLog
            //   中,并计算minCommittedLog, maxCommittedLog, 数据放入ZKDatabase
            //2> 处理事务数据时,若事务Type为Session的数据,响应的增删到sessionsWithTimeouts中
            //3> 设置Leader的highestZxid
            //4> 通过ZKDatabase中的sessions与sessionsWithTimeouts进行比较,Kill失效的session, 并将
            //     失效相关联的临时节点进行删除
            zk.loadData();</br>

            // 开启接收Leaner的连接线程, 并把每一个Leaner的连接封装成一个LearnerHandler实体, 添加到
            // Leader的learners列表中,每一个LearnerHandler开启一个线程处理响应的Leaner的信息
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();</br>
            
            readyToStart = true;
            // 获取当前选举的轮次, 同步等待法定人数的Leaner注册身份(FOLLOWERINFO/OBSERVERINFO)
            // 到Leader 超时时间为  initLimit \* tickTime
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            // 根据当前的轮次,初始化zxid
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));</br>
            ...</br>
            
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);</br>
            ...</br>
            //Leaner向Leaner发送LEADERINFO消息,并等待法定人数的Leaner的响应epoch的消息
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);
            try {
                //等待法定人数的Leaner初始化同步数据响应的消息,超时时间为 initLimit \* tickTime
                waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
            } catch (InterruptedException e) {
                ...
            }
            
            //开启对客户端的服务的主方法
            startZkServer();
            
            //是否有zxid的初始化设置
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }
            
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.cnxnFactory.setZooKeeperServer(zk);
            }
           
           //对每一个Leaner发起ping检测消息, 检测的时间间隔为tickTime / 2
           // 检测learnerType = LearnerType.PARTICIPANT的人数是否少于法定的人数,
           // 如果少于,则   shutDown服务,进行新一轮的选举
            boolean tickSkip = true;
            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick++;
                }
                HashSet<Long> syncedSet = new HashSet<Long>();
                syncedSet.add(self.getId());

                for (LearnerHandler f : getLearners()) {
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }

              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    return;
              } 
              tickSkip = !tickSkip;
            }
           //...
    }

下面我们队上面代码的两个流程进行流程分析①: Leaner向Leader的注册同步流程;②:Leader与客户端服务的流程。

2.1.1、 Leaner向Leader的注册同步流程

  • 1.、接收到Leaner的信息注册消息 type = Leader.FOLLOWERINFO/OBSERVERINFO + acceptedEpoch + myid, 根据消息设置LeanerHandler的字段值, this.sid = li.getServerid();this.version = li.getProtocolVersion();, 同步等待法定人数Leander的注册消息得到newEpoch(如果每次接收到的lastAcceptedEpoch >= epoch, 则设置epoch = lastAcceptedEpoch+1;), 发送Leader信息(type:Leader.LEADERINFO, zxid: newEpoch + 0)给Leander
  • 2、同步等待Leaner接收到Leander的信息包之后返回的ACKEPOCH消息, 从ACKEPOCH消息获取到Leaner的lastLoggerZxid(为了防止与Leader的lastLoggerZxid冲突, 用peerLastZxid代替)。处理同步数据逻辑:
  • i: 如果peerLastZxid == LastZxid,则packetToSend置为DIFF, zxidToSend置为peerLastZxid
  • ii: 如果proposals.size() != 0 (proposals为committedLog的列表)
    • a. 子条件maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid, 比较proposals中最接近且小于等于peerLastZxid的zxid, 如果小于, packetToSend置为TRUNC, zxidToSend置为此值; 否则packetToSend置为DIFF, zxidToSend置为maxCommittedLog, 然后把所有大于peerLastZxid 的propose 封装成QuorumPacket (type: commit, zxid: propose.packet.getZxid())放入queuedPackets队列中,
    • b.子条件peerLastZxid > maxCommittedLog, 则packetToSend置为TRUNC, zxidToSend置为maxCommittedLog
  • iii: peerLastZxid > maxCommittedLog, packetToSend置为TRUNC, zxidToSend置为 maxCommittedLog
  • iv: 否则packetToSend置为SNAP
  • 3、将toBeApplied(ToBeAppliedRequestProcessor未完成的请求)列表中的数据(type: Leader.COMMIT)添加到queuedPackets队列中, 如果handler.LearnerType() == LearnerType.PARTICIPANT(即Leander的角色为Follower), 则将outstandingProposals(ProposalRequestProcessor未完成的请求) 中的提议数据(type: Leader.PROPOSAL)添加到queuedPackets队列中, 将Leaner 根据leanerType分别放入forwardingFollowers 和 observingLearners集合中
  • 4、将新一轮的Leader的信息(type: NEWLEADER, zxid: newEpoch + 0)放入queuedPackets队列中
  • 5、发送同步信息(type: packetToSend, zxidToSend: zxidToSend), 如果为SNAP消息, 则zxidToSend为Leader的lastLoggerZxid, 且将ZKDatabase序列化为数据流发送给Leaner
  • 6、开启线程, 发送queuedPackets 队列中包给Leander
  • 7、接收来自Leander同步的Ack消息, 同步等待法定人数Leaner的人数的回应
  • 8、发送Leader.UPTODATE消息给客户端, 表示可以使用数据了
    <b>注:</b>可以看到queuedPackets队列中的数据的顺序为:Leader.COMMIT —> Leader.PROPOSAL——> Leader.NEWLEADER —> Leader.UPTODATE

Leader异步等待法定的客户端注册同步:Leader的Lead()方法中:

// 异步等待Leaner信息的注册
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// 异步等待Leaner接收到Leader的信息的EpochAck
waitForEpochAck(self.getId(), leaderStateSummary);
// 异步等待Leander对Leader的NEWLEADER消息的Ack
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);

2.1.2、 Leader与客户端服务的流程

zk处理客户端的请求都是通过Proccessor进行链路处理。对于LeaderZooKeeperServer对应的Proccessor的处理关系为: PrepRequestProcessor ——> ProposalRequestProcessor ——> CommitProcessor ——> Leader.ToBeAppliedRequestProcessor ——>FinalRequestProcessor, 同时ProposalRequestProcessor 将事务请求(request.hdr != null)交给同步刷盘处理器处理 ProposalRequestProcessor ——> SyncRequestProcessor ——> AckRequestProcessor。AckRequestProcessor处理器将刷盘成功的请求交给Leader作为一个提议,作为Leader判断提议成功的法定人数, 成立交给CommitProcessor

图片.png

2.2、 FOLLOWER

功能:接收客户端的请求, 事务请求提议的参与者,将事务请求转发给Leader。

首先查看启动代码

     // makeFollower(logFactory)新建一个Follwer实体,建立与Leader通信
    setFollower(makeFollower(logFactory));
    // follower启动的主方法
    follower.followLeader();

Follower服务启动的主方法follower.followLeader();流程分析:

void followLeader() throws InterruptedException {
       ...
        try {
            InetSocketAddress addr = findLeader();            
            try {
                //与Leader建立通信
                connectToLeader(addr);
                //将自己的信息注册到Leader
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);</br>

                //初始化同步Leader的数据
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);</br>

                //接收Leader通信数据,并做响应的业务处理
                QuorumPacket qp = new QuorumPacket();
                while (self.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
               ...
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }

2.2.1 Follwer注册与同步流程分析

  • 1:发送自身信息包(type: Leader.FOLLOWERINFO, zxid: acceptedEpoch + 1)给Leader
  • 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1), 判断newEpoch 与 自身acceptedEpoch 的大小: 大于, 则将 自身acceptedEpoch设置为 newEpoch, 发送的epoch设置为旧的acceptedEpoch; 相等, 发送epoch设置为-1; 否则异常。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
  • 3: 接收来自Leader根据lastLoggedZxid发送过来的初始化同步信息: Leader.DIFF, 不做任何操作; Leader.SNAP, 则将包中的数据序列化作为自己的ZKDatabase; Leader.TRUNC, 则根据接收到的zxid来TRUNC当前ZKDatabase的数据, 设置zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
  • 4: 接收来自Leader的同步数据: Leader.COMMIT——> Leader.PROPOSAL——> Leader.NEWLEADER ——> Leader.UPTODATE
    Leader.PROPOSAL: 提议的数据, 添加到packetsNotCommitted 队列中
    Leader.COMMIT: 需要提交的数据,添加到packetsCommitted队列
    Leader.NEWLEADER: 接收新的Leader领导的事务,保存快照数据, 设置self.setCurrentEpoch(newEpoch);, 回复AcK消息给Leader
    Leader.UPTODATE, 同步完成,设置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk)), 跳出同步流程
  • 5: 对packetsNotCommitted 中的数据调用fzk.logRequest(p.hdr, p.rec)(刷盘, 返回提议AcK),对packetsCommitted中的数据调用fzk.commit(zxid);(提交操作)

2.2.1 Follwer与客户端的服务流程

对于角色Follwer的处理客户端的请求是通过下面的RequestProcessor进行处理: FollowerRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor; 同时将其转发给Leader是并把请求,Leader会进行提议请求,将接收到的提议请求交给SyncRequestProcessor ——> SendAckRequestProcessor, SendAckRequestProcessor将ack消息发送给Leader,作为Leader判断提议成功的法定人数,成立交给CommitProcessor 。

图片.png

2.3、 OBSERVER

功能:接收客户端的请求, 将事务请求转发给Leader。

首先查看启动代码

// makeObserver(logFactory)新建一个Observer实体,建立与Leader通信
setObserver(makeObserver(logFactory));
// observer启动的主方法
observer.observeLeader();

Observer服务启动的主方法observer.observeLeader();流程分析:

void observeLeader() throws InterruptedException {
     ...
            try {
                //建立与Leader的连接
                connectToLeader(addr);
                //将自身的信息注册到Leader中, 返回Leader的lastLoggerZxid
                long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
                //同步Leader的数据信息
                syncWithLeader(newLeaderZxid);
                 //接收Leader通信数据,并做响应的业务处理
                QuorumPacket qp = new QuorumPacket();
                while (self.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);                   
                }
            } catch (Exception e) {
               ...
            }
       ...
    }

3.3.1 Observer注册与同步流程分析

  • 1:发送自身信息包(type: Leader.OBSERVERINFO, zxid: acceptedEpoch + 1)给Leader
  • 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1), 判断newEpoch 与 自身acceptedEpoch 的大小: 大于, 则将 自身acceptedEpoch设置为 newEpoch, 发送的epoch设置为旧的acceptedEpoch; 相等, 发送epoch设置为-1; 否则异常。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
  • 3: 接收来自Leader根据lastLoggedZxid发送过来的初始化同步信息: Leader.DIFF, 不做任何操作; Leader.SNAP, 则将包中的数据序列化作为自己的ZKDatabase; Leader.TRUNC, 则根据接收到的zxid来TRUNC当前ZKDatabase的数据, 设置zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
  • 4: 接收来自Leader的同步数据: Leader.COMMIT——> Leader.NEWLEADER ——> Leader.UPTODATE
    Leader.COMMIT: 需要提交的数据,添加到packetsCommitted队列
    Leader.NEWLEADER: 接收新的Leader领导的事务,保存快照数据, 设置self.setCurrentEpoch(newEpoch);, 回复AcK消息给Leader
    Leader.UPTODATE, 同步完成,设置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk)), 跳出同步流程

3.3.2 Observer与客户端的服务流程

对于Observer的处理客户端的请求是通过下面的RequestProcessor进行处理:ObserverRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor ; 同时对于Leader的Leader.INFORM消息会同时交给SyncRequestProcessor(刷盘操作)跟CommitProcessor。

图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容