前言
redigo是用Go语言开发的Redis客户端,受到Redis官方的推荐。
redigo的源码量比较少,也比较清晰易读。redigo主要做了以下事项
- 与Redis server建立连接
- 按照RESP协议组装指令
- 向Redis server发用指令
- 接收Redis server返回的数据
- 将返回数据解析成Go的数据类型
- 连接池(Pool),支持设置最大的活动连接数,最大的空闲连接数
源码概述
redigo主要使用了Go官方的net、io和bufio包,这些包是Go对网络连接,I/O,Buffered I/O的抽象实现。
在看Pool代码的过程中,对于如何控制最大连接数,笔者多花了一些时间才看明白。Pool使用buffered channel (可缓冲的管道,以下简称 buff chan) 来控制最大连接数。
首先看一下活动连接和空闲连接的定义
- 活动连接:经Pool创建,未被关闭。这里的关闭是指关闭网络连接,也就是断开与Redis server的连接
- 空闲连接:活动连接使用完之后,需交还给Pool,Pool会将连接放入空闲连接列表,列表中的连接数量就是空闲连接数
可以看出,活动连接是包含空闲连接的。
Pool原理解读
概述部分提到,Pool使用buff chan来控制最大连接数,这里也是源码中比较难以理解的地方,需要对Go的channel有较深的理解。
Go有两种channel,chan和 buff chan,它们的特性如下
- chan,读写都是同步、阻塞操作。写操作阻塞,等待写入的数据被读取;读操作阻塞,等待数据写入。
- buff chan,读操作和chan类似;写操作在缓冲区未满时非阻塞,否则阻塞等待缓冲区有空位。
Pool利用buff chan的缓冲区长度固定,及读操作的阻塞等待来控制最大连接数。
源码解析
redis/
| -- redis.go // 定义接口
| -- conn.go // 实现redis.go中定义的接口,用于与Redis server建立连接
| -- -- type conn struct // 链接结构体,实现了 redis.go中定义的Conn接口
| -- -- type dialOptions struct // 拨号选项,包括拨号器、拨号函数,超时配置,TLS配置等
| -- -- type DialOption struct // 拨号选项设置结构体,只包含函数f,用于修改拨号选项
| -- -- func Dial // 拨号函数,参数:网络协议,网络地址,拨号选项设置结构体,返回 conn
| -- -- func write* // 写命令函数,用于将命Redis指令写入buffer io
| -- reply.go // 将redis返回数据解析成go的数据类型
连接池如何用channel控制打开的连接数
redis/
| -- pool.go // Get()用于获取连接,Close()用于归还连接
| -- -- Pool.active // 活动连接数,每建立一个连接(Dial)active加1,每关闭(Close)一个连接,active减1
| -- -- Pool.ch chan struct{} // 用来控制最大的活动连接数
| -- -- Pool.lazyInit() // 将Pool.ch 初始化为buffered channel,长度为Pool.MaxActive,并将ch填满
| -- -- Pool.get() // 先从Pool.ch中取出一个元素,如果最终建立新连接(Dial)失败,再往ch写入一个元素
| -- -- Pool.put() // 往Pool.ch写入数据,所以get读取数据,put写入数据
IF Wait == 1 AND MaxActive > 0
AND MaxActive >= MaxIdle
AND MaxActive == 3 AND MaxIdle == 2
初始化:ch长度等于MaxActive, len(ch)=3
Pool.Get() 首先从ch读出数据, 如果新建连接失败,再向ch写入数据,len(ch)不变,否则len(ch)-1
GET#1-没有空闲连接,新建连接,len(ch)=2
GET#2-没有空闲连接,新建连接,len(ch)=1
GET#3-没有空闲连接,新建连接,len(ch)=0
GET#4-ch空了,所以ch读操作阻塞,直到有连接被归还,put()往ch中写入数据,ch读阻塞解除,继续往下获取空闲连接,否则只能等待。
这保证了最大活动连接数不会超过MaxActive。
部分源代码
func (p *Pool) Get() Conn {
pc, err := p.get(nil)
if err != nil {
return errorConn{err}
}
return &activeConn{p: p, pc: pc}
}
func (p *Pool) get(ctx interface {
Done() <-chan struct{}
Err() error
}) (*poolConn, error) {
// Handle limit for p.Wait == true.
if p.Wait && p.MaxActive > 0 {
p.lazyInit() // 初始化ch
if ctx == nil {
<-p.ch // 从ch读数据,如果ch为空则阻塞等待
} else {
select {
case <-p.ch:
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// ... ...
}
func (ac *activeConn) Close() error {
// ... ...
ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
return nil
}
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
// ... ...
if p.ch != nil && !p.closed {
p.ch <- struct{}{} // 归还连接后往ch写入数据
}
p.mu.Unlock()
return nil
}
参考链接