Hi everyone, I'm dandyhuang. A friend recently asked me, while working on HTTP/2 protocol parsing, whether reading the request body can avoid dealing with compression: their service is a proxy, and they don't want to parse the body at all. That prompted me to look at how grpc-go parses HTTP/2. I had long heard that h2 is better than h1 and brings many advantages.
Improvements of HTTP/2 over HTTP/1.1
- Binary framing
- Header compression
- Streams
- Multiplexed requests and responses
- Request prioritization
- Flow control
- Server push
HTTP 2.0 协议详解
Getting familiar with the h2 basics first will make the code below much easier to follow; see the companion article HTTP 2.0 协议详解. One structural detail from the "binary framing" item above is worth internalizing up front, sketched next: every h2 frame begins with the same fixed 9-byte header.
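A minimal sketch of that 9-byte frame header, written from RFC 7540 §4.1 rather than taken from grpc-go; the struct and function names here are my own.

package main

import (
	"encoding/binary"
	"fmt"
)

// frameHeader is the fixed 9-byte prefix of every HTTP/2 frame.
type frameHeader struct {
	Length   uint32 // 24-bit payload length
	Type     uint8  // DATA=0x0, HEADERS=0x1, SETTINGS=0x4, ...
	Flags    uint8  // e.g. END_STREAM, END_HEADERS, ACK
	StreamID uint32 // 31 bits; 0 addresses the connection itself
}

func parseFrameHeader(b [9]byte) frameHeader {
	return frameHeader{
		Length:   uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2]),
		Type:     b[3],
		Flags:    b[4],
		StreamID: binary.BigEndian.Uint32(b[5:9]) & 0x7fffffff, // drop the reserved bit
	}
}

func main() {
	// Header of an empty SETTINGS frame: length 0, type 0x4, stream 0.
	fmt.Printf("%+v\n", parseFrameHeader([9]byte{0, 0, 0, 0x4, 0, 0, 0, 0, 0}))
}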
How grpc-go implements it
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
...
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)
rawConn.SetDeadline(time.Time{})
if st == nil {
return
}
if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
}
- In server.go, handleRawConn receives the TCP data sent by the client. newHTTP2Transport then completes the early exchange and acknowledgement of SETTINGS frames that, together with the client's magic bytes, makes up the Connection Preface.
Connection Preface
Inside newHTTP2Transport, the transport is created by transport.NewServerTransport. This is where the HTTP/2 handshake is implemented; you can think of it as h2 performing a handshake of its own on top of the TCP connection.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
var authInfo credentials.AuthInfo
rawConn := conn
if config.Credentials != nil {
...
}
writeBufSize := config.WriteBufferSize
readBufSize := config.ReadBufferSize
maxHeaderListSize := defaultServerMaxHeaderListSize
if config.MaxHeaderListSize != nil {
maxHeaderListSize = *config.MaxHeaderListSize
}
// initialize the framer
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
isettings = append(isettings, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
Val: maxStreams,
})
}
... // set up the remaining SETTINGS parameters
// endWrite -> the Write called here is newBufWriter's; nothing has been sent to the client yet
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
}
kp := config.KeepaliveParams
if kp.MaxConnectionIdle == 0 {
kp.MaxConnectionIdle = defaultMaxConnectionIdle
}
if kp.MaxConnectionAge == 0 {
kp.MaxConnectionAge = defaultMaxConnectionAge
}
// Add a jitter to MaxConnectionAge.
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
if kp.MaxConnectionAgeGrace == 0 {
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}
if kp.Time == 0 {
kp.Time = defaultServerKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultServerKeepaliveTimeout
}
kep := config.KeepalivePolicy
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
done := make(chan struct{})
t := &http2Server{
ctx: setConnection(context.Background(), rawConn),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: authInfo,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
// used later for window-size adjustments
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
}
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
}
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
// flush the buffered SETTINGS frame out on the socket to the client
t.framer.writer.Flush()
defer func() {
if err != nil {
t.Close()
}
}()
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
// read the preface bytes sent by the client
if _, err := io.ReadFull(t.conn, preface); err != nil {
// In deployments where a gRPC server runs behind a cloud load balancer
// which performs regular TCP level health checks, the connection is
// closed immediately by the latter. Returning io.EOF here allows the
// grpc server implementation to recognize this scenario and suppress
// logging to reduce spam.
if err == io.EOF {
return nil, io.EOF
}
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
// validate the data: the connection preface must begin with the string "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
if !bytes.Equal(preface, clientPreface) {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
}
// next, read the SETTINGS frame sent by the client
frame, err := t.framer.fr.ReadFrame()
if err == io.EOF || err == io.ErrUnexpectedEOF {
return nil, err
}
if err != nil {
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
}
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
}
// queue the SETTINGS ACK reply
t.handleSettings(sf)
// every frame the server sends from here on goes out through this loopyWriter
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
if err := t.loopy.run(); err != nil {
if logger.V(logLevel) {
logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
}
}
t.conn.Close()
t.controlBuf.finish()
close(t.writerDone)
}()
// keepalive
go t.keepalive()
return t, nil
}
Once the TCP connection is established, the server first sends a SETTINGS frame carrying its initial configuration to the client, and only then starts reading the client's data. What it receives must be the connection preface, which begins with the string "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" and must be followed by a SETTINGS frame that may be empty. After that exchange, the initial HTTP/2 handshake can be considered complete. handleSettings queues the SETTINGS ACK message back to the client, and it is sent through the loopyWriter. Note that a loopyWriter goroutine is started here precisely for this purpose: it handles every binary frame the server needs to send from now on. For contrast, the client half of the same exchange is sketched below.
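A minimal client-side sketch written directly against golang.org/x/net/http2 rather than grpc-go (h2Handshake is a hypothetical helper; the http2 package APIs are real):

import (
	"io"
	"net"

	"golang.org/x/net/http2"
)

// h2Handshake performs the client half of the preface: the magic bytes,
// an (optionally empty) SETTINGS frame, then an ACK of the server's SETTINGS.
func h2Handshake(conn net.Conn) error {
	if _, err := io.WriteString(conn, http2.ClientPreface); err != nil {
		return err
	}
	fr := http2.NewFramer(conn, conn)
	if err := fr.WriteSettings(); err != nil { // an empty SETTINGS is legal
		return err
	}
	for {
		f, err := fr.ReadFrame()
		if err != nil {
			return err
		}
		// The first non-ACK SETTINGS from the server completes the preface.
		if sf, ok := f.(*http2.SettingsFrame); ok && !sf.IsAck() {
			return fr.WriteSettingsAck()
		}
	}
}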
serveStreams: parsing the binary frames
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
// flow-control throttle
t.controlBuf.throttle()
// read the next frame off the connection
frame, err := t.framer.fr.ReadFrame()
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
}
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
t.closeStream(s, true, se.Code, false)
} else {
t.controlBuf.put(&cleanupStream{
streamID: se.StreamID,
rst: true,
rstCode: se.Code,
onWrite: func() {},
})
}
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
}
t.Close()
return
}
// dispatch on the frame type
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
// continue parsing the header metadata, fields, etc.
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
// HTTP body data
case *http2.DataFrame:
t.handleData(frame)
// RST_STREAM frame
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
// SETTINGS frame
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}
}
When I first read this HandleStreams code, I could not work out why the business logic started running before the body data had even been parsed. The server first receives the handshake SETTINGS ACK, then the other frames. In the h2 protocol, HEADERS arrives first and the DATA frames arrive later, and that ordering is guaranteed by TCP. Parsing proceeds from operateHeaders via handle -> handleStream() -> processUnaryRPC(). This was the point that puzzled me: as soon as HEADERS is parsed, the RPC business logic in processUnaryRPC runs directly, without waiting for the DATA frames to be parsed first. Let's dig deeper into processUnaryRPC.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
sh := s.opts.statsHandler
...
binlog := binarylog.GetMethodLogger(stream.Method())
if binlog != nil {
... // binlog
}
var comp, decomp encoding.Compressor
var cp Compressor
var dc Decompressor
// compression type
if s.opts.cp != nil {
cp = s.opts.cp
stream.SetSendCompress(cp.Type())
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
comp = encoding.GetCompressor(rc)
if comp != nil {
stream.SetSendCompress(rc)
}
}
var payInfo *payloadInfo
if sh != nil || binlog != nil {
payInfo = &payloadInfo{}
}
// fetch the body data
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
}
return err
}
if channelz.IsOn() {
t.IncrMsgRecv()
}
df := func(v interface{}) error {
// decode the data
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
if sh != nil {
sh.HandleRPC(stream.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
WireLength: payInfo.wireLength + headerLen,
Data: d,
Length: len(d),
})
}
if binlog != nil {
binlog.Log(&binarylog.ClientMessage{
Message: d,
})
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
// run the RPC business logic
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
if appErr != nil {
...
return appErr
}
// send the response
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
...
}
return err
}
processUnaryRPC first determines the compression type, then calls recvAndDecompress to fetch the body data. As described in the HTTP 2.0 协议详解 article, once the body has been parsed out we run the handler on it and decode the body data. Before getting to how the body arrives, here is a quick client-side sketch of where that compression type comes from.
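A hedged sketch of how compression is switched on (or left off) from the client side; the method path and message types are placeholders, while grpc.UseCompressor and the gzip registration are real grpc-go APIs:

import (
	"context"

	"google.golang.org/grpc"
	_ "google.golang.org/grpc/encoding/gzip" // registers the "gzip" compressor
)

// call is a hypothetical unary invocation. With UseCompressor the client
// sends "grpc-encoding: gzip" and sets each message's payload-format byte
// to 1; without it the byte stays 0 and the body travels uncompressed,
// which is exactly what a pass-through proxy wants.
func call(ctx context.Context, cc *grpc.ClientConn, req, reply interface{}) error {
	return cc.Invoke(ctx, "/pkg.Service/Method", req, reply,
		grpc.UseCompressor("gzip"))
}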
Here is the puzzle again: at this point we have not received the DATA frame yet, so how is the body obtained at all?
How the h2 DATA payload is obtained
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
// fetch the DATA payload
pf, d, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return nil, err
}
if payInfo != nil {
payInfo.wireLength = len(d)
}
// validate the payload
if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
return nil, st.Err()
}
var size int
// was the payload compressed?
if pf == compressionMade {
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
d, err = dc.Do(bytes.NewReader(d))
size = len(d)
} else {
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
}
return d, nil
}
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
// read the 5-byte message header
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}
// compressed or not?
pf = payloadFormat(p.header[0])
// body length, big-endian
length := binary.BigEndian.Uint32(p.header[1:])
if length == 0 {
return pf, nil, nil
}
if int64(length) > int64(maxInt) {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
// read the actual body data
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return 0, nil, err
}
return pf, msg, nil
}
So how does p.recvMsg inside recvAndDecompress actually get its data? The read is nested very deep: parser.recvMsg -> Stream.Read -> io.ReadFull() -> ReadAtLeast() -> transportReader.Read() -> recvBufferReader, whose read blocks on the recv.get() channel. Recall from earlier that handleStream was already dispatched on its own goroutine to run the handle callback, so even though the read blocks here, it does not stall the receipt of subsequent DATA frames. A condensed view of that dispatch is shown below.
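Here is that dispatch, condensed from grpc-go's server.go (tracing details and the optional worker-pool branch omitted):

// Every stream delivered by HandleStreams is handled on its own goroutine,
// so a Read blocking on the body never stalls the frame-reading loop.
func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// handleStream eventually reaches processUnaryRPC, which may
			// block in recvAndDecompress until the DATA frame lands.
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		return ctx
	})
	wg.Wait()
}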
Once DATA frames do arrive, the blocked read drains them from the channel. recvMsg first reads a 5-byte message header: byte 0 says whether the payload is compressed, and the next four bytes are the big-endian data length. This is also where we can see how the request body avoids compression, answering my friend's question: the flag is only set when the client actually negotiated a compressor, so a proxy that never does receives plain payloads and can forward them untouched. A standalone sketch of this framing follows.
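A runnable sketch of gRPC's length-prefixed message framing as read by parser.recvMsg; readGRPCMessage is a hypothetical helper, but the wire layout is the real one:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readGRPCMessage mirrors recvMsg's layout: a 5-byte prefix
// (1-byte payload format + 4-byte big-endian length), then the message.
func readGRPCMessage(r io.Reader) (compressed bool, msg []byte, err error) {
	var header [5]byte
	if _, err = io.ReadFull(r, header[:]); err != nil {
		return false, nil, err
	}
	compressed = header[0] == 1 // 0: plain, 1: compressed via grpc-encoding
	length := binary.BigEndian.Uint32(header[1:5])
	msg = make([]byte, length)
	if _, err = io.ReadFull(r, msg); err != nil {
		return false, nil, err
	}
	return compressed, msg, nil
}

func main() {
	// Prefix: not compressed, length 2; payload "hi".
	wire := []byte{0, 0, 0, 0, 2, 'h', 'i'}
	c, m, _ := readGRPCMessage(bytes.NewReader(wire))
	fmt.Println(c, string(m)) // false hi
}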
Parsing the DATA frame
func (t *http2Server) handleData(f *http2.DataFrame) {
// frame length
size := f.Header().Length
// Select the right stream to dispatch.
// look up the stream this frame belongs to
s, ok := t.getStream(f)
if !ok {
return
}
// has this stream already finished reading?
if s.getState() == streamReadDone {
t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
return
}
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
}
}
if len(f.Data()) > 0 {
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
// write the data into the stream
s.write(recvMsg{buffer: buffer})
}
}
// is this the stream's final frame?
if f.StreamEnded() {
// Received the end of stream from the client.
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}
}
- Here, s.write(recvMsg{buffer: buffer}) pushes the buffer's data into the stream: Stream -> buf.recvBuffer -> put onto the recvMsg channel c. This pairs exactly, one-to-one, with the blocked read above that waits for DATA frame data: one side writes, the other reads. The toy sketch below shows the pairing.
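A toy stand-in for that producer/consumer pairing (an assumed simplification; the real recvBuffer in grpc-go guards a backlog slice with a mutex and drains it through a one-slot channel):

package main

import (
	"fmt"
	"io"
)

// recvMsg mirrors the items flowing through the stream's receive buffer:
// a chunk of body bytes, or a terminal error such as io.EOF.
type recvMsg struct {
	data []byte
	err  error
}

func main() {
	c := make(chan recvMsg, 16)

	// Producer: what handleData does for each DATA frame via s.write.
	go func() {
		c <- recvMsg{data: []byte("request body")}
		c <- recvMsg{err: io.EOF} // queued when END_STREAM is seen
	}()

	// Consumer: what the deeply nested Read chain ultimately blocks on.
	for m := range c {
		if m.err != nil {
			fmt.Println("stream finished:", m.err)
			return
		}
		fmt.Printf("read %d body bytes\n", len(m.data))
	}
}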
Summary
If you understand the rough flow of the TCP protocol, the h2 protocol is not hard to follow. Starting out with HTTP/2, you learn that the client sends the HEADERS frame first and the DATA frames afterwards. At first I kept getting stuck on how the business logic could run before all the DATA had been received; I had not paid much attention to the callback being launched on its own goroutine, and the read nesting inside recvMsg is very deep. All of this together left me thoroughly confused at the beginning. But once the fog lifts and you work through it layer by layer, the flow is actually quite clear: the client (C) first completes the h2 handshake with the server (S) via SETTINGS frames and their ACKs. C then sends the HEADERS frame. S parses HEADERS, starts a goroutine, and blocks on a channel waiting for the DATA to follow. C keeps sending DATA frames; S parses them and pushes the body data into the channel. The goroutine started earlier receives the data from the channel and can then run the rest of the RPC business logic.
Feel free to add me if you want to discuss further. I'm dandyhuang, and I love digging into source-code details. Writing this up was no small effort, so leave a little like if it helped; I just hope it makes things clearer. w: dandyhuang_