前言

在fpm模式下,php中不存在连接池这种东西,顶多是用pconnect的方法使连接保留在fpm进程内,达到复用连接的目的。但如果是go这种常驻内存的语言,连接池就是必不可少的部分。

go-redis是我比较喜欢用的库。这两天看了一下连接池的实现,做个总结。

先思考一下

在进入源码之前,我们可以先思考一下连接池应该具备哪些功能。其实从客户端的角度讲,只需要连接池提供两个方法,如下接口所示:

1
2
3
4
type Pooler interface {
    Get() (*Conn, error) // 获取一个连接
    Put(*Conn) // 将使用过的连接放回池中
}

当然,连接池本身肯定也需要有些数据结构来存放现有的连接,在获取和释放的时候也要有相应的同步机制保证数据一致性,最常用的肯定是锁。

接口定义

先看一下go-redis中连接池的接口定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type Pooler interface {
    NewConn(context.Context) (*Conn, error) // 获取一个新连接
    CloseConn(*Conn) error // 关闭连接
    Get(context.Context) (*Conn, error) // 从连接池中获取连接
    Put(context.Context, *Conn) // 连接放回池中
    Remove(context.Context, *Conn, error) // 删除连接
    Len() int // 当前连接池长度
    IdleLen() int // 当前空闲连接池长度
    Stats() *Stats // 统计数据,可以先忽略
    Close() error // 关闭连接池
}

对应的连接池实现结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type ConnPool struct {
    opt *Options // 连接池的参数
    dialErrorsNum uint32 // 创建连接时错误的数量,通过原子操作增减。
    lastDialError atomic.Value // 最近一次创建连接时的错误,通过原子操作赋值
    queue chan struct{} // 获取连接前需要先排队,通过channel实现
    connsMu      sync.Mutex // 保护下边的几个数据
    conns        []*Conn // 存放所有连接
    idleConns    []*Conn // 存放空闲连接
    poolSize     int // 当前连接池大小
    idleConnsLen int // 空间连接数量
    
    stats Stats // 统计数据,可以先忽略
    
    _closed  uint32 // 连接池是否已关闭,原子操作
    closedCh chan struct{} // 连接池是否已关闭,用来通知后台处理过期连接的goroutine退出
}

// 对tcp连接的封闭
type Conn struct {
    usedAt  int64 // 最近使用时间,用于过期清理
    netConn net.Conn // 底层的tcp连接
    
    rd *proto.Reader // 三个读写的buffer
    bw *bufio.Writer
    wr *proto.Writer
    
    Inited    bool // 是否初始化
    pooled    bool // 是否属于连接池
    createdAt time.Time // 创建时间
}

创建连接池

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 创建新连接池
func NewConnPool(opt *Options) *ConnPool {
	p := &ConnPool{
		opt: opt,

		queue:     make(chan struct{}, opt.PoolSize),
		conns:     make([]*Conn, 0, opt.PoolSize),
		idleConns: make([]*Conn, 0, opt.PoolSize),
		closedCh:  make(chan struct{}),
	}

    // 这里要生成配置参数里最少空闲数量的连接
	p.connsMu.Lock()
	p.checkMinIdleConns()
	p.connsMu.Unlock()

	// 清理过期连接
	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
		go p.reaper(opt.IdleCheckFrequency)
	}

	return p
}

// 生成空闲连接
// 这个方法不是协程安全的,需要加了锁再调用
func (p *ConnPool) checkMinIdleConns() {
	if p.opt.MinIdleConns == 0 {
		return
	}
	// 生成MinIdleConns个空闲连接
	for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
		p.poolSize++
		p.idleConnsLen++
		// 建立新连接是耗时操作,所以这里启动新协程并发创建连接
		// 但是加入队列是按锁顺序的
		go func() {
			err := p.addIdleConn()
			if err != nil {
			// 如果创建失败,需要把相应的数量减掉
				p.connsMu.Lock()
				p.poolSize--
				p.idleConnsLen--
				p.connsMu.Unlock()
			}
		}()
	}
}

// 创建空闲连接
func (p *ConnPool) addIdleConn() error {
	cn, err := p.dialConn(context.TODO(), true) // 建立连接
	if err != nil {
		return err
	}

    // 本方法在单独协程里运行,所以需要加锁。将成功创建的连接放到连接池中
	p.connsMu.Lock()
	p.conns = append(p.conns, cn)
	p.idleConns = append(p.idleConns, cn)
	p.connsMu.Unlock()
	return nil
}

创建连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 核心的创建连接的流程在这个方法里
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}

	// 如果创建产生的错误量比池大小还多,就没有必要再尝试创建了
	if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
		return nil, p.getLastDialError()
	}

	// Dialer 在 newConnPool 中定义,而真正的 Dialer 方法,其实是在 redis.Options.init 方法里通过net.Dialer建立连接
	netConn, err := p.opt.Dialer(ctx)
	if err != nil {
		p.setLastDialError(err)
		// 如果拨号错误数量等于池大小,说明错误很多,不再做同步的请求,而是启动一个新goroutine继续尝试连接
		if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
			go p.tryDial()
		}
		return nil, err
	}

	internal.NewConnectionsCounter.Add(ctx, 1)
	cn := NewConn(netConn) // 创建一个封装的连接对象
	cn.pooled = pooled
	return cn, nil
}

// 创建连接错误数达到连接池大小时会启动单独的协程尝试连接
func (p *ConnPool) tryDial() {
	for {
		if p.closed() {
			return
		}

		// 这里会一直拨号到天荒地老,感觉可能是个坑点
		conn, err := p.opt.Dialer(context.Background())
		if err != nil {
			p.setLastDialError(err)
			time.Sleep(time.Second)
			continue
		}

		// 只要有一次能拨号成功,就会清除掉错误计数
		atomic.StoreUint32(&p.dialErrorsNum, 0)
		_ = conn.Close() // 好不容易才拨号成功,直接就关闭了?
		return
	}
}

获取连接

获取连接的思路也比较清晰,先排队获取已有的连接,如果没有可用连接就创建新连接。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}

	// 获取连接前要先排个队,排队可以保证同时获取连接的协程数不超过连接池大小。排队失败,说明连接池满了,可以直接返回。
	err := p.waitTurn(ctx)
	if err != nil {
		return nil, err
	}

	// 排到队再尝试去拿连接
	for {
		p.connsMu.Lock()
		cn := p.popIdle()
		p.connsMu.Unlock()

		if cn == nil {
			break
		}

		// 如果过期了就要关闭
		if p.isStaleConn(cn) {
			_ = p.CloseConn(cn)
			continue
		}

		atomic.AddUint32(&p.stats.Hits, 1)
		return cn, nil
	}

	atomic.AddUint32(&p.stats.Misses, 1)

	// 如果没有空闲连接,创建新连接,也是调用p.dialConn方法
	newcn, err := p.newConn(ctx, true)
	if err != nil {
	   // 注意这里。之前排队会向p.queue插入一个元素,如果获取失败,就需要取出元素
		p.freeTurn()
		return nil, err
	}

	return newcn, nil
}

// 获取连接前排队
func (p *ConnPool) waitTurn(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

// 如果能直接放到channel里,说明当前连接数还没有超过池上限
// 这里其实可以跳过直接走下边的三个case,这里单独处理一次,我理解是如果能直接排到队,就避免了计时器的存取处理,效率更高
	select {
	case p.queue <- struct{}{}:
		return nil
	default:
	}

// 这里通过sync.Pool复用了timer对象
	timer := timers.Get().(*time.Timer)
	timer.Reset(p.opt.PoolTimeout)

// 这里是三个条件的判断
	select {
	// context超时
	case <-ctx.Done():
	// 注意这里的细节,如果计时器停止失败,说明事件已经触发过了,需要把channel里的值取出来,不然影响下次使用
		if !timer.Stop() {
			<-timer.C
		}
		timers.Put(timer)
		return ctx.Err()
	case p.queue <- struct{}{}:
	// 只有这种情况是排队成功
		if !timer.Stop() {
			<-timer.C
		}
		timers.Put(timer)
		return nil
	case <-timer.C: // 计时器超时,就可以返回超时错误了
		timers.Put(timer)
		atomic.AddUint32(&p.stats.Timeouts, 1)
		return ErrPoolTimeout
	}
}

func (p *ConnPool) popIdle() *Conn {
	if len(p.idleConns) == 0 {
		return nil
	}

// 注意这里是否idelConns的末尾取的
	idx := len(p.idleConns) - 1
	cn := p.idleConns[idx]
	p.idleConns = p.idleConns[:idx]
	p.idleConnsLen--
// 每次取用,都要检查一下剩余的空闲连接是否满足配置要求
	p.checkMinIdleConns()
	return cn
}

放回连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
// 容错处理,避免后续用到有问题的连接
	if cn.rd.Buffered() > 0 {
		internal.Logger.Printf(ctx, "Conn has unread data")
		p.Remove(ctx, cn, BadConnError{})
		return
	}

// pooled为true的才会放回池里,否则直接关闭
	if !cn.pooled {
		p.Remove(ctx, cn, nil)
		return
	}

// 放回到空闲队列的末尾
	p.connsMu.Lock()
	p.idleConns = append(p.idleConns, cn)
	p.idleConnsLen++
	p.connsMu.Unlock()
	// 放回之后,要删除一个排队的元素
	p.freeTurn()
}

清理过期连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 创建连接池时如果设置了闲置超时时间,就会启动单独的协程处理过期连接
func (p *ConnPool) reaper(frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()
// 本质就上就是启动一个定时器,定时判断连接是否超时
	for {
		select {
		case <-ticker.C:
			// 这里是个小细节。ticker的channel和p.closedCh可能同时发生,但是go的select机制会随便选一个分支执行,所以这里多加了一个连接池是否已经关闭的判断
			if p.closed() {
				return
			}
			_, err := p.ReapStaleConns()
			if err != nil {
				internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err)
				continue
			}
		case <-p.closedCh:
			return
		}
	}
}

func (p *ConnPool) ReapStaleConns() (int, error) {
	var n int
	for {
	// 清理时也要排队,这里很巧妙,如果排不上队,说明连接池里的所有连接都在被使用中,没有必要清理。
		p.getTurn()

		p.connsMu.Lock()
		cn := p.reapStaleConn()
		p.connsMu.Unlock()
		p.freeTurn()

		if cn != nil {
			_ = p.closeConn(cn)
			n++
		} else {
			break
		}
	}
	atomic.AddUint32(&p.stats.StaleConns, uint32(n))
	return n, nil
}

func (p *ConnPool) reapStaleConn() *Conn {
	if len(p.idleConns) == 0 {
		return nil
	}

// put的时候会把连接放在idleConns的末尾,也就是说idleConns[0]是最有可能过期的连接
	cn := p.idleConns[0]
	if !p.isStaleConn(cn) {
		return nil
	}

// 如果真是过期连接,就修改空闲连接队列,把后边的连接往前挪
	p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
	p.idleConnsLen--
	p.removeConn(cn)

	return cn
}

总结

go-redis的连接池代码简单清晰,很好理解。使用两个slice保存连接信息,用锁保护。获取和放回连接都是在slice末尾操作,方便检查过期连接时从头部检查。用channel实现排队的功能很值得学习。

go-redis中的连接池只在内部使用,并不对外暴露,连接的取用和放回也是在client中进行管理。这样应用层只需要考虑业务逻辑,不需要管连接状态,爽歪歪。