前言
在fpm模式下,php中不存在连接池这种东西,顶多是用pconnect
的方法使连接保留在fpm进程内,达到复用连接的目的。但如果是go这种常驻内存的语言,连接池就是必不可少的部分。
go-redis是我比较喜欢用的库。这两天看了一下连接池的实现,做个总结。
先思考一下
在进入源码之前,我们可以先思考一下连接池应该具备哪些功能。其实从客户端的角度讲,只需要连接池提供两个方法,如下接口所示:
1
2
3
4
|
type Pooler interface {
Get() (*Conn, error) // 获取一个连接
Put(*Conn) // 将使用过的连接放回池中
}
|
当然,连接池本身肯定也需要有些数据结构来存放现有的连接,在获取和释放的时候也要有相应的同步机制保证数据一致性,最常用的肯定是锁。
接口定义
先看一下go-redis中连接池的接口定义:
1
2
3
4
5
6
7
8
9
10
11
|
type Pooler interface {
NewConn(context.Context) (*Conn, error) // 获取一个新连接
CloseConn(*Conn) error // 关闭连接
Get(context.Context) (*Conn, error) // 从连接池中获取连接
Put(context.Context, *Conn) // 连接放回池中
Remove(context.Context, *Conn, error) // 删除连接
Len() int // 当前连接池长度
IdleLen() int // 当前空闲连接池长度
Stats() *Stats // 统计数据,可以先忽略
Close() error // 关闭连接池
}
|
对应的连接池实现结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
type ConnPool struct {
opt *Options // 连接池的参数
dialErrorsNum uint32 // 创建连接时错误的数量,通过原子操作增减。
lastDialError atomic.Value // 最近一次创建连接时的错误,通过原子操作赋值
queue chan struct{} // 获取连接前需要先排队,通过channel实现
connsMu sync.Mutex // 保护下边的几个数据
conns []*Conn // 存放所有连接
idleConns []*Conn // 存放空闲连接
poolSize int // 当前连接池大小
idleConnsLen int // 空间连接数量
stats Stats // 统计数据,可以先忽略
_closed uint32 // 连接池是否已关闭,原子操作
closedCh chan struct{} // 连接池是否已关闭,用来通知后台处理过期连接的goroutine退出
}
// 对tcp连接的封闭
type Conn struct {
usedAt int64 // 最近使用时间,用于过期清理
netConn net.Conn // 底层的tcp连接
rd *proto.Reader // 三个读写的buffer
bw *bufio.Writer
wr *proto.Writer
Inited bool // 是否初始化
pooled bool // 是否属于连接池
createdAt time.Time // 创建时间
}
|
创建连接池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
// 创建新连接池
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
closedCh: make(chan struct{}),
}
// 这里要生成配置参数里最少空闲数量的连接
p.connsMu.Lock()
p.checkMinIdleConns()
p.connsMu.Unlock()
// 清理过期连接
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
return p
}
// 生成空闲连接
// 这个方法不是协程安全的,需要加了锁再调用
func (p *ConnPool) checkMinIdleConns() {
if p.opt.MinIdleConns == 0 {
return
}
// 生成MinIdleConns个空闲连接
for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
p.poolSize++
p.idleConnsLen++
// 建立新连接是耗时操作,所以这里启动新协程并发创建连接
// 但是加入队列是按锁顺序的
go func() {
err := p.addIdleConn()
if err != nil {
// 如果创建失败,需要把相应的数量减掉
p.connsMu.Lock()
p.poolSize--
p.idleConnsLen--
p.connsMu.Unlock()
}
}()
}
}
// 创建空闲连接
func (p *ConnPool) addIdleConn() error {
cn, err := p.dialConn(context.TODO(), true) // 建立连接
if err != nil {
return err
}
// 本方法在单独协程里运行,所以需要加锁。将成功创建的连接放到连接池中
p.connsMu.Lock()
p.conns = append(p.conns, cn)
p.idleConns = append(p.idleConns, cn)
p.connsMu.Unlock()
return nil
}
|
创建连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
// 核心的创建连接的流程在这个方法里
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
// 如果创建产生的错误量比池大小还多,就没有必要再尝试创建了
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
return nil, p.getLastDialError()
}
// Dialer 在 newConnPool 中定义,而真正的 Dialer 方法,其实是在 redis.Options.init 方法里通过net.Dialer建立连接
netConn, err := p.opt.Dialer(ctx)
if err != nil {
p.setLastDialError(err)
// 如果拨号错误数量等于池大小,说明错误很多,不再做同步的请求,而是启动一个新goroutine继续尝试连接
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
go p.tryDial()
}
return nil, err
}
internal.NewConnectionsCounter.Add(ctx, 1)
cn := NewConn(netConn) // 创建一个封装的连接对象
cn.pooled = pooled
return cn, nil
}
// 创建连接错误数达到连接池大小时会启动单独的协程尝试连接
func (p *ConnPool) tryDial() {
for {
if p.closed() {
return
}
// 这里会一直拨号到天荒地老,感觉可能是个坑点
conn, err := p.opt.Dialer(context.Background())
if err != nil {
p.setLastDialError(err)
time.Sleep(time.Second)
continue
}
// 只要有一次能拨号成功,就会清除掉错误计数
atomic.StoreUint32(&p.dialErrorsNum, 0)
_ = conn.Close() // 好不容易才拨号成功,直接就关闭了?
return
}
}
|
获取连接
获取连接的思路也比较清晰,先排队获取已有的连接,如果没有可用连接就创建新连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
// 获取连接前要先排个队,排队可以保证同时获取连接的协程数不超过连接池大小。排队失败,说明连接池满了,可以直接返回。
err := p.waitTurn(ctx)
if err != nil {
return nil, err
}
// 排到队再尝试去拿连接
for {
p.connsMu.Lock()
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
break
}
// 如果过期了就要关闭
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
// 如果没有空闲连接,创建新连接,也是调用p.dialConn方法
newcn, err := p.newConn(ctx, true)
if err != nil {
// 注意这里。之前排队会向p.queue插入一个元素,如果获取失败,就需要取出元素
p.freeTurn()
return nil, err
}
return newcn, nil
}
// 获取连接前排队
func (p *ConnPool) waitTurn(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// 如果能直接放到channel里,说明当前连接数还没有超过池上限
// 这里其实可以跳过直接走下边的三个case,这里单独处理一次,我理解是如果能直接排到队,就避免了计时器的存取处理,效率更高
select {
case p.queue <- struct{}{}:
return nil
default:
}
// 这里通过sync.Pool复用了timer对象
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)
// 这里是三个条件的判断
select {
// context超时
case <-ctx.Done():
// 注意这里的细节,如果计时器停止失败,说明事件已经触发过了,需要把channel里的值取出来,不然影响下次使用
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return ctx.Err()
case p.queue <- struct{}{}:
// 只有这种情况是排队成功
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return nil
case <-timer.C: // 计时器超时,就可以返回超时错误了
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return ErrPoolTimeout
}
}
func (p *ConnPool) popIdle() *Conn {
if len(p.idleConns) == 0 {
return nil
}
// 注意这里是否idelConns的末尾取的
idx := len(p.idleConns) - 1
cn := p.idleConns[idx]
p.idleConns = p.idleConns[:idx]
p.idleConnsLen--
// 每次取用,都要检查一下剩余的空闲连接是否满足配置要求
p.checkMinIdleConns()
return cn
}
|
放回连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
// 容错处理,避免后续用到有问题的连接
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
}
// pooled为true的才会放回池里,否则直接关闭
if !cn.pooled {
p.Remove(ctx, cn, nil)
return
}
// 放回到空闲队列的末尾
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
// 放回之后,要删除一个排队的元素
p.freeTurn()
}
|
清理过期连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
// 创建连接池时如果设置了闲置超时时间,就会启动单独的协程处理过期连接
func (p *ConnPool) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
// 本质就上就是启动一个定时器,定时判断连接是否超时
for {
select {
case <-ticker.C:
// 这里是个小细节。ticker的channel和p.closedCh可能同时发生,但是go的select机制会随便选一个分支执行,所以这里多加了一个连接池是否已经关闭的判断
if p.closed() {
return
}
_, err := p.ReapStaleConns()
if err != nil {
internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err)
continue
}
case <-p.closedCh:
return
}
}
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
// 清理时也要排队,这里很巧妙,如果排不上队,说明连接池里的所有连接都在被使用中,没有必要清理。
p.getTurn()
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()
p.freeTurn()
if cn != nil {
_ = p.closeConn(cn)
n++
} else {
break
}
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
return n, nil
}
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
// put的时候会把连接放在idleConns的末尾,也就是说idleConns[0]是最有可能过期的连接
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
// 如果真是过期连接,就修改空闲连接队列,把后边的连接往前挪
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
p.removeConn(cn)
return cn
}
|
总结
go-redis的连接池代码简单清晰,很好理解。使用两个slice保存连接信息,用锁保护。获取和放回连接都是在slice末尾操作,方便检查过期连接时从头部检查。用channel实现排队的功能很值得学习。
go-redis中的连接池只在内部使用,并不对外暴露,连接的取用和放回也是在client中进行管理。这样应用层只需要考虑业务逻辑,不需要管连接状态,爽歪歪。