Hyperledger-Fabric源码分析(Gossip-StateInfoSnapshot)

前面我们说过了StateInfo消息, 这个主要是自发热给别人,有点细水长流的意思。下面我们要讲的是Gossip里面怎么去主动拉取成员的StateInfo列表。

struct

type StateInfoPullRequest struct {
    // channel_MAC is an authentication code that proves
    // that the peer that sent this message knows
    // the name of the channel.
    Channel_MAC          []byte   
}

type StateInfoSnapshot struct {
    Elements             []*Envelope 
}

type Envelope struct {
    Payload              []byte         
    Signature            []byte         
    SecretEnvelope       *SecretEnvelope
}

type SecretEnvelope struct {
    Payload              []byte 
    Signature            []byte   
}

  • 这里要注意的是,这两个消息是成对的,StateInfoPullRequest是获取的请求,StateInfoSnapshot是返回的结果。

GossipMessage_StateInfoPullReq

初始化

func (gc *gossipChannel) createStateInfoRequest() (*proto.SignedGossipMessage, error) {
   return (&proto.GossipMessage{
      Tag:   proto.GossipMessage_CHAN_OR_ORG,
      Nonce: 0,
      Content: &proto.GossipMessage_StateInfoPullReq{
         StateInfoPullReq: &proto.StateInfoPullRequest{
            Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
         },
      },
   }).NoopSign()
}
  • 这个请求没什么好讲的,基本上就是生成MAC

发起点

func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
    chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
    ...

    // Periodically publish state info
    go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
    // Periodically request state info
    go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C)
    ...

    go gc.membershipTracker.trackMembershipChanges()
    return gc
}
  • 又到了这里,gc初始化的时候就会自循环发起StateInfoPullReq,真是不甘寂寞。边吃边拉。
func (gc *gossipChannel) requestStateInfo() {
   req, err := gc.createStateInfoRequest()
   if err != nil {
      gc.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
      return
   }
   endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
   gc.Send(req, endpoints...)
}
  • 这里不深入了,组装消息前面已经说了
  • 随机取PullPeerNum个节点,Send。这里注意Gossip中有两种发送方式,一种是Send,一种是Gossip。有区别,虽然都是通过底层comm来send,不过emitter会做一层buff来缓冲,对于不是那么急的,可以累计多个消息,一次发出去。
  • 显然这里的场景是要立即,马上发送出去。

接受点

func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
   ...

   if m.IsStateInfoPullRequestMsg() {
      msg.Respond(gc.createStateInfoSnapshot(orgID))
      return
   }

   ...
}
  • 消息校验的部分,之前讲过了,这里不再赘述
  • 可以看到收到request后,会createStateInfoSnapshot
  • 下面进到GossipMessage_StateSnapshot的部分

GossipMessage_StateSnapshot

初始化

func (gc *gossipChannel) createStateInfoSnapshot(requestersOrg api.OrgIdentityType) *proto.GossipMessage {
   sameOrg := bytes.Equal(gc.selfOrg, requestersOrg)
   rawElements := gc.stateInfoMsgStore.Get()
   elements := []*proto.Envelope{}
   for _, rawEl := range rawElements {
      msg := rawEl.(*proto.SignedGossipMessage)
      orgOfCurrentMsg := gc.GetOrgOfPeer(msg.GetStateInfo().PkiId)
      // If we're in the same org as the requester, or the message belongs to a foreign org
      // don't do any filtering
      if sameOrg || !bytes.Equal(orgOfCurrentMsg, gc.selfOrg) {
         elements = append(elements, msg.Envelope)
         continue
      }
      // Else, the requester is in a different org, so disclose only StateInfo messages that their
      // corresponding AliveMessages have external endpoints
      if netMember := gc.Lookup(msg.GetStateInfo().PkiId); netMember == nil || netMember.Endpoint == "" {
         continue
      }
      elements = append(elements, msg.Envelope)
   }

   return &proto.GossipMessage{
      Channel: gc.chainID,
      Tag:     proto.GossipMessage_CHAN_OR_ORG,
      Nonce:   0,
      Content: &proto.GossipMessage_StateSnapshot{
         StateSnapshot: &proto.StateInfoSnapshot{
            Elements: elements,
         },
      },
   }
}
  • 首先判定同属一个Org
  • 获取stateInfoMsgStore保存的所有数据集,这里需要注意的是,这个store当初创建的时候是指定了过期策略的,也就是说,里面的状态会定时清理过期的无效状态。还记得StateInfo消息是有时间戳的么。
  • 当然了,如果请求人跟本地同属一个Org,消息全部推送过去没有话讲。
  • 如果不属于一个Org,难道就不推送么?这里处于有利于消息扩散的原则,当然也要尽可能的去做
    • 如果store里面有消息跟本地不是属于同一个组织,那么推送出去也没什么不好,反正自己也不知道是谁发来的,有人要就给他好了。
    • 如果store里面有消息跟本地属于同一个组织,那么问题来了,现在来了个陌生人来请求这个组织内的状态,显然不能给他,这里就跳过了。

发送

func (m *ReceivedMessageImpl) Respond(msg *proto.GossipMessage) {
   sMsg, err := msg.NoopSign()
   if err != nil {
      err = errors.WithStack(err)
      m.conn.logger.Errorf("Failed creating SignedGossipMessage: %+v", err)
      return
   }
   m.conn.send(sMsg, func(e error) {}, blockingSend)
}
  • 因为是Response,当然要点对点send。

接受点

func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
    ...

    if m.IsStateInfoSnapshot() {
        gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
        return
    }
    ...
}
  • 又来到这里,看来这个系列,这段代码会复制粘贴N次

  • 好了gc收到后,开始处理

处理

func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) {
   chanName := string(gc.chainID)
   for _, envelope := range m.GetStateSnapshot().Elements {
      stateInf, err := envelope.ToGossipMessage()
      if err != nil {
         gc.logger.Warningf("Channel %s : StateInfo snapshot contains an invalid message: %+v", chanName, errors.WithStack(err))
         return
      }
      if !stateInf.IsStateInfoMsg() {
         gc.logger.Warning("Channel", chanName, ": Element of StateInfoSnapshot isn't a StateInfoMessage:",
            stateInf, "message sent from", sender)
         return
      }
      si := stateInf.GetStateInfo()
      orgID := gc.GetOrgOfPeer(si.PkiId)
      if orgID == nil {
         gc.logger.Debug("Channel", chanName, ": Couldn't find org identity of peer",
            string(si.PkiId), "message sent from", string(sender))
         return
      }

      if !gc.IsOrgInChannel(orgID) {
         gc.logger.Warning("Channel", chanName, ": Peer", stateInf.GetStateInfo().PkiId,
            "is not in an eligible org, can't process a stateInfo from it, sent from", sender)
         return
      }

      expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
      if !bytes.Equal(si.Channel_MAC, expectedMAC) {
         gc.logger.Warning("Channel", chanName, ": StateInfo message", stateInf,
            ", has an invalid MAC. Expected", expectedMAC, ", got", si.Channel_MAC, ", sent from", sender)
         return
      }
      err = gc.ValidateStateInfoMessage(stateInf)
      if err != nil {
         gc.logger.Warningf("Channel %s: Failed validating state info message: %v sent from %v : %+v", chanName, stateInf, sender, errors.WithStack(err))
         return
      }

      if gc.Lookup(si.PkiId) == nil {
         // Skip StateInfo messages that belong to peers
         // that have been expired
         continue
      }

      gc.stateInfoMsgStore.Add(stateInf)
   }
}
  • 遍历收到的StateInfo消息集
  • 如果消息发起节点的组织不属于这个channel,丢弃。
  • 校验MAC
  • 校验消息,StateInfo篇已经讲过了。
  • 消息发起节点是否alive,如果本地都Lookup不到,也选择丢弃。
  • 下面就是久违的加到stateInfoMsgStore了。

总结

  • 可以看到,整个机制还是比较简单,唯一要注意是,StateInfo消息保存下来的必要条件是所属的Org要同属一个Channel。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容