前面我们讲过Orderer的broadcast服务,是为了收集事件消息,之后会根据这些消息来生成block。那问题来了,block怎么散播出去呢?deliver就是干这个的。
接受事件
在Orderer启动的时候就会初始化deliver的grpc server来接受客户端的请求. 大部分情况是peer来拉取block列表,其他的场景待分析。
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
addr := util.ExtractRemoteAddress(ctx)
logger.Debugf("Starting new deliver loop for %s", addr)
h.Metrics.StreamsOpened.Add(1)
defer h.Metrics.StreamsClosed.Add(1)
for {
envelope, err := srv.Recv()
status, err := h.deliverBlocks(ctx, srv, envelope)
err = srv.SendStatusResponse(status)
}
}
一个典型的grpc server的请求处理流程。最终deliver请求会流转到deliverBlocks来处理。
消息处理
deliverBlocks
erroredChan := chain.Errored()
select {
case <-erroredChan:
return cb.Status_SERVICE_UNAVAILABLE, nil
default:
}
前面一大推标准的envelope校验流程,这里就不展开了.
这里会监听erroredChan的通知,那收到通知代表什么呢?有兴趣的可以进去看看,都是要命的地方会触发,说明Orderer后端的共识服务有问题。进一步该Orderer的账本就很有可能没有及时同步的,所以也就没有资格给别人提供服务了。所以这里立即返回。
cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
return cb.Status_BAD_REQUEST, nil
}
}
1. SeekInfo是deliver请求的关键参数,这里决定了我是要哪个范围的block列表。 2. 这里主要的目的就是根据position的类型来分别设置start和stop下标。 接下来正式开始循环遍历cursor
for {
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()-1 {
return cb.Status_NOT_FOUND, nil
}
}
var block *cb.Block
var status cb.Status
iterCh := make(chan struct{})
go func() {
block, status = cursor.Next()
close(iterCh)
}()
select {
case <-ctx.Done():
logger.Debugf("Context canceled, aborting wait for next block")
return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
case <-erroredChan:
logger.Warningf("Aborting deliver for request because of background error")
return cb.Status_SERVICE_UNAVAILABLE, nil
case <-iterCh:
// Iterator has set the block and status vars
}
number++
if err := srv.SendBlockResponse(block); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
}
if stopNum == block.Header.Number {
break
}
}
- 如果当前请求的block,本地还没有ready,如果是SeekInfo_FAIL_IF_NOT_READY模式,立即返回。
- 换句话说如果是SeekInfo_BLOCK_UNTIL_READY,难道会一直等到ready么?别急下面一起来看。
- 首先cursor.next会一直等待,直到有新的block已经ready。当拿到后,会通知iterCh,不然会一直等在case <-iterCh这里。
- 到这里后,表示已经收到了新的block,当然start要+1啦。
- 循环给请求方SendBlockResponse
- 如果到达stop,循环收工。
- 另外要注意的是,这里是从Orderer本地查询block列表。
func (rs *responseSender) SendBlockResponse(block *cb.Block) error {
response := &ab.DeliverResponse{
Type: &ab.DeliverResponse_Block{Block: block},
}
return rs.Send(response)
}
将block组装成DeliverResponse,返回给Client
最后
这里主要是Orderer部分的deliver是怎么服务的,至于各个Client请求完后是怎么使用的,这个之后单独来讲。