golang sql 包连接池分析

golang 在使用 mysql 的时候会用到 database/sql 这个库,每次都在黑盒使用它,有必要来梳理一下整个请求流程和细节,以免以后碰到问题会有解决的思路。

阅读之前的几个问题

  • sql 的连接池的连接怎么维护的?
  • Query / Exec 如何获取查询的连接?
  • 连接池的连接如何释放的?

几个重要的结构

DB struct

先来看看 DB 结构,该结构是 sql 包的核心结构。DB 是表示零个或多个底层连接池的数据库句柄,是并发安全的。

sql 包可以自动创建和释放连接,它还维护一个空闲连接池。如果数据库具有每个连接状态的概念,则只能在事务中可靠地观察到这种状态。调用 DB.Begin 后,返回的 Tx 将绑定到单个连接。在事务上调用 Commit 或 Rollback 后,该事务的连接将返回到 DB 的空闲连接池。SetMaxIdleConns 用来控制连接池大小。

type DB struct {
    driver driver.Driver // 数据库驱动
    dsn    string // 数据库连接参数,比如 username,hostname,password 等等
    numClosed uint64 // numClosed 是一个原子计数器,表示已关闭连接的总数。Stmt.openStmt 在清除 Stmt.css 中的已关闭连接之前对其进行检查。

    mu           sync.Mutex // 保护下面的字段
    freeConn     []*driverConn // 空闲连接
    connRequests map[uint64]chan connRequest // 阻塞请求队列。当达到最大连接数时,后续请求将插入该队列来等待可用连接
    nextRequest  uint64 // connRequests 的下一个 key
    numOpen      int    // 已连接或者正等待连接的数量

    // 一个创建新连接的信号,
    // 运行connectionOpener()的goroutine读取此chan,maybeOpenNewConnections发送此chan(每个需要的连接发送一次)
    // 它在db.Close()时关闭,并通知connectionOpener goroutine退出。
    openerCh    chan struct{} 
    closed      bool
    dep         map[finalCloser]depSet
    lastPut     map[*driverConn]string // 用于 debug
    maxIdle     int                    // 最大空闲连接数, 0等价于 defaultMaxIdleConns 常量(代码中值为2),负数等价于0
    maxOpen     int                    // 数据库的最大连接数,0 等价于不限制最大连接数
    maxLifetime time.Duration          // 连接的最大生命周期
    cleanerCh   chan struct{} // 用于释放连接池中过期的连接的信号
}
driverConn struct

driverConn 使用互斥锁封装一个 driver.Conn 结构,在所有对 Conn 的调用期间保持(包括对通过该 Conn 返回的接口的任何调用,例如对 Tx,Stmt,Result,Rows 的调用)

type driverConn struct {
    db        *DB
    createdAt time.Time

    sync.Mutex  // 保护下面的字段
    ci          driver.Conn
    closed      bool
    finalClosed bool // ci.Close 已经被调用则为 true
    openStmt    map[*driverStmt]bool

    // 下面的字段被 db.mu 保护
    inUse      bool
    onPut      []func() // 下次返回 conn 时运行的代码
    dbmuClosed bool     // 与 closed 字段相同,但由 db.mu 保护,用于 removeClosedStmtLocked
}

// driver.Conn 是具体的接口 用来支持不同的数据库
// Conn 是与数据库的连接,不是 gotoutines 安全的。
// Conn 被认为是有状态的。
type Conn interface {
    // Prepare 返回绑定到该连接的就绪语句 Stmt。
    Prepare(query string) (Stmt, error)

    // Close 使当前就绪的语句和事务无效并可能停止,将此连接标记为不再使用。
    //
    // 因为sql包维护一个空闲的连接池,并且只有在空闲连接过剩时才调用Close,所以驱动不需要做自己的连接缓存。
    Close() error

    // Begin 启动并返回一个新的事务 Tx
    Begin() (Tx, error)
}

// 可以看到driverConn的这个方法,看名字就知道是释放连接的 调用了DB 的 putConn 方法,这里先留个印象
func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err)
}

驱动注册绑定

我们在使用指定数据库时需要使用import _ "github.com/go-sql-driver/mysql"来执行 init() 函数。这个 init() 函数主要用来将指定的数据库驱动注册到 sql 的 一个 map 类型的 drivers 变量中。

mysql/driver.go

// 该方法注册到驱动,也就是db.Open的调用,返回的mc是实现的driver.Conn接口的结构,dsn 为连接该数据库的配置
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
    // New mysqlConn
    mc := &mysqlConn{
        maxAllowedPacket: maxPacketSize,
        maxWriteSize:     maxPacketSize - 1,
        closech:          make(chan struct{}),
    }
    ...
    return mc, nil
}

// 注册驱动 MySQLDriver 结构实现了 driver.Driver 接口
func init() {
    sql.Register("mysql", &MySQLDriver{})
}

连接

创建连接的过程
创建连接

可以看到,调用sql.Open的时候会启动一个 goroutine 一直阻塞读取db.openerCh。当这个openerCh收到信号时,会启动创建连接的流程,调用驱动提供的创建连接的方法创建连接。如果创建成功,优先把改连接给db.connRequests中阻塞的请求使用,如果没有阻塞的请求就把这个新连接放入 db.freeConn中待请求使用。

关键方法
// 调用驱动的 Open 方法创建新连接
func (db *DB) openNewConnection() {
    // 创建新连接
    ci, err := db.driver.Open(db.dsn)
    db.mu.Lock()
    defer db.mu.Unlock()
    if db.closed {
        if err == nil {
            ci.Close()
        }
        db.numOpen--
        return
    }
    if err != nil {
        db.numOpen--
        db.putConnDBLocked(nil, err)
        db.maybeOpenNewConnections()
        return
    }
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    // 直接给阻塞的请求使用 或者 放入连接池
    if db.putConnDBLocked(dc, err) {
        db.addDepLocked(dc, dc)
    } else {
        db.numOpen--
        ci.Close()
    }
}

// 给 阻塞在 connRequest 队列的请求分配连接
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    // 如果超过最大连接,直接返回false,connRequest 队列的请求继续阻塞
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 把连接分配给 connRequests 阻塞的请求
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        // 取第一个
        for reqKey, req = range db.connRequests {
            break
        }
        // 阻塞的请求得到连接 删除 connRequests 的记录
        delete(db.connRequests, reqKey)
        // 标记该连接正在使用
        if err == nil {
            dc.inUse = true
        }
        // 通过 chan 把该连接发送给请求
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    // 如果空闲连接数小于最大连接限制 把该连接放到 freeConn 中
    } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
        db.freeConn = append(db.freeConn, dc)
        // 根据 db.maxLifetime 起一个 goroutine 清除freeConn中过期的连接 
        db.startCleanerLocked()
        return true
    }
    return false
}

查询如何获取连接

先来看提供的两个基本的查询的方法 Query / Exec

// 使用方法
db.Query("SELECT * FROM table")
db.Exec("INSERT INTO table VALUES (1)")
  • Query:执行需要返回 rows 的操作,例如 SELECT)不释放连接,但在调用后仍然保持连接,即放回 freeConn。
  • Exec:执行没有返回 rows 的操作,例如 INSERT, UPDATE,DELETE)在调用后自动释放连接。


    Query 查询
关键方法
// conn 返回新打开的连接,或者从连接池freeConn中取
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    ...
    // 检查上下文是否被取消
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime

    // cachedOrNewConn 模式获取连接
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // 如果连接数已经超过限制,将该请求放入connRequest中阻塞,直到有空闲连接
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // Make the connRequest channel. It's buffered so that the
        // connectionOpener doesn't block while waiting for the req to be read.
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.mu.Unlock()

        // 上下文判断请求超时
        select {
        case <-ctx.Done():
            // 删除 connRequests 中阻塞的请求
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            select {
            default:
            case ret, ok := <-req:
                if ok {
                    // 如果收到了连接,由于超时了,回收该连接
                    db.putConn(ret.conn, ret.err)
                }
            }
            return nil, ctx.Err()
        // 获取到了连接,返回处理
        case ret, ok := <-req:
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }

    // 连接池中没有连接,且打开的连接数没有超限,创建新连接
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.driver.Open(db.dsn)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    db.addDepLocked(dc, dc)
    dc.inUse = true
    db.mu.Unlock()
    return dc, nil
}

连接的回收或释放

被动回收或释放

我们沿着上面的 Query 请求分析下来,在 queryConn 的方法中会看到一个 releaseConn 的方法,它调用了putConn方法去处理这个 dc 连接。

func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err)
}

再来看看 putConn 方法的定义

// 把 dc 连接放回连接池 freeConn 或者释放
func (db *DB) putConn(dc *driverConn, err error) {
    db.mu.Lock()
    // 回收一个没被使用的连接 会panic
    if !dc.inUse {
        if debugGetPut {
            fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
        }
        panic("sql: connection returned that was never out")
    }
    if debugGetPut {
        db.lastPut[dc] = stack()
    }
    // 将该连接置为 未使用
    dc.inUse = false

    // 执行完该连接的函数
    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil
    // 不重用无效的连接
    if err == driver.ErrBadConn {
        // 该函数会判断 阻塞在 connRequest 的请求数量,然后在不超限的情况下,通过 openerCh 唤醒 connectionOpener goroutine 创建新连接处理请求。
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        // 释放连接
        dc.Close()
        return
    }
    if putConnHook != nil {
        putConnHook(db, dc)
    }
    // 如果是有效的连接,将该连接给 阻塞在 connRequest 的请求使用,或者放回连接池
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()

    // 改连接没被回收,释放
    if !added {
        dc.Close()
    }
}
主动回收或释放

除了上述的连接回收释放方式,还有没有其他地方回收释放呢。当我们设置 db.SetConnMaxLifetime 也就是设置连接的最大存活时间时,都会调起一个 goroutine 负责处理连接池中过期的连接。同 openerCh 信号一样,释放也用到了一个 cleanerCh 用于通知该 goroutine 处理任务。

func (db *DB) SetConnMaxLifetime(d time.Duration) {
    if d < 0 {
        d = 0
    }
    db.mu.Lock()
    // 当缩小 maxLifetime 的时候,直接清理不符的连接
    if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
        select {
        case db.cleanerCh <- struct{}{}:
        default:
        }
    }
    db.maxLifetime = d
    // 该方法会起一个 goroutine  负责释放过期连接
    db.startCleanerLocked()
    db.mu.Unlock()
}

// 满足条件开启一个 goroutine 维护过期的连接
func (db *DB) startCleanerLocked() {
    if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
        db.cleanerCh = make(chan struct{}, 1)
        go db.connectionCleaner(db.maxLifetime)
    }
}

// 核心的逻辑
func (db *DB) connectionCleaner(d time.Duration) {
    const minInterval = time.Second

    if d < minInterval {
        d = minInterval
    }
    t := time.NewTimer(d)

    // 阻塞等待 cleanerCh 信号
    for {
        select {
        case <-t.C:
        case <-db.cleanerCh: // maxLifetime 修改 或者 db 关闭会发送该信号
        }

        db.mu.Lock()
        d = db.maxLifetime
        if db.closed || db.numOpen == 0 || d <= 0 {
            db.cleanerCh = nil
            db.mu.Unlock()
            return
        }

        expiredSince := nowFunc().Add(-d)
        var closing []*driverConn
        // 从连接池 freeConn 获取过期连接
        for i := 0; i < len(db.freeConn); i++ {
            c := db.freeConn[i]
            if c.createdAt.Before(expiredSince) {
                closing = append(closing, c)
                last := len(db.freeConn) - 1
                db.freeConn[i] = db.freeConn[last]
                db.freeConn[last] = nil
                db.freeConn = db.freeConn[:last]
                i--
            }
        }
        db.mu.Unlock()
        // 释放连接
        for _, c := range closing {
            c.Close()
        }

        if d < minInterval {
            d = minInterval
        }
        t.Reset(d)
    }
}

小结

现在回过头来看看开始的三个问题,基本就有解了。

  • sql 的连接池的连接怎么维护的?
    有效的连接存储在连接池 freeConn 中。启用一个connectionOpener goroutine 通过接受 openerCh 信号负责调用驱动的 Open 方法创建连接。当用db.SetConnMaxLifetime 设置 MaxLifetime 或者调用 putConnDBLocked方法满足条件时候会启用一个 connectionCleanergoroutine 通过接受cleanerCh信号负责清理连接。
  • Query / Exec 如何获取查询的连接?
    1. 先查看 freeConn 是否有可用的连接,如果有就从连接池取。如果没有进入下一步。
    2. 判断当前连接数是否超限。如果超限,将该请求放入 connRequests 阻塞等待可用连接。如果没超限进入下一步。
    3. 创建新的连接
  • 连接池的连接如何回收/释放的?
    1. 被动回收/释放。通过查询等操作返回错误的时候会执行 releaseConn 函数回收连接。当满足条件时 会起一个connectionCleaner goroutine 清理连接池的无效连接。
    2. 主动回收/释放。设置 db.SetConnMaxLifetime 的时候会触发一次connectionCleaner goroutine 清理连接池。

可以看到sql库的连接池的实现机制其实还是蛮复杂的,生产者connectionOpener goroutine 阻塞监听 openerCh 创建连接放入连接池。当请求来时,先查询连接池有没有空闲连接,如果没有空闲连接则创建,Query 类的请求用完继续放回连接池重用。当需要清理连接池的连接时调用connectionCleaner goroutine。分析过一遍,以后遇到问题会更快的处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342