(本文编译自这篇文章,略去一些与技术无关的介绍文字)
面临的问题
我们正在构建一个匿名数据统计系统,希望能够处理来自数百万个终端的POST请求。我们需要把JSON请求中的数据写入到Amazon S3中,之后交由Map-Reduce系统处理。
通常我们会考虑使用一些异步处理的架构,比如:
- Sidekiq
- Resque
- DelayedJob
- ElasticBeanstalk Worker Tier
- RabbitMQ
- 等等…
然后搭建两个集群,一个用来提供前端的服务,一个用来异步处理请求。这样可以很方便的进行扩容。
但是从前期讨论方案开始,我们的团队认为这个系统将会面临很大的流量,而我们可以使用go语言来构建这个系统。我有两年go语言的使用经验,也开发了一些系统,但是还没有一个会面临这么大流量的考验。
我们先定义一些基础的数据结构表示web请求中的数据和将数据上传到S3的方法。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// 省略
}
func (p *Payload) UploadToS3() error {
// storageFolder可以保证文件名不会重复
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// 所有上传到S3的数据都标识为私有
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
|
原生Goroutine
一开始我们用最简单的方式实现POST handler,利用原生的goroutine实现任务的并行处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 从请求中读json数据
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// 遍历每条数据,写入到S3中
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- 千万不要这么写!!!
}
w.WriteHeader(http.StatusOK)
}
|
如果请求量不是很大,上边的这段代码可以适用于很多场景。但面临大量请求时这段代码是有问题的。我们上线这段代码的时候知道会面临比较大的请求量,但是真实的请求量比我们预想的要大得多。
观察一下这段代码,你就会发现我们无法控制生成的goroutine的数量。我们线上每分钟会收到1百万个请求,这段代码很快就能让系统崩溃。
再度尝试
我们需要想一个别的办法来解决这个问题。我们希望尽可能缩短handler方法的处理时间,把耗时的操作交给后台进程处理。比如用Ruby on Rails的时候,我们会借助Resque、SideKiq、SQS等工具来达到这个目的。
于是我们改了一版代码,想借助buffered channel
来完成这个功能。既然我们可以控制buffer的长度,也有足够的内存存放任务数据,我们想当然地把任务放在了channel队列里。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
|
然后再用下边的代码读取和处理任务:
1
2
3
4
5
6
7
8
|
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- 仍然不是好办法
}
}
}
|
说实话,我们肯定是大半夜脑袋进水了才想到这个办法。你可以看到,这个办法并没有解决问题,只是把问题推迟了一点。我们的任务处理器是同步模式,一次只会上传一个数据到S3。请求进来的速度显然超过了同步处理任务的速度,很快buffered channel就满了,后续的请求handler无法把任务写入到channel中,于是请求的响应时间就会大幅增加。
这段坑爹代码上线后,我们的接口响应时间每分钟都会递增一点。
更好的办法
于是我们决定使用两级channel的模式,一级作为任务队列,另一级用来控制并发处理任务的worker数量。
本质上我们希望以某种可持续的速率并行处理上传S3的任务,既不能拖垮机器,又不能造成太多与S3服务器的连接错误。于是我们就想到了worker模式。有点像Java的worker线程池,只不过在go语言中我们用channel来实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// 需要执行的任务
type Job struct {
Payload Payload
}
// buffered channel。用来传递任务
var JobQueue chan Job
// 表示执行任务的一个worker
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// 开启一个worker协程,监听quit channel用于控制协程退出
func (w Worker) Start() {
go func() {
for {
// 把当前worker注册到worker队列中
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 取到一个任务
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// 收到退出信号
return
}
}
}()
}
// 向worker协程发送退出信号
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
|
之后我们修改了一下handler方法,创建一个Job
的结构体,并把它放到任务队列里等worker协程处理。
1
2
3
4
5
6
7
8
9
10
11
|
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
for _, payload := range content.Payloads {
// 创建一个新任务
work := Job{Payload: payload}
// 放到任务队列中
JobQueue <- work
}
...
}
|
web服务器初始化的时候,我们会创建并运行一个Dispatcher
来创建worker池,监听来自JobQueue
的任务
1
2
|
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
|
下边是Dispatcher
的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
type Dispatcher struct {
// worker池
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// 创建指定数量的worker协程
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// 从任务队列读取一个任务
go func(job Job) {
// 从worker池中拿一个worker出来。这条语句会阻塞直到有可用的worker。
// (译注:正因为会阻塞,所以这里起了一个新协程来处理任务,而不是直接处理,不然就会面临上一个方法的处境--任务队列满之后handler无法写入新任务)
jobChannel := <-d.WorkerPool
// 把任务交给该worker
jobChannel <- job
}(job)
}
}
}
|
上了这段代码后,我们的接口响应时间立马就下降了。而且服务器实例也从原来的100台下降到了20台。
译者的思考
对于大流量的系统,简单粗暴地使用go语句创建协程肯定是不靠谱的。大量的协程一来导致大量的内存占用。二来cpu忙于协程之间的调度,真正用来处理业务的时间大幅下降。如果涉及到第三方服务的话,也会导致对第三方服务的大量并发请求。所以一定需要协程池的处理。
而作者最初用buffered channel
来做任务队列的方案显示是不合适的,文中提到只有一个协程在消费,那么必然会面临channel满员无法写入的情况。但是即使改造成使用多个协程同时消费也无法保证消费能力时刻高于写入量,一旦channel拥堵,仍然会导致接口响应时间上涨。
之后作者采用的两级channel模式,初次看代码可能觉得有点绕。但是仔细推敲一下也能看明白。每个worker都有一个job channel,worker池里放的就是这些channel。调度器从worker池里取worker本质上是取出了一个worker的job channel,往这个channel里写数据就相当把任务交给了这个worker。而且worker的job channel是不带缓冲的,原因是调度器拿到worker的时候,worker一定是空闲状态,写入的数据是可以立刻被处理的。
另外,在dispatch
方法里,任务提交给worker是通过新起一个协程实现的,这样可以保证JobQueue
不会阻塞,handler也可以无阻塞地写入任务数据,响应时间得到了保障。
这里我有一个疑问。dispatch
方法里的go协程数量其实也是不可控的,理论上等于请求量与消费量的差值。那为什么这种方案会比原始的大量go协程性能好呢?
我的理解是,原始大量协程均处于活跃状态,都要参与调度器的调度。而dispatch
里的go协程就算很多,也是处于阻塞读channel的状态,并不参与调度,只有当channel可读时才会被唤醒进行后续处理,而且一旦唤醒,只有两个channel操作,生命周期很短,所以只是增加了一些内存使用而已。如果读者有更好的理解,不妨留言告诉我。