fasthttp剖析

先说点题外话,最近在开发公司级的网关,虽然没有明说,但是对于我们大家来说Nginx就是我们对标的对象。但是说实话,想要对标Nginx的性能,用Go开发基本上是不可能的,人家没有scheduler调度这一项就可以吊打Go了,更别说Go还有GC了。跑Benchmark的时候就能很明显地看到,随着并发请求的增多,Ngx的响应时间几乎就是一条完美的直线。而我们用Go开发的网关,在并发数小于等于阈值X的时候还能跟Ngx不相上下,虽然有周期性GC带来的毛刺,但是总体影响不大,毛刺主要影响的是99分位的响应时间。但是,一旦并发数超过阈值X,Go网关的响应时间便指数级地上升了。
这个现象,直接通过pprof观察可以发现,在并发数小于X时,总Goroutine数量基本保持稳定,但是一旦超过阈值X,goroutine数量则快速飙升上去。为什么呢?简单来说就是,由于Go的runtime需要调度goroutine(sleep哪个唤醒哪个抢占哪个),在Goroutine数量巨大的时候这个调度的开销非常大,每个goroutine被唤起的周期变得很长,因此会导致响应变长。同时,由于大量goroutine并没有完成其任务,导致无法回收,新到的请求就只能new goroutine,导致goroutine的数量进一步增加,使得响应时间进一步恶化…然后基本上服务就不可用了,pprof能够看到,绝大部分时间都在执行runtime.findRunableG。
这个现象让我想起了以前大学时代学习的二极管的雪崩击穿。虽然,Go号称goroutine非常轻量级,可以轻松地开到十万百万级,但是这话是省略了很多上下文和限制条件的。它只告诉你可以有millions of goroutine,但是没告诉你后果是啥,怎样才能开到millions,轻松开millions of goroutines是相对谁来说轻松…总之一句话就是,太美的承诺都不能信。

说回正题,fasthttp!为什么我进入主题之前说这么多题外话,本质上的目的就是想表明,对处理高并发场景的应用,goroutine的代价其实是不可忽视的,一定要省着用!fasthttp为什么比标准库net/http快,就是因为它并不是来一个请求就开一个goroutine,而是维护了一个workerPool,尽可能复用goroutine。当然还有很多别的优化,比如尽量减少数据copy,这些在fasthttp的API里就有很直观的体现。

先来简单看看fasthttp的大框架结构:

func (s *Server) Serve(ln net.Listener) error {
    var lastOverflowErrorTime time.Time
    var lastPerIPErrorTime time.Time
    var c net.Conn
    var err error
    // 略

    maxWorkersCount := s.getConcurrency()
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
        connState:       s.setState,
    }
    wp.Start()

    atomic.AddInt32(&s.open, 1)
    defer atomic.AddInt32(&s.open, -1)

    for {
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
      //略
        }
        s.setState(c, StateNew)
        atomic.AddInt32(&s.open, 1)
        if !wp.Serve(c) {
    // 略
        }
        c = nil
    }
}

上面是一个经过删减提炼后的代码,从上面可以看到,fasthttp在启动时先实例化并启动了一个workerPool,然后进入到了一个大循环中,也是每当accept一个连接之后就教给workerPool去Serve。
可以简单地对比以下标准库net/http对应逻辑的伪代码:

for {
    c,err := accept(s)
    go s.serve(c)
}

可以看到最大的区别就是fasthttp是wp.serve(c)而标准库是直接起一个goroutinego s.serve(c)。为什么不像标准库直接启动一个goroutine去处理呢?当然是为了优化啊!前面也说了,高并发下goroutine代价也是很高的,尽量复用goroutine。

接着我们来讲讲workerPool。和别的语言中线程池的实现思路基本一致,伪代码如下:


func init() {
    for i:=0 i<N; i++ {
      signalReceiver := make(chan net.Conn)
      ready = append(ready, signalReceiver)
      go job(signalReceiver)
    }
}

// 每个worker都有一个自己的channel,通过从channel中接收消息来获得执行权
func job(ch chan net.Conn) {
    for c := range ch {
        doSomething(c)
        //当完成任务后,把自己的channel放到ready队列里,表示自己是空闲状态
        lock()
        ready = append(ready, ch)
        unlock()
    }
}
var ready = []chan net.Conn{}

// 每次从ready队列里取一个空闲的channel,然后通知该job来执行任务
func serve(c net.Conn) {
  lock()
  readyJob := ready[len(ready)-1]
  ready = ready[:len(ready)-1]
  unlock()
  readyJob <- c
}

以上便是一个最简单的(问题多多)的workerPool。我们一开始启动了N个worker,每个worker都由一个自己的channel用于接收数据,然后一开始把所有worker的channel都放到ready队列里,表示所有的worker都处于空闲状态。每次接收到一个请求时,serve就通过ready去看哪个worker是空闲的,然后向那个worker的channel发消息,从而让该worker执行当前任务。

确保你完全明白上面的workerPool的实现思路,我们再继续看fasthttp的实现。
fasthttp的serve和我们上述基本一致:

func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    ch.ch <- c
    return true
}

从wp里取一个channel,然后向该channel发消息,让对应的worker执行任务。我们这个具体看看fasthttp是怎么找到空闲worker的:

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()

    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() {
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

其实也是,一开始尝试从ready队列里取,如果ready队列里没有,但是当前worker数量还没有达到用户配置的MaxWorkersCount,那么就新起一个worker,否则就直接返回nil。这里新建worker还用到了临时对象池sync.Pool也就是代码中的wp.workerChanPool,能在两次gc之间复用对象,减少内存分配的开销。不过从这里也能看出,fasthttp的workerPool是lazyLoading的,并不是像我们之前的实现那样一开始就创建N个worker。这么做当然就是省内存啦,大部分业务大时间服务器都不会有这么高的并发压力,因此fasthttp作为通用框架,lazyLoading肯定是一个正确的选择!

这里的wp.workerFunc其实就是我们之前伪代码中的job函数,在里面监听channel消息,然后执行业务逻辑。我们可以具体看看:

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch {
        if c == nil {
            break
        }

        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
        // 省略错误处理
        }
        if err == errHijacked {
            wp.connState(c, StateHijacked)
        } else {
            c.Close()
            wp.connState(c, StateClosed)
        }
        c = nil
        // 把ch放到ready队列里
        if !wp.release(ch) {
            break
        }
    }

    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

从上面代码我们能够看到,每个job确实也是不断地监听channel,如果收到消息且不是nil,那就执行真正的业务逻辑。成功执行完之后(省略掉一些错误处理分支),通过wp.release把channel放到ready队列里。正常情况下,workerFunc会一直执行,直到收到一个nil或者执行出错,然后把workerPool的workersCount-1并退出,之后就等着runtime来回收或者释放goroutine了。

以上就是fasthttp的主要逻辑,没有什么特别的设计,和其它线程池的设计几乎是一模一样的。当然fasthttp的workerPool还有些需要注意的性质,从上面可以看出,每次release到ready队列时,直接放到队尾,每次取也是从队尾取。因此fasthttp的worker队列是FILO的,即先进后出。这会导致在并发小的情况下很多先入队的worker会一直空闲。因此fasthttp也支持设置IdleDuration参数,定期清理空闲的worker减少资源占用。这部分代码:

  func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()

    // Clean least recently used workers if they didn't serve connections
    // for more than maxIdleWorkerDuration.
    currentTime := time.Now()

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++
    }
    *scratch = append((*scratch)[:0], ready[:i]...)
    if i > 0 {
        m := copy(ready, ready[i:])
        for i = m; i < n; i++ {
            ready[i] = nil
        }
        wp.ready = ready[:m]
    }
    wp.lock.Unlock()

    // Notify obsolete workers to stop.
    // This notification must be outside the wp.lock, since ch.ch
    // may be blocking and may consume a lot of time if many workers
    // are located on non-local CPUs.
    tmp := *scratch
    for i, ch := range tmp {
        ch.ch <- nil
        tmp[i] = nil
    }
}

wp.Start中启动一个goroutine,定期执行wp.clean操作。wp.clean其实就是从头遍历ready队列,把空闲时间超过maxIdleWorkerDuration的都清理掉。这里清理也很简单,直接向该channel发送一个nil就行了。别忘了之前workFunc中,当收到一个nil之后就直接break出大循环,做些收尾工作然后退出函数,整个goroutine也就可以被runtime回收了。

不过这还不算完。我们再看看wp.WorkerFunc吧,这是一个很长的函数,其实就是s.serveConn(在初始化时把s.serveConn赋值给了wp.WorkerFunc),以下也是简化过的代码,我们只关注其hotpath:

func (s *Server) serveConn(c net.Conn) error {
  ctx := s.acquireCtx(c)
  var connRequestNum int
  for {
    connRequestNum++
    var br *bufio.Reader = acquireReader(ctx)
    err = ctx.Request.readLimitBody(br, maxRequestSize)
    err = s.Handler(ctx)
    var wr *bufio.Writer = acquireWriter(ctx)
    err = wr.Flush()
    if err != nil {
      return      
    }
    if s.MaxRequestNumPerConn {
      break;
    }
  }
// 省略
}

这里有个比较奇怪的地方,为什么要用一个无限循环呢?难道是接收网络包的分组之类的?NoNoNo,不要把概念搞混了!分组这些都是协议栈处理的内容,到Go这块直接就是应用层了。那为什么要无限循环呢?在我这篇文章里说过,只有通过三次握手新建的连接,才用Accept去取。建立好连接后,后续数据的收发都是基于该socket对象,也即net.Conn对象。

也就是说,只有新建的连接才会从Serve中的acceptConn函数开始,然后执行上述的逻辑。已经建立好的连接,后续的请求都在serveConn中循环处理。换句话说,如果一个HTTP请求是KeepAlive的(HTTP 1.1默认行为),那么worker就会一直处理此连接,无限循环地从该连接上读取数据(也就是下一个请求),然后进行业务逻辑。除非遇到connRequestNum >= MaxRequestNumPerConn或者其它错误了,才会关闭该连接,然后把自己设置为空闲。

这里还有个问题你可能会疑惑:由于并不知道下一次请求啥时候会发过来,这里只有一个ctx.Request.readLimitBody(br, maxRequestSize),并没有看到“不断尝试去读”这种逻辑呢?

这是一个好问题!!

其实这里的答案就是Go提供的一种强大的抽象net.Conn。它不仅仅是代表一个socket,同时它还被封装成了netPoller对象。netPoller是Go runtime的一个数据结构,也许你早已知道了linux的epoll,netPoller就是对epoll的一种封装。Go把socket注册到epoll里,后续当用户在net.Conn对象上调用Read时,实际上是这样的:

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if len(p) == 0 {
        // If the caller wanted a zero byte read, return immediately
        // without trying (but after acquiring the readLock).
        // Otherwise syscall.Read returns 0, nil which looks like
        // io.EOF.
        // TODO(bradfitz): make it wait for readability? (Issue 15735)
        return 0, nil
    }
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }

            // On MacOS we can see EINTR here if the user
            // pressed ^Z.  See issue #22838.
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

也就是先会进行一次syscall.Read,但是如果没有数据,此时会得到一个错误syscall.EAGAIN。这时,会执行fd.pd.waitRead,这个函数会一直阻塞直到epoll通知socket有数据就绪。这里的阻塞和syscall的阻塞调用不一样,这里的阻塞相当于主动让出时间片(park),当前线程可以去执行别的goroutine,然后等待适当的时间(epoll event fire)被runtime唤醒。从用户的角度来看,这就像是阻塞的。

那么这是怎么做到的呢?这就涉及到runtime的调度了,具体可以参见这篇文章。顺带提一句,查看太深入到runtime的代码一定要用dlv、lldb或者gdb,用IDE会跳到错误的位置,因为很多runtime的代码直接和平台有关了,不同平台对应实现也不一样,然后链接器也会搞一些事情导致符号表在源代码层面不能正确跳转,所以一定要用单步调试去看代码。

以上便是对fasthttp源码结构的一个剖析,接下来让我们思考一个问题吧:

假设有N个客户端都使用长连接(http keepalive)发送请求,同时假设每个客户端每秒发送M个请求。那么此时fasthttp和net/http的性能会如何呢?

由于有N个客户端,因此fashttp和net/http的Server都会Accept N次。标准库会启动N个goroutine,而对于fasthttp来说,由于每个连接都是长连接,每个worker会一直处理该连接直到连接关闭或者次数到了限制,因此ready队列一直是空的,所以也会启动N个goroutine。即使N很大,这种case下fasthttp和标准库所使用的goroutine数量的持平的。
但是由于fasthttp大量使用了sync.Pool复用对象减少内存分配的开销,而标准库每个请求都会new一个request和response对象。同时fasthttp中大部分存的是[]byte而标准库中多是string,因此fasthttp还相比标准库减少了很多内存复制的开销。
总体而言,fasthttp性能在各种场景下应该都比标准库好很多。

当然还有个tips,作为网关,外网和内网的一道门,为了防止恶意请求,还是应该在response后主动close连接,也就是说应该使用短连接,这样才安全。

That's all!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容