先说点题外话,最近在开发公司级的网关,虽然没有明说,但是对于我们大家来说Nginx就是我们对标的对象。但是说实话,想要对标Nginx的性能,用Go开发基本上是不可能的,人家没有scheduler调度这一项就可以吊打Go了,更别说Go还有GC了。跑Benchmark的时候就能很明显地看到,随着并发请求的增多,Ngx的响应时间几乎就是一条完美的直线。而我们用Go开发的网关,在并发数小于等于阈值X的时候还能跟Ngx不相上下,虽然有周期性GC带来的毛刺,但是总体影响不大,毛刺主要影响的是99分位的响应时间。但是,一旦并发数超过阈值X,Go网关的响应时间便指数级地上升了。
这个现象,直接通过pprof观察可以发现,在并发数小于X时,总Goroutine数量基本保持稳定,但是一旦超过阈值X,goroutine数量则快速飙升上去。为什么呢?简单来说就是,由于Go的runtime需要调度goroutine(sleep哪个唤醒哪个抢占哪个),在Goroutine数量巨大的时候这个调度的开销非常大,每个goroutine被唤起的周期变得很长,因此会导致响应变长。同时,由于大量goroutine并没有完成其任务,导致无法回收,新到的请求就只能new goroutine,导致goroutine的数量进一步增加,使得响应时间进一步恶化…然后基本上服务就不可用了,pprof能够看到,绝大部分时间都在执行runtime.findRunableG。
这个现象让我想起了以前大学时代学习的二极管的雪崩击穿。虽然,Go号称goroutine非常轻量级,可以轻松地开到十万百万级,但是这话是省略了很多上下文和限制条件的。它只告诉你可以有millions of goroutine,但是没告诉你后果是啥,怎样才能开到millions,轻松开millions of goroutines是相对谁来说轻松…总之一句话就是,太美的承诺都不能信。
说回正题,fasthttp!为什么我进入主题之前说这么多题外话,本质上的目的就是想表明,对处理高并发场景的应用,goroutine的代价其实是不可忽视的,一定要省着用!fasthttp为什么比标准库net/http快,就是因为它并不是来一个请求就开一个goroutine,而是维护了一个workerPool,尽可能复用goroutine。当然还有很多别的优化,比如尽量减少数据copy,这些在fasthttp的API里就有很直观的体现。
先来简单看看fasthttp的大框架结构:
func (s *Server) Serve(ln net.Listener) error {
var lastOverflowErrorTime time.Time
var lastPerIPErrorTime time.Time
var c net.Conn
var err error
// 略
maxWorkersCount := s.getConcurrency()
s.concurrencyCh = make(chan struct{}, maxWorkersCount)
wp := &workerPool{
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
Logger: s.logger(),
connState: s.setState,
}
wp.Start()
atomic.AddInt32(&s.open, 1)
defer atomic.AddInt32(&s.open, -1)
for {
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
//略
}
s.setState(c, StateNew)
atomic.AddInt32(&s.open, 1)
if !wp.Serve(c) {
// 略
}
c = nil
}
}
上面是一个经过删减提炼后的代码,从上面可以看到,fasthttp在启动时先实例化并启动了一个workerPool,然后进入到了一个大循环中,也是每当accept一个连接之后就教给workerPool去Serve。
可以简单地对比以下标准库net/http对应逻辑的伪代码:
for {
c,err := accept(s)
go s.serve(c)
}
可以看到最大的区别就是fasthttp是wp.serve(c)
而标准库是直接起一个goroutinego s.serve(c)
。为什么不像标准库直接启动一个goroutine去处理呢?当然是为了优化啊!前面也说了,高并发下goroutine代价也是很高的,尽量复用goroutine。
接着我们来讲讲workerPool。和别的语言中线程池的实现思路基本一致,伪代码如下:
func init() {
for i:=0 i<N; i++ {
signalReceiver := make(chan net.Conn)
ready = append(ready, signalReceiver)
go job(signalReceiver)
}
}
// 每个worker都有一个自己的channel,通过从channel中接收消息来获得执行权
func job(ch chan net.Conn) {
for c := range ch {
doSomething(c)
//当完成任务后,把自己的channel放到ready队列里,表示自己是空闲状态
lock()
ready = append(ready, ch)
unlock()
}
}
var ready = []chan net.Conn{}
// 每次从ready队列里取一个空闲的channel,然后通知该job来执行任务
func serve(c net.Conn) {
lock()
readyJob := ready[len(ready)-1]
ready = ready[:len(ready)-1]
unlock()
readyJob <- c
}
以上便是一个最简单的(问题多多)的workerPool。我们一开始启动了N个worker,每个worker都由一个自己的channel用于接收数据,然后一开始把所有worker的channel都放到ready队列里,表示所有的worker都处于空闲状态。每次接收到一个请求时,serve就通过ready去看哪个worker是空闲的,然后向那个worker的channel发消息,从而让该worker执行当前任务。
确保你完全明白上面的workerPool的实现思路,我们再继续看fasthttp的实现。
fasthttp的serve和我们上述基本一致:
func (wp *workerPool) Serve(c net.Conn) bool {
ch := wp.getCh()
if ch == nil {
return false
}
ch.ch <- c
return true
}
从wp里取一个channel,然后向该channel发消息,让对应的worker执行任务。我们这个具体看看fasthttp是怎么找到空闲worker的:
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
if vch == nil {
vch = &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
}
其实也是,一开始尝试从ready队列里取,如果ready队列里没有,但是当前worker数量还没有达到用户配置的MaxWorkersCount,那么就新起一个worker,否则就直接返回nil。这里新建worker还用到了临时对象池sync.Pool也就是代码中的wp.workerChanPool,能在两次gc之间复用对象,减少内存分配的开销。不过从这里也能看出,fasthttp的workerPool是lazyLoading的,并不是像我们之前的实现那样一开始就创建N个worker。这么做当然就是省内存啦,大部分业务大时间服务器都不会有这么高的并发压力,因此fasthttp作为通用框架,lazyLoading肯定是一个正确的选择!
这里的wp.workerFunc其实就是我们之前伪代码中的job函数,在里面监听channel消息,然后执行业务逻辑。我们可以具体看看:
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
for c = range ch.ch {
if c == nil {
break
}
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
// 省略错误处理
}
if err == errHijacked {
wp.connState(c, StateHijacked)
} else {
c.Close()
wp.connState(c, StateClosed)
}
c = nil
// 把ch放到ready队列里
if !wp.release(ch) {
break
}
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}
从上面代码我们能够看到,每个job确实也是不断地监听channel,如果收到消息且不是nil,那就执行真正的业务逻辑。成功执行完之后(省略掉一些错误处理分支),通过wp.release把channel放到ready队列里。正常情况下,workerFunc会一直执行,直到收到一个nil或者执行出错,然后把workerPool的workersCount-1并退出,之后就等着runtime来回收或者释放goroutine了。
以上就是fasthttp的主要逻辑,没有什么特别的设计,和其它线程池的设计几乎是一模一样的。当然fasthttp的workerPool还有些需要注意的性质,从上面可以看出,每次release到ready队列时,直接放到队尾,每次取也是从队尾取。因此fasthttp的worker队列是FILO的,即先进后出。这会导致在并发小的情况下很多先入队的worker会一直空闲。因此fasthttp也支持设置IdleDuration参数,定期清理空闲的worker减少资源占用。这部分代码:
func (wp *workerPool) Start() {
if wp.stopCh != nil {
panic("BUG: workerPool already started")
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
go func() {
var scratch []*workerChan
for {
wp.clean(&scratch)
select {
case <-stopCh:
return
default:
time.Sleep(wp.getMaxIdleWorkerDuration())
}
}
}()
}
func (wp *workerPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
currentTime := time.Now()
wp.lock.Lock()
ready := wp.ready
n := len(ready)
i := 0
for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
i++
}
*scratch = append((*scratch)[:0], ready[:i]...)
if i > 0 {
m := copy(ready, ready[i:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
}
wp.lock.Unlock()
// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i, ch := range tmp {
ch.ch <- nil
tmp[i] = nil
}
}
wp.Start中启动一个goroutine,定期执行wp.clean操作。wp.clean其实就是从头遍历ready队列,把空闲时间超过maxIdleWorkerDuration的都清理掉。这里清理也很简单,直接向该channel发送一个nil就行了。别忘了之前workFunc中,当收到一个nil之后就直接break出大循环,做些收尾工作然后退出函数,整个goroutine也就可以被runtime回收了。
不过这还不算完。我们再看看wp.WorkerFunc吧,这是一个很长的函数,其实就是s.serveConn(在初始化时把s.serveConn赋值给了wp.WorkerFunc),以下也是简化过的代码,我们只关注其hotpath:
func (s *Server) serveConn(c net.Conn) error {
ctx := s.acquireCtx(c)
var connRequestNum int
for {
connRequestNum++
var br *bufio.Reader = acquireReader(ctx)
err = ctx.Request.readLimitBody(br, maxRequestSize)
err = s.Handler(ctx)
var wr *bufio.Writer = acquireWriter(ctx)
err = wr.Flush()
if err != nil {
return
}
if s.MaxRequestNumPerConn {
break;
}
}
// 省略
}
这里有个比较奇怪的地方,为什么要用一个无限循环呢?难道是接收网络包的分组之类的?NoNoNo,不要把概念搞混了!分组这些都是协议栈处理的内容,到Go这块直接就是应用层了。那为什么要无限循环呢?在我这篇文章里说过,只有通过三次握手新建的连接,才用Accept去取。建立好连接后,后续数据的收发都是基于该socket对象,也即net.Conn对象。
也就是说,只有新建的连接才会从Serve中的acceptConn函数开始,然后执行上述的逻辑。已经建立好的连接,后续的请求都在serveConn
中循环处理。换句话说,如果一个HTTP请求是KeepAlive的(HTTP 1.1默认行为),那么worker就会一直处理此连接,无限循环地从该连接上读取数据(也就是下一个请求),然后进行业务逻辑。除非遇到connRequestNum >= MaxRequestNumPerConn或者其它错误了,才会关闭该连接,然后把自己设置为空闲。
这里还有个问题你可能会疑惑:由于并不知道下一次请求啥时候会发过来,这里只有一个ctx.Request.readLimitBody(br, maxRequestSize)
,并没有看到“不断尝试去读”这种逻辑呢?
这是一个好问题!!
其实这里的答案就是Go提供的一种强大的抽象net.Conn
。它不仅仅是代表一个socket,同时它还被封装成了netPoller对象。netPoller是Go runtime的一个数据结构,也许你早已知道了linux的epoll,netPoller就是对epoll的一种封装。Go把socket注册到epoll里,后续当用户在net.Conn对象上调用Read时,实际上是这样的:
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
也就是先会进行一次syscall.Read,但是如果没有数据,此时会得到一个错误syscall.EAGAIN。这时,会执行fd.pd.waitRead,这个函数会一直阻塞直到epoll通知socket有数据就绪。这里的阻塞和syscall的阻塞调用不一样,这里的阻塞相当于主动让出时间片(park),当前线程可以去执行别的goroutine,然后等待适当的时间(epoll event fire)被runtime唤醒。从用户的角度来看,这就像是阻塞的。
那么这是怎么做到的呢?这就涉及到runtime的调度了,具体可以参见这篇文章。顺带提一句,查看太深入到runtime的代码一定要用dlv、lldb或者gdb,用IDE会跳到错误的位置,因为很多runtime的代码直接和平台有关了,不同平台对应实现也不一样,然后链接器也会搞一些事情导致符号表在源代码层面不能正确跳转,所以一定要用单步调试去看代码。
以上便是对fasthttp源码结构的一个剖析,接下来让我们思考一个问题吧:
假设有N个客户端都使用长连接(http keepalive)发送请求,同时假设每个客户端每秒发送M个请求。那么此时fasthttp和net/http的性能会如何呢?
由于有N个客户端,因此fashttp和net/http的Server都会Accept N次。标准库会启动N个goroutine,而对于fasthttp来说,由于每个连接都是长连接,每个worker会一直处理该连接直到连接关闭或者次数到了限制,因此ready队列一直是空的,所以也会启动N个goroutine。即使N很大,这种case下fasthttp和标准库所使用的goroutine数量的持平的。
但是由于fasthttp大量使用了sync.Pool复用对象减少内存分配的开销,而标准库每个请求都会new一个request和response对象。同时fasthttp中大部分存的是[]byte而标准库中多是string,因此fasthttp还相比标准库减少了很多内存复制的开销。
总体而言,fasthttp性能在各种场景下应该都比标准库好很多。
当然还有个tips,作为网关,外网和内网的一道门,为了防止恶意请求,还是应该在response后主动close连接,也就是说应该使用短连接,这样才安全。
That's all!