Hyperledger-Fabric源码分析(加入通道)

前一篇讲完通道创建,接下来马上趁热打铁来看下加入通道这部分的实现,先看下命令。

peer channel join -b mychannel.block

可以看到,这里用上了上一步所生成的通道的genesisblock。

peer

executeJoin

func executeJoin(cf *ChannelCmdFactory) (err error) {
   spec, err := getJoinCCSpec()
   if err != nil {
      return err
   }

   // Build the ChaincodeInvocationSpec message
   invocation := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}

   creator, err := cf.Signer.Serialize()
   if err != nil {
      return fmt.Errorf("Error serializing identity for %s: %s", cf.Signer.GetIdentifier(), err)
   }

   var prop *pb.Proposal
   prop, _, err = putils.CreateProposalFromCIS(pcommon.HeaderType_CONFIG, "", invocation, creator)
   if err != nil {
      return fmt.Errorf("Error creating proposal for join %s", err)
   }

   var signedProp *pb.SignedProposal
   signedProp, err = putils.GetSignedProposal(prop, cf.Signer)
   if err != nil {
      return fmt.Errorf("Error creating signed proposal %s", err)
   }

   var proposalResp *pb.ProposalResponse
   proposalResp, err = cf.EndorserClient.ProcessProposal(context.Background(), signedProp)
   if err != nil {
      return ProposalFailedErr(err.Error())
   }

   if proposalResp == nil {
      return ProposalFailedErr("nil proposal response")
   }

   if proposalResp.Response.Status != 0 && proposalResp.Response.Status != 200 {
      return ProposalFailedErr(fmt.Sprintf("bad proposal response %d: %s", proposalResp.Response.Status, proposalResp.Response.Message))
   }
   logger.Info("Successfully submitted proposal to join channel")
   return nil
}

这里主要做几件事情

  • 生成cis,也就是ChaincodeInvocationSpec,这里跟chaincode invoke篇高度相似,这里就不再赘述。而这里cis的重点是里面最终调用的是什么地方。

    • 可以看到最终是调用cscc的JoinChain
func getJoinCCSpec() (*pb.ChaincodeSpec, error) {
  if genesisBlockPath == common.UndefinedParamValue {
     return nil, errors.New("Must supply genesis block file")
  }

  gb, err := ioutil.ReadFile(genesisBlockPath)
  if err != nil {
     return nil, GBFileNotFoundErr(err.Error())
  }
  // Build the spec
  input := &pb.ChaincodeInput{Args: [][]byte{[]byte(cscc.JoinChain), gb}}

  spec := &pb.ChaincodeSpec{
     Type:        pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]),
     ChaincodeId: &pb.ChaincodeID{Name: "cscc"},
     Input:       input,
  }

  return spec, nil
}
  • 最后包装成HeaderType_CONFIG的Proposal
  • 注意,这里会去生成数字签名,当然是用的signidentity,而签名身份是跟admin是一致的。
  • 接着就是开始处理proposal了

cscc

case JoinChain:
   if args[1] == nil {
      return shim.Error("Cannot join the channel <nil> configuration block provided")
   }

   block, err := utils.GetBlockFromBlockBytes(args[1])
   if err != nil {
      return shim.Error(fmt.Sprintf("Failed to reconstruct the genesis block, %s", err))
   }

   cid, err := utils.GetChainIDFromBlock(block)
   if err != nil {
      return shim.Error(fmt.Sprintf("\"JoinChain\" request failed to extract "+
         "channel id from the block due to [%s]", err))
   }

   if err := validateConfigBlock(block); err != nil {
      return shim.Error(fmt.Sprintf("\"JoinChain\" for chainID = %s failed because of validation "+
         "of configuration block, because of %s", cid, err))
   }

   // 2. check local MSP Admins policy
   // TODO: move to ACLProvider once it will support chainless ACLs
   if err = e.policyChecker.CheckPolicyNoChannel(mgmt.Admins, sp); err != nil {
      return shim.Error(fmt.Sprintf("access denied for [%s][%s]: [%s]", fname, cid, err))
   }

   // Initialize txsFilter if it does not yet exist. We can do this safely since
   // it's the genesis block anyway
   txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
   if len(txsFilter) == 0 {
      // add array of validation code hardcoded to valid
      txsFilter = util.NewTxValidationFlagsSetValue(len(block.Data.Data), pb.TxValidationCode_VALID)
      block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
   }

   return joinChain(cid, block, e.ccp, e.sccp)
  • 前面就是block的各种校验,跳过

  • CheckPolicyNoChannel(mgmt.Admins, sp),为什么前面要强调签名身份跟admin一致,说明这个proposal包是admin签署的。而这里就是校验规则是否是admin。截取片段证明我所言非虚。

    case m.MSPRole_ADMIN:
       mspLogger.Debugf("Checking if identity satisfies ADMIN role for %s", msp.name)
       // in the case of admin, we check that the
       // id is exactly one of our admins
       for _, admincert := range msp.admins {
          if bytes.Equal(id.(*identity).cert.Raw, admincert.(*identity).cert.Raw) {
             // we do not need to check whether the admin is a valid identity
             // according to this MSP, since we already check this at Setup time
             // if there is a match, we can just return
             return nil
          }
       }
    
    • 可以看到这里就是比对证书是否跟本地保存的该组织的admin是否一致。

joinChain

func joinChain(chainID string, block *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) pb.Response {
   if err := peer.CreateChainFromBlock(block, ccp, sccp); err != nil {
      return shim.Error(err.Error())
   }

   peer.InitChain(chainID)

   return shim.Success(nil)
}

这里分两步,CreateChainFromBlock和InitChain下面会讲到。处理完后给peer返回success

CreateChainFromBlock

func CreateChainFromBlock(cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) error {
   cid, err := utils.GetChainIDFromBlock(cb)
   if err != nil {
      return err
   }

   var l ledger.PeerLedger
   if l, err = ledgermgmt.CreateLedger(cb); err != nil {
      return errors.WithMessage(err, "cannot create ledger from genesis block")
   }

   return createChain(cid, l, cb, ccp, sccp, pluginMapper)
}

这里做两件事情

  • 创建本地帐本
  • createChain,回忆下上篇创建通道,那里会初始化orderer端的chain,并启动。而这里是peer端

createChain

func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider, pm txvalidator.PluginMapper) error {
   chanConf, err := retrievePersistedChannelConfig(ledger)
   if err != nil {
      return err
   }

   var bundle *channelconfig.Bundle

   if chanConf != nil {
      bundle, err = channelconfig.NewBundle(cid, chanConf)
      if err != nil {
         return err
      }
   } else {
      // Config was only stored in the statedb starting with v1.1 binaries
      // so if the config is not found there, extract it manually from the config block
      envelopeConfig, err := utils.ExtractEnvelope(cb, 0)
      if err != nil {
         return err
      }

      bundle, err = channelconfig.NewBundleFromEnvelope(envelopeConfig)
      if err != nil {
         return err
      }
   }

   capabilitiesSupportedOrPanic(bundle)

   channelconfig.LogSanityChecks(bundle)

   gossipEventer := service.GetGossipService().NewConfigEventer()

   gossipCallbackWrapper := func(bundle *channelconfig.Bundle) {
      ac, ok := bundle.ApplicationConfig()
      if !ok {
         // TODO, handle a missing ApplicationConfig more gracefully
         ac = nil
      }
      gossipEventer.ProcessConfigUpdate(&gossipSupport{
         Validator:   bundle.ConfigtxValidator(),
         Application: ac,
         Channel:     bundle.ChannelConfig(),
      })
      service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
         // TODO: this is a place-holder that would somehow make the MSP layer suspect
         // that a given certificate is revoked, or its intermediate CA is revoked.
         // In the meantime, before we have such an ability, we return true in order
         // to suspect ALL identities in order to validate all of them.
         return true
      })
   }

   trustedRootsCallbackWrapper := func(bundle *channelconfig.Bundle) {
      updateTrustedRoots(bundle)
   }

   mspCallback := func(bundle *channelconfig.Bundle) {
      // TODO remove once all references to mspmgmt are gone from peer code
      mspmgmt.XXXSetMSPManager(cid, bundle.MSPManager())
   }

   ac, ok := bundle.ApplicationConfig()
   if !ok {
      ac = nil
   }

   cs := &chainSupport{
      Application: ac, // TODO, refactor as this is accessible through Manager
      ledger:      ledger,
   }

   peerSingletonCallback := func(bundle *channelconfig.Bundle) {
      ac, ok := bundle.ApplicationConfig()
      if !ok {
         ac = nil
      }
      cs.Application = ac
      cs.Resources = bundle
   }

   cs.bundleSource = channelconfig.NewBundleSource(
      bundle,
      gossipCallbackWrapper,
      trustedRootsCallbackWrapper,
      mspCallback,
      peerSingletonCallback,
   )

   vcs := struct {
      *chainSupport
      *semaphore.Weighted
   }{cs, validationWorkersSemaphore}
   validator := txvalidator.NewTxValidator(cid, vcs, sccp, pm)
   c := committer.NewLedgerCommitterReactive(ledger, func(block *common.Block) error {
      chainID, err := utils.GetChainIDFromBlock(block)
      if err != nil {
         return err
      }
      return SetCurrConfigBlock(block, chainID)
   })

   ordererAddresses := bundle.ChannelConfig().OrdererAddresses()
   if len(ordererAddresses) == 0 {
      return errors.New("no ordering service endpoint provided in configuration block")
   }

   // TODO: does someone need to call Close() on the transientStoreFactory at shutdown of the peer?
   store, err := TransientStoreFactory.OpenStore(bundle.ConfigtxValidator().ChainID())
   if err != nil {
      return errors.Wrapf(err, "[channel %s] failed opening transient store", bundle.ConfigtxValidator().ChainID())
   }
   csStoreSupport := &CollectionSupport{
      PeerLedger: ledger,
   }
   simpleCollectionStore := privdata.NewSimpleCollectionStore(csStoreSupport)

   service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
      Validator:            validator,
      Committer:            c,
      Store:                store,
      Cs:                   simpleCollectionStore,
      IdDeserializeFactory: csStoreSupport,
   })

   chains.Lock()
   defer chains.Unlock()
   chains.list[cid] = &chain{
      cs:        cs,
      cb:        cb,
      committer: c,
   }

   return nil
}
  • 首先根据传入的genesisblock中拿到envelope,进而转换成bundle,又来了,看过上篇的,应该很熟悉了。

  • 拿到bundle后,第一时间就是组装chainsupport,可以看到后面的wrapper和callback都是用来处理当bundle更新的场景,也就是当配置变更的时候会回调这些逻辑,这里就不展开了。自行分析吧。

cs := &chainSupport{
      Application: ac, // TODO, refactor as this is accessible through Manager
      ledger:      ledger,
}

cs.bundleSource = channelconfig.NewBundleSource(
  bundle,
  gossipCallbackWrapper,
  trustedRootsCallbackWrapper,
  mspCallback,
  peerSingletonCallback,
)
  • 然后就是根据账本去初始化LedgerCommitter,这是账本提交相关的。

  • 最后最最重要的是初始化gossip服务,回忆下gossip会做什么?

    • 去orderer拉取block
    • peer节点间选举
    • peer成员间主动被动同步block
    • peer成员状态同步
  • 而这些都会在这里面去初始化,service.GetGossipService().InitializeChannel感兴趣的可以进去看看,可以找到前面所讲的deliveryService,leaderelection,GossipStateProvider

InitChain

peer.Initialize(func(cid string) {
   logger.Debugf("Deploying system CC, for channel <%s>", cid)
   sccp.DeploySysCCs(cid, ccp)
   sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
      return peer.GetLedger(cid).NewQueryExecutor()
   }))
   if err != nil {
      logger.Panicf("Failed subscribing to chaincode lifecycle updates")
   }
   cceventmgmt.GetMgr().Register(cid, sub)
}
  • 当peer node start的时候也会调用sccp.DeploySysCCs(cid, ccp),只不过cid为空,说明是系统级的scc,而这里部署的是通道相关的scc。也就是说以后在通道的基础上调用的scc就是该通道独有的,跟其他通道以及系统区分开。

  • 好奇,到底有什么区别,都用系统级的不好么?进去看看

deploySysCC

func deploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC) error {
   ...

   if chainID != "" {
      lgr := peer.GetLedger(chainID)
      if lgr == nil {
         panic(fmt.Sprintf("syschain %s start up failure - unexpected nil ledger for channel %s", syscc.Name(), chainID))
      }

      txsim, err := lgr.NewTxSimulator(txid)
      if err != nil {
         return err
      }

      txParams.TXSimulator = txsim
      defer txsim.Done()
   }

   ...
}

进来可以看到,如果绑定通道id的结果是生成独有的事件模拟器。

小结

至此,整个join处理完毕,很多部分前面已经讲过了,显得这里很粗糙,建议搞清楚了再来,抱歉。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容