青梅梦呓

和世界交手的这许多年,你是否光彩依旧,兴致盎然

0%

Go实现通用网络连接池

连接池在现实开发中是经常见到的,因为TCP的三次握手等原因,建立一个连接算是一个比较高的开销,而频繁的创建和销毁连接会带来严重的性能下降,所以在一个需要多次与特定实体交互的程序中,就需要维持一个连接池,里面有可以复用的连接可供重复使用。

PS: 今天在面试中被要求手写一个通用连接池,紧张之下表现的一塌糊涂,回来之后查缺补漏,增补记录以作后续提成

原理

什么是连接池

  • 顾名思义是一个池子
  • 池子里面存放有限数量即时可用的连接,减少创建连接和关闭连接的时间
  • 连接是有存活时间的

以数据库连接池为例,可以简单画一张流程图表示连接池的生命周期:

连接池的生命周期

目标

gorm是复用database/sql的连接池,redigo是自己维护了一个连接池,代码参考这里。实现一个通用的网络连接池,连接池的实现不依赖具体的实例,而依赖一个接口,只要实现了接口的对象都可以被池管理。
Ps: 实现基于interface{}的连接池,这样任何对象都可以被池管理。

功能与要求

  • 线程安全 thread safe
  • 获取连接,释放连接
  • 连接池中连接类型为interface{},使得更加通用
  • 支撑连接的最大空闲时间,超时的连接将关闭丢弃,可避免空闲时连接自动失效问题
  • 设置最大连接数,连接池的容量,连接存活时间等等
  • 支持用户设定ping方法,检查连接的连通性,无效的连接将丢弃
  • 使用channel处理池中的连接,高效

实现细节

  • 确定连接池的属性: 如最大连接数、容量、连接创建时间和存活时间
  • 拟使用连接池以及超过最大连接数后等待其他连接释放
  • 保证在多协程操作下数据的一致性
  • 最好实现连接的超时监听和通知

调研

gorm复用的是database/sql连接池,redigo维护了一个自己的连接池,但是两者的实现都很复杂,移植代码的成本过高。fatih大神有一个pool库,具体移步这里,遗憾的是该项目目前已经不再维护,具体文档在此。最后一个版本也没有实现主动检查连接超时、MaxIdle配置,还有MaxConn,算是一个半成品,就基于fatih大神的这个库进行再次开发

实现过程

定义Pool interface

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package pool

type Pool interface {
// Get 从连接池中获取一个连接
Get() (interface{}, error)
// Put 连接放回连接池中
Put(interface{}) error
// Close 关闭连接
Close(interface{}) error
// Release 释放连接池中的所有连接
Release()
// Len 查看当前连接迟中的连接数量
Len()
}

定义连接池相关配置

1
2
3
4
5
6
7
8
9
type Config struct {
InitialCap int //连接池中的最小连接数
MaxCap int //最大的并发存活连接数
MaxIdle int //最大存活连接数
Factory func() (interface{}, error) //生成连接的方法
Close func(interface{}) error //关闭连接的方法
Ping func(interface{}) error //检查连接是否有效的方法
IdleTimeout time.Duration //连接最大空闲时间,超过该时间则连接失效
}

实现原理

将连接句柄放入channel中,由于缓冲channel的特性,获取连接时,如果连接池中满足直接返回连接的情况时将直接返回,否则将阻塞等待或者直接创建新的连接,因此需要定义通用连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type idleConn struct {
conn interface{}
t time.Time
}

type connReq struct {
idleConnect *idleConn
}

type GenericPool struct {
type GenericPool struct {
mu sync.RWMutex //锁
connections chan *idleConn //channel 为包装后的类型,包含连接和时间
factory func() (interface{}, error) //创建连接的方法
close func(interface{}) error //关闭连接的方法
ping func(interface{}) error //检查连接的放大
idleTimeout time.Duration //连接最大空闲时间
waitTimeout time.Duration //等待超时时间
maxActive int //最大存活连接数
openingConnections int
connRequests []chan connReq
}
}

简单包装了一些,在fatih大神的基础上增加了连接池常用的一些属性

初始化连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func NewGenericPool(config *Config) (Pool, error) {
//初始容量判断
if !(config.InitialCap <= config.MaxIdle && config.MaxCap >= config.MaxIdle && config.InitialCap >= 0) {
return nil, errors.New("invalid capacity settings")
}
//必须有创建方法
if config.Factory == nil {
return nil, errors.New("invalid factory func settings")
}
if config.Close == nil {
return nil, errors.New("invalid close func settings")
}
genericPool := &GenericPool{
connections: make(chan *idleConn, config.MaxIdle),
factory: config.Factory,
close: config.Close,
idleTimeout: config.IdleTimeout,
maxActive: config.MaxCap,
openingConnections: config.InitialCap,
}

if config.Ping != nil {
genericPool.ping = config.Ping
}
for i := 0; i < config.InitialCap; i++ {
conn, err := genericPool.factory()
if err != nil {
genericPool.Release()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
genericPool.connections <- &idleConn{conn: conn, t: time.Now()}
}

return genericPool, nil
}

这个代码很简单,基本就是常见的应用,唯一的问题就是函数签名中返回的是Pool类型,实际返回的是一个genericPool的实例化对象,只要实现我们之前定义的接口的方法即可。

实现接口后的完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package pool

import (
"errors"
"fmt"
"sync"
"time"
)

var (
ErrClosed = errors.New("pool is closed")
ErrMaxActiveConnReached = errors.New("MaxActiveConnReached")
)

type Config struct {
InitialCap int //连接池中的最小连接数
MaxCap int //最大的并发存活连接数
MaxIdle int //最大存活连接数
Factory func() (interface{}, error) //生成连接的方法
Close func(interface{}) error //关闭连接的方法
Ping func(interface{}) error //检查连接是否有效的方法
IdleTimeout time.Duration //连接最大空闲时间,超过该时间则连接失效
}

type idleConn struct {
conn interface{}
t time.Time
}

type connReq struct {
idleConnect *idleConn
}

type GenericPool struct {
mu sync.RWMutex //锁
connections chan *idleConn //channel 为包装后的类型,包含连接和时间
factory func() (interface{}, error) //创建连接的方法
close func(interface{}) error //关闭连接的方法
ping func(interface{}) error //检查连接的放大
idleTimeout time.Duration //连接最大空闲时间
waitTimeout time.Duration //等待超时时间
maxActive int //最大存活连接数
openingConnections int
connRequests []chan connReq
}

// Ping 检查连接状态
func (g *GenericPool) Ping(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
return g.ping(conn)
}

// getConnections 获取所有连接
func (g *GenericPool) getConnections() chan *idleConn {
defer g.mu.Unlock()
g.mu.Lock()
return g.connections
}

// 获取连接
func (g *GenericPool) Get() (interface{}, error) {
connections := g.getConnections()
if connections == nil {
return nil, ErrClosed
}
for {
select {
case wrapConn := <-connections:
if wrapConn == nil {
return nil, ErrClosed
}
//还要判断这个链接的存活时间是否超过了最大允许存活时间
if g.idleTimeout > 0 && wrapConn.t.Add(g.idleTimeout).Before(time.Now()) {
//丢弃并关闭该连接
g.Close(wrapConn.conn)
continue
}
// 判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
if g.ping != nil {
if err := g.Ping(wrapConn.conn); err != nil {
//丢弃并关闭该连接
g.Close(wrapConn.conn)
continue
}
}
return wrapConn, nil
default:
g.mu.Lock()
if g.openingConnections >= g.maxActive {
req := make(chan connReq, 1)
g.connRequests = append(g.connRequests, req)
// 不在循环中使用defer 避免造成内存泄漏
g.mu.Unlock()
ret, ok := <-req
if !ok {
return nil, ErrMaxActiveConnReached
}
if g.idleTimeout > 0 && ret.idleConnect.t.Add(g.idleTimeout).Before(time.Now()) {
//丢弃并关闭该连接
g.Close(ret.idleConnect.conn)
continue
}
return ret.idleConnect.conn, nil
}
if g.factory == nil {
g.mu.Unlock()
return nil, ErrClosed
}
conn, err := g.factory()
if err != nil {
g.mu.Unlock()
return nil, err
}
g.openingConnections++
g.mu.Unlock()
return conn, nil
}
}
}

func (g *GenericPool) Put(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}

g.mu.Lock()
defer g.mu.Unlock()

if g.connections == nil {
return g.Close(conn)
}

if l := len(g.connRequests); l > 0 {
req := g.connRequests[0]
copy(g.connRequests, g.connRequests[1:])
g.connRequests = g.connRequests[:l-1]
req <- connReq{
idleConnect: &idleConn{conn: conn, t: time.Now()},
}
return nil
} else {
select {
case g.connections <- &idleConn{conn: conn, t: time.Now()}:
return nil
default:
//连接池已满,直接关闭该连接
return g.Close(conn)
}
}
}

func (g *GenericPool) Close(conn interface{}) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
g.mu.Lock()
defer g.mu.Unlock()
if g.close == nil {
return nil
}
g.openingConnections--
return g.close(conn)
}

func (g *GenericPool) Release() {
g.mu.Lock()
defer g.mu.Unlock()
connections := g.connections
g.connections = nil
g.factory = nil
g.ping = nil
closeFun := g.close
g.close = nil

if connections == nil {
return
}

close(connections)
for wrapConn := range connections {
_ = closeFun(wrapConn.conn)
}
}

func (g *GenericPool) Len() int {
return len(g.getConnections())
}

// NewGenericPool 初始化连接池
func NewGenericPool(config *Config) (Pool, error) {
//初始容量判断
if !(config.InitialCap <= config.MaxIdle && config.MaxCap >= config.MaxIdle && config.InitialCap >= 0) {
return nil, errors.New("invalid capacity settings")
}
//必须有创建方法
if config.Factory == nil {
return nil, errors.New("invalid factory func settings")
}
if config.Close == nil {
return nil, errors.New("invalid close func settings")
}
genericPool := &GenericPool{
connections: make(chan *idleConn, config.MaxIdle),
factory: config.Factory,
close: config.Close,
idleTimeout: config.IdleTimeout,
maxActive: config.MaxCap,
openingConnections: config.InitialCap,
}

if config.Ping != nil {
genericPool.ping = config.Ping
}
for i := 0; i < config.InitialCap; i++ {
conn, err := genericPool.factory()
if err != nil {
genericPool.Release()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
genericPool.connections <- &idleConn{conn: conn, t: time.Now()}
}

return genericPool, nil
}

测试

又累又饿又困,后续有机会再补上测试吧

总结

连接池的最基本的保证,就是获取连接时候的线程安全。但是在实现诸多额外特性时候却又从不同角度来实现。还是非常有意思的。但是不管存储结构是用 chan 还是还是 slice,都可以很好的实现这一点。如果像 sql 或者 redis 那样用 slice 来存储连接,就得维护一个结构来表示排队等候的效果。