// Gossip leader election module
// Algorithm properties:
// - Peers break symmetry by comparing IDs
// - Each peer is either a leader or a follower,
//   and the aim is to have exactly 1 leader if the membership view
//   is the same for all peers
// - If the network is partitioned into 2 or more sets, the number of leaders
//   is the number of network partitions, but when the partition heals,
//   only 1 leader should be left eventually
// - Peers communicate by gossiping leadership proposal or declaration messages
//
// The Algorithm, in pseudo code:
//
// variables:
//     leaderKnown = false
//
// Invariant:
//     Peer listens for messages from remote peers
//     and whenever it receives a leadership declaration,
//     leaderKnown is set to true
//
// Startup():
//     wait for membership view to stabilize, or until a leadership
//     declaration is received or the startup timeout expires.
//     goto SteadyState()
//
// SteadyState():
//     while true:
//         If leaderKnown is false:
//             LeaderElection()
//         If you are the leader:
//             Broadcast leadership declaration
//             If a leadership declaration was received from
//             a peer with a lower ID,
//             become a follower
//         Else, you're a follower:
//             If haven't received a leadership declaration within
//             a time threshold:
//                 set leaderKnown to false
//
// LeaderElection():
//     Gossip leadership proposal message
//     Collect messages from other peers sent within a time period
//     If received a leadership declaration:
//         return
//     Iterate over all proposal messages collected.
//     If a proposal message from a peer with an ID lower
//     than yourself was received, return.
//     Else, declare yourself a leader
基本上注释写得很清楚,就是一个简化版的Raft共识算法。
初始化
// NewLeaderElectionService builds a leader election service for the peer
// identified by id and launches it in a background goroutine before
// returning it.
//
// It panics if id is empty. When callback is nil, noopCallback is installed
// in its place.
func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback leadershipCallback) LeaderElectionService {
	if id == "" {
		panic("Empty id")
	}

	svc := &leaderElectionSvcImpl{
		id:        peerID(id),
		proposals: util.NewSet(),
		adapter:   adapter,
		// Both channels are buffered with capacity 1.
		stopChan:      make(chan struct{}, 1),
		interruptChan: make(chan struct{}, 1),
		logger:        util.GetLogger(util.ElectionLogger, ""),
		callback:      callback,
	}
	if svc.callback == nil {
		svc.callback = noopCallback
	}

	go svc.start()
	return svc
}
随Gossip启动而启动,最后开始start
启动
// start boots the election service. It registers two units on stopWG,
// launches the message-handling goroutine, blocks until
// waitForMembershipStabilization returns, and only then launches the main
// election loop.
func (le *leaderElectionSvcImpl) start() {
	// run() releases one unit when it exits; the other is presumably
	// released by handleMessages, whose body is not visible here.
	const backgroundWorkers = 2
	le.stopWG.Add(backgroundWorkers)

	// Messages must be consumed while we wait for the view to stabilize,
	// because a received declaration ends the wait early.
	go le.handleMessages()

	gracePeriod := getStartupGracePeriod()
	le.waitForMembershipStabilization(gracePeriod)

	go le.run()
}
这里便是election模块的核心起手式了。下面我们具体来分析下里面的实现。
handleMessages
// handleMessage processes one election message from a remote peer.
//
// A proposal is recorded in le.proposals. A declaration marks a leader as
// existing, wakes a sleeping waiter through interruptChan, and makes this
// peer step down if it is currently the leader and the sender's ID is lower
// than its own. Any other message is logged as an error.
func (le *leaderElectionSvcImpl) handleMessage(msg Msg) {
	kind := "proposal"
	if msg.IsDeclaration() {
		kind = "declaration"
	}
	le.logger.Debug(le.id, ":", msg.SenderID(), "sent us", kind)

	le.Lock()
	defer le.Unlock()

	switch {
	case msg.IsProposal():
		// Remember the candidate; leaderElection compares these IDs later.
		le.proposals.Add(string(msg.SenderID()))

	case msg.IsDeclaration():
		atomic.StoreInt32(&le.leaderExists, 1)

		// Only signal when nothing is pending in the channel, so this send
		// never blocks while the lock is held.
		if le.sleeping && len(le.interruptChan) == 0 {
			le.interruptChan <- struct{}{}
		}

		// The peer with the lower ID wins: step down only for such a sender.
		if bytes.Compare(msg.SenderID(), le.id) < 0 {
			if le.IsLeader() {
				le.stopBeingLeader()
			}
		}

	default:
		// We shouldn't get here
		le.logger.Error("Got a message that's not a proposal and not a declaration")
	}
}
首先,跟选举有关的消息类型是LeadershipMessage
而消息又根据行为分为两种,proposal和declaration,前者是提案表达我要参加选举,后者是声明表示我已经当选
搞清楚这些,那么这里就知道是什么意思了。
- 首先如果是提案消息的话,先收集下来,为选举做准备,具体怎么用后面再讲
- 如果有声明消息过来,说明有新的leader选举产生,那么如果接收节点当前是leader,并且声明方的ID比自己小,就需要交出leader权限。
- interruptChan这里需要注意,有两个地方会等待,一是选举的过程中如果收到这个通知,说明有新的选举结果产生,有可能是自己当选,也有可能是别人。
- 二是成为leader之后如果收到这个通知,说明有新的leader产生,可能需要换届。
func (le *leaderElectionSvcImpl) stopBeingLeader() { le.logger.Info(le.id, "Stopped being a leader") atomic.StoreInt32(&le.isLeader, int32(0)) le.callback(false) }
- 看起来也很简单,只是设置isLeader的标志位而已。
- 下面我们看下callback在干嘛?
callback
// onStatusChangeFactory returns the leadership-change callback for chainID.
//
// Called with true, the callback starts block delivery for the channel and
// hands the delivery service a hook that calls Yield on the channel's leader
// election service. Called with false, it stops block delivery for the
// channel.
func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
	return func(isLeader bool) {
		if !isLeader {
			// The error is intentionally not handled here.
			// NOTE(review): consider logging it instead of dropping it.
			_ = g.deliveryService[chainID].StopDeliverForChannel(chainID)
			return
		}

		// relinquish looks up the election service under the read lock and
		// calls Yield on it after the lock has been released.
		relinquish := func() {
			g.lock.RLock()
			election := g.leaderElection[chainID]
			g.lock.RUnlock()
			election.Yield()
		}

		// The error is intentionally not handled here.
		// NOTE(review): consider logging it instead of dropping it.
		_ = g.deliveryService[chainID].StartDeliverForChannel(chainID, committer, relinquish)
	}
}
- 如果看过orderer篇的应该知道Delivery service,用来拉取orderer的block,这里很关键,不做leader,不是简单标识下就完了,需要交出代成员接收block的权力。
- 同一个组织内的peer节点的block同步是靠leader来运作的。
- 这里有个地方需要注意的是yield,只在当前节点是leader的情况下进来。当选举结果需要换届时,不光是交出了delivery的权力,还对le本身设置了yield的标志位。
waitForMembershipStabilization
// waitForMembershipStabilization blocks until the membership view looks
// stable or further waiting is pointless. It samples the peer count once per
// getMembershipSampleInterval() and returns as soon as any of these holds:
//   - the sample has the same size as the previous one,
//   - timeLimit has elapsed since the call began,
//   - a leader is already known,
//   - the service has been asked to stop.
func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.Duration) {
	le.logger.Debug(le.id, ": Entering")
	// Arguments of a deferred call are evaluated when the defer statement
	// executes. Wrapping the log call in a closure makes it report the peer
	// count at exit rather than the count observed on entry.
	defer func() {
		le.logger.Debug(le.id, ": Exiting, peers found", len(le.adapter.Peers()))
	}()

	endTime := time.Now().Add(timeLimit)
	viewSize := len(le.adapter.Peers())
	for !le.shouldStop() {
		time.Sleep(getMembershipSampleInterval())
		newSize := len(le.adapter.Peers())
		if newSize == viewSize || time.Now().After(endTime) || le.isLeaderExists() {
			return
		}
		// The view changed; use it as the baseline for the next sample.
		viewSize = newSize
	}
}
这个方法很有意思,其目的是等待成员稳定。这里有三种方式来退出这个黑洞(另外,服务被停止时循环也会结束)。
- 隔一段时间,来比对前后的peer列表个数是否一致,一致就说明稳定
- 当然了,不能没完没了的等待一致,如果到达deadline,也强行退出
- 或者有leader选出,也直接退出
run
// run is the main election loop. Until the service is asked to stop, each
// iteration triggers an election when no leader is known, clears the
// yielding state once a leader exists, and then performs one leader or
// follower cycle. stopWG.Done is called when the loop exits.
func (le *leaderElectionSvcImpl) run() {
	defer le.stopWG.Done()

	for {
		if le.shouldStop() {
			return
		}

		if !le.isLeaderExists() {
			le.leaderElection()
		}

		// If we are yielding and some leader has been elected,
		// stop yielding
		if le.isLeaderExists() {
			if le.isYielding() {
				le.stopYielding()
			}
		}

		// The steps above may have taken a while; re-check for a stop
		// request before starting another cycle.
		if le.shouldStop() {
			return
		}

		if !le.IsLeader() {
			le.follower()
			continue
		}
		le.leader()
	}
}
- 我们下面分步来拆解
- 这里的逻辑基于上面handleMessage的结论进行进一步处理
- 如果当前没有leader产生,那么立即发起选举,并且自荐给其他好朋友。这里后面详细讲。
- 如果当前有leader存在,而且当前节点是yield状态,停止这个过渡状态。
- 如果已经是leader,那么leader
- 否则follower,这些后面会详细分析
leaderElection
// leaderElection runs one election round. Unless this peer is yielding, it
// gossips a proposal and waits up to getLeaderElectionDuration() for an
// interrupt. It then becomes leader only if no leader has been declared,
// it is still not yielding, and no collected proposal came from a peer with
// a lower ID.
func (le *leaderElectionSvcImpl) leaderElection() {
	le.logger.Debug(le.id, ": Entering")
	defer le.logger.Debug(le.id, ": Exiting")

	// A yielding peer sits this round out.
	if le.isYielding() {
		return
	}

	// Announce our candidacy, then give the other peers time to respond.
	le.propose()
	le.waitForInterrupt(getLeaderElectionDuration())

	switch {
	case le.isLeaderExists():
		// Someone declared leadership while we were waiting.
		le.logger.Info(le.id, ": Some peer is already a leader")
		return
	case le.isYielding():
		le.logger.Debug(le.id, ": Aborting leader election because yielding")
		return
	}

	// No leader yet: defer to any candidate whose ID sorts before ours.
	for _, candidate := range le.proposals.ToArray() {
		if bytes.Compare(peerID(candidate.(string)), le.id) < 0 {
			return
		}
	}

	// Nobody better proposed, so take leadership and record that a leader
	// now exists.
	le.beLeader()
	atomic.StoreInt32(&le.leaderExists, 1)
}
如果是yield状态,说明他正在移交权力给别人,所以不要参与选举。
给其他人进行自荐,发出选举提案
等待interruptChan,这里前面提到过,等到的话,说明有新的选举结果产生。当然了,如果等不到,还有超时会触发。
接下来,如果当前leader已经产生,直接返回,因为选举已经结束。
这里又判断一次yield,因为是新的阶段,前面是发起选举前,这里是收到选举结果后。
重点来了,接下来是怎么从众多提案中找到有资格担任leader的节点。
算法也很简单,就是比大小,bytes.Compare(peerID(id), le.id),从收集到的proposals里面看自己是不是最小的。如果是,做beLeader。然后标识leader已经产生。
func (le *leaderElectionSvcImpl) beLeader() { le.logger.Info(le.id, ": Becoming a leader") atomic.StoreInt32(&le.isLeader, int32(1)) le.callback(true) }
- 成为leader的结果,一是托管从orderer拉取block的任务,二是标识自己是leader。
leader
// leader performs one leader cycle: it gossips a leadership declaration
// built with CreateMessage(true) and then waits for an interrupt for up to
// getLeadershipDeclarationInterval(). The loop in run() calls it repeatedly,
// so declarations are re-sent periodically.
func (le *leaderElectionSvcImpl) leader() {
	le.adapter.Gossip(le.adapter.CreateMessage(true))

	interval := getLeadershipDeclarationInterval()
	le.waitForInterrupt(interval)
}
- 首先当leader不光是前面说的那些任务,还要跟别人分享。回忆下Raft选举算法的核心,怎么跟别人维持自己的leader地位,是需要定时发心跳的。而这里心跳变成了leaderDeclaration。
- 接下来就是waitForInterrupt,当然了有新的leader发来的Declaration消息会通知到这里,另外,里面每次都会sleep一段时间。再加上外面的for循环,那么效果就是每隔一段时间去发declaration去维持leader的地位,这不就是心跳么?
follower
// follower performs one follower cycle: it discards the proposals collected
// so far, resets leaderExists to 0, and then waits until either
// getLeaderAliveThreshold() elapses or a stop signal arrives on stopChan.
// If no declaration sets leaderExists again in the meantime, the caller's
// next loop iteration starts a new election.
func (le *leaderElectionSvcImpl) follower() {
	le.logger.Debug(le.id, ": Entering")
	defer le.logger.Debug(le.id, ": Exiting")

	le.proposals.Clear()
	atomic.StoreInt32(&le.leaderExists, 0)

	aliveTimeout := time.After(getLeaderAliveThreshold())
	select {
	case <-aliveTimeout:
		// Threshold elapsed; nothing else to do in this cycle.
	case sig := <-le.stopChan:
		// Put the signal back, presumably so that other code reading
		// stopChan still observes the stop request.
		le.stopChan <- sig
	}
}
- 这里还是有讲究的,首先走到这里说明这轮选举很不幸已经结束,该节点没有选上。没关系,再接再厉,清理掉这一轮的选举提案,为下一轮做准备。
- 这里很关键的设置了leaderExists为0,说明默认认为leader不存在。有两个时间你需要搞清楚。
- peer.gossip.election.leaderElectionDuration=5s
- peer.gossip.election.leaderAliveThreshold=10s
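- 前面看到,leaderElectionDuration是leaderElection中发出提案后收集其他提案的等待时间(即waitForInterrupt的参数);leader发心跳(declaration)的间隔则由getLeadershipDeclarationInterval()决定。而这里用到的leaderAliveThreshold是follower判断leader是否存活的超时时间
- 这里就很清楚了,follower先把leaderExists置为0,然后等待leaderAliveThreshold(10s):这段时间内如果收到declaration,handleMessage会把leaderExists重新置为1,那么就认为leader是好的;否则超过10s,当前还没有leader存在,说明leader大概率挂了,需要重新选举。这里返回后,回到run的循环,leaderExists为0时会重新leaderElection。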