我在反广告、杀病毒、检木马等行业的不同软件公司里已经工作 15 年以上了,非常了解这类系统软件因每天处理海量数据而导致的复杂性。
目前我作为 smsjunk.com 的 CEO 和 KnowBe4 的主架构师,在这两个网络安全领域的公司里工作。
有趣的是,在过去的 10 年里,作为软件工程师,我接触到的 web 后端代码大多是用 Ruby on Rails 开发的。请不要误会,我很喜欢 Ruby on Railds 框架,而且我认为它是一套令人称赞的框架,不过时间一长,你就会习惯于使用 ruby 语言的方式思考和设计系统,会忘记利用多线程,并行化,快速执行和小的内存消耗,软件架构本可以如此高效且简单。很多年来,我也是一个 C/C++,Delphi 以及 C# 的使用者,而且我开始认识到使用正确的工具能让事情变得更简单。
我对互联网上没完没了的语言框架之间的论战并不感冒。因为我相信解决方案的效能及代码可维护性主要倚仗于你的架构能做到多简单。
在实现某个遥测分析系统时,我们遇到一个实际问题,要处理来自数百万终端的 POST 请求。其中的 web 请求处理过程会接收到一个 JSON 文档,它包含一个由许多荷载数据组成的集合,我们要把它写到 Amazon S3 存储中,之后我们的 map-reduce 系统就可以对这些数据进行处理。
一般我们会利用如下的组件去创建一个有后台工作层的架构,如:
并且建立两个不同的服务集群,一个用作 web 前端接收数据,另一个执行具体的工作,这样我们就能动态调整后台处理工作的能力了。
不过从项目伊始,我们的团队就认为应该用 Go 语言来实现这项工作,因为在讨论过程中我们发现这可能是一个流量巨大的系统。我已经使用 Go 语言快两年了,而且我们已经在工作中用它开发了一些系统,只是还没遇到过负载如此大的系统。
我们从定义一些 web 的 POST 请求载荷数据结构开始,还有一个用于上传到 S3 存储的方法。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
起初我们实现了一个非常简单的 POST 处理接口,尝试用一个简单的 goroutine 并行工作处理过程:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader( http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader( http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}
w.WriteHeader( http.StatusOK)
}
在普通负载的情况下,这段代码对于大多数人已经够用了,不过很快就被证明了不适合大流量的情形。当我们把第一个版本的代码部署到生产环境后,才发现实际情况远远超出我们的预期,系统流量比之前预计的大许多,我们低估了数据负载量。
上面的处理方式从几个方面来看都有问题。我们无法办法控制创建的 go routines 的数量。而且我们每分钟收到一百万次的 POST 请求,代码必然很快就崩溃。
我们需要寻找别的出路。从一开始,我们就在讨论怎样保证请求处理时间较短,然后在后台进行工作处理。当然,在 Ruby on Rails 里必须这样做,否则你会阻塞掉所有的 web 处理进程,无论你是否使用了 puma,unicorn,passenger (我们这里就不讨论 JRuby 了)。然后我们可能会使用常见的解决方案,比如 Resque,Sidkiq,SQS,等等。有许多方法可以完成这个任务。
所以第二次迭代采用了缓冲通道( buffered channel ),我们可以将一些工作先放入队列,再将它们上传至 S3,由于我们能够控制队列的大小,而且有充足的内存可用,所以我们以为将任务缓冲到 channel 队列中就可以了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
然后将任务从队列中取出再进行处理,我们使用了类似下面的代码:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- 仍然不好使!
}
}
}
老实说,我都不知道当时我们在想些什么。这一定是喝红牛熬夜导致的结果。这个方案没给我们带来任何好处,我们只是将一个有问题的并发过程替换为了一个缓冲队列,它只是将问题推后了而已。我们的同步处理过程每次只将一份载荷数据上传到 S3,由于接受到请求的速率远大于单例程上传到 S3 的能力,我们的缓冲队列很快就满了,导致请求处理过程阻塞,无法将更多的数据送入队列。
我们傻乎乎地忽略了问题,最终开始了系统的死亡倒计时。在部署了这个问题版本之后几分钟里,系统的延迟以固定的速率不断增加。
我们决定使用 Go 通道的一种常用模式构建一个两层的通道系统,一个通道用作任务队列,另一个来控制处理任务时的并发量。
这个办法是想以一种可持续的速率、并发地上传数据至 S3 存储,这样既不会把机器跑挂掉也不会产生 S3 的连接错误。因此我们选择使用了一种 Job/Worker 模式。如果你熟悉 Java,C# 等语言,可以认为这是使用通道以 Go 语言的方式实现了一个工作线程池。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
我们修改了 web 请求处理过程,使用数据载荷创建了一个 Job
实例,然后将其送入 JobQueue
通道中供工作例程使用。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader( http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader( http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue <- work
}
w.WriteHeader( http.StatusOK)
}
在 web 服务初始化的过程中,我们创建了一个 Dispatcher
实例,调用 Run()
方法创建了工作例程池,并且通过监听 JobQueue
获取工作任务。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
下面的代码是任务分派器的具体实现:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
注意我们提供了一个最大数量的参数,用于控制工作池中初始的例程数量。因为这个项目使用了 Amazon Elasticbeanstalk 以及 docker 中的 Go 环境,所以我们努力遵循 12-factor 的方法,从环境变量中读取配置值,便于在生产环境中进行系统配置。通过这种方式,我们可以控制工作例程的数量和工作队列的长度,无需对集群进行重新部署,我们就能快速调整参数值。
1
kslr 2018-02-28 15:32:51 +08:00
如果直接上传到 S3 然后报告结果给 API 呢
|
2
douglarek 2018-02-28 15:34:33 +08:00
mark
|
3
Crabbbbb 2018-02-28 15:36:17 +08:00
mark
|
4
douglarek 2018-02-28 15:36:20 +08:00
原文标题可没说是 Golang 轻松来搞定
|
5
chinvo 2018-02-28 15:36:20 +08:00
让客户端上传到 s3,然后发 s3 的结果给 API,这样做和文中方案有何利弊区分?
|
6
fatjiong 2018-02-28 15:38:17 +08:00
感谢分享。
|
7
est 2018-02-28 15:38:24 +08:00 1
> 从环境变量中读取配置值,便于在生产环境中进行系统配置。通过这种方式,我们可以控制工作例程的数量和工作队列的长度,无需对集群进行重新部署
这特么也是需要重启一下进程才能实现的把??? |
8
douglarek 2018-02-28 15:43:34 +08:00
看明白了,实现了个 sqs + 多进程消费 ?
|
9
ljypaul2011 2018-02-28 15:57:24 +08:00
mark
|
11
Icezers 2018-02-28 15:58:52 +08:00
为什么不拆分成 生产者-任务队列-消费者的微服务模式呢,只要保证任务队列的数据库不炸就行了啊,如果生产者的生产速率恒大于消费者的消费速率,文章中的 JobQueue 一样会炸啊,无非是时间问题,内存可比硬盘贵多了
|
12
xkeyideal 2018-02-28 16:01:37 +08:00 1
去年的老帖,动动脑再来发,作者说的最后一种高效方案写的真的好么?实现的复杂一逼,而且这种情况,后端服务堵了啥语言都白给。
|
13
murmur 2018-02-28 16:02:04 +08:00
Handling 1 Million Requests per Minute with Golang
又一个翻译标题党 这需求太简单了吧,收集一大堆 json 然后整合起来上传到亚马逊云上 几乎没有什么处理操作 只要内存队列控制的好的语言应该都能完成 |
15
Icezers 2018-02-28 16:11:46 +08:00 4
重新看了一遍原文看明白了
第一种方案,并发量过大导致上传任务奔溃 第二种方案,使用了 channel 控制并发,导致协程阻塞 于是作者的第三种方案就是写了一个类 Java 的线程池。。。。在高并发的时候把任务放入线程池,同时控制任务执行速率防止上传协程崩溃, 问题来了,如果真按文中所述每分钟百万次请求,要么放大协程数量炸上传,要么 JobQueue 阻塞炸内存。。。。 问题根源不是出在上传那里么,还是没有解决 所以没有什么是加一台机器不能解决的,如果有,就加 2 台 |
16
wph95 2018-02-28 16:12:47 +08:00 via iPhone 1
aws kinesis 了解一下
没看出来这文章哪里体现出一分钟一百万了 python async 跑分还吊打 go …… |
18
jswh 2018-02-28 16:20:18 +08:00
看下来了,和 go 关系不大
|
19
sagaxu 2018-02-28 16:25:50 +08:00
一秒才一万多,逻辑也不复杂,随便哪个语言都能轻松搞定吧
|
20
clippit 2018-02-28 16:26:46 +08:00
就单纯上报的话,还可以试试 AWS Lambda (逃
|
21
CoderGeek 2018-02-28 16:29:57 +08:00 via iPhone
好像真的没啥关系 微服务能很大程度避免这种问题吧
|
22
verzhshq 2018-02-28 16:31:25 +08:00
谢谢分享
|
23
xkeyideal 2018-02-28 16:31:30 +08:00 1
@Icezers 这个帖子,去年我专门拿出来做过相关的剖析,前提是不考虑后端服务的处理能力。
作者说了三种 goroutine 常用的方法,第一种最常见,第二种也还好,第三种设计的模型真的垃圾,简单代码复杂化,让人很难看懂,明明可以用个类 worker pool 的方式就能搞定,开多个 worker,每个 worker 去各自的 pool 中取任务即可,非要搞那么复杂,做的高级一点,还可以对 worker 的数量进行缩扩容。 最后讨论了一下这个应用场景,结论就是后端的 S3 服务一睹,啥方案都白给。 综合来看,这篇文章的价值很小,给一些刚入门的,不太理解 goroutine 的人看看还行 |
24
pmispig 2018-02-28 16:32:30 +08:00
我来总结下吧。最后优化的方式是使用了 2 个 channel. 一个用来放 job 列队,一个用来控制 Go routines 的并发数量。
|
25
rrfeng 2018-02-28 16:33:32 +08:00 via Android
这个翻译文章问题太多了...
大家去看原文吧。记得两年前就看过了,翻译出来根本不是原来的味道 |
26
jimrok 2018-02-28 16:34:33 +08:00
把消费者也并发执行了,最终搞定。这模式 java,erlang 都很容易实现,golang 又拿出来炒一遍。
|
27
shell314 2018-02-28 16:44:59 +08:00
学习了
|
28
xuanyuanaosheng 2018-02-28 16:45:15 +08:00
mark
|
29
glacier2002 2018-02-28 16:52:36 +08:00
学习了
|
30
yc8332 2018-02-28 17:01:49 +08:00
怎么感觉很 low 的样子 (逃
|
31
nnnoml 2018-02-28 17:07:51 +08:00
mark
|
32
soli 2018-02-28 17:23:14 +08:00 3
没仔细看代码,从描述来看,感觉作者及同事处理问题的步骤没啥大问题。
但估计是作者没有真正理解每个环节所遇到的问题的本质,所以给人有种『吃了第十个馒头饱了,就觉得前九个馒头都没用』的感觉。 简单梳理一下: 1. 最开始他们遇到的问题是『要处理来自数百万终端的 POST 请求』。 这其实是要解决并发数和响应时间的问题,顺带要考虑突发流量问题,即要『削峰』。 这时候他们采用协程去解决,没啥问题。 (有简单的解决方法,肯定用简单的哈。) 2. 解决了并发数和响应时间问题后,导致了爆协程的问题。 请求确实不少,协程处理没那么快的话,当然爆掉了。 尤其是遇到突发流量的话,服务不死才怪。 为了解决爆协程的问题(注意:这个阶段的问题已经和 1 中的问题不是同一个问题了), 那就用队列吧。至少队列可以控制数量哈。 这时候他们解决问题的方式,也没啥可指摘的。 3. 这时候横着爆的问题变成了纵向爆了。 用队列代替协程之后,协程数量爆的问题变成了队列长度爆的问题。 这个阶段才把吞吐率的问题暴露出来。 无论横着竖着都爆,那就是说明问题是数据处理不够快哈。 那开个线程池,多放几个线程处理数据呗。 这时候他们的解决方法,也没得说。 最后,既然每一步都没啥问题,但整个过程为啥给人一种有问题的感觉? 我认为,唯一的问题是整个团队没有有经验的人。 有经验的人,遇到这种情况的时候,至少能把各个阶段面临的问题理解清楚,处理起来有的放矢。 更好点的,可以预见后续会遇到的问题,从而从一开始就采用不同的解决方案。 再好点的,可以迅速做出解决方案原型,并进行多项性能测试,从而采用最简单的方案把问题解决。 如果原型+测试表明,用协程就可以解决了,那直接协程就好了,队列线程池什么的就属于过渡设计了; 类似文中这种情况,原型+测试的方法可以极大地减少对生产环境的影响, 也尽量避免了开发人员为了紧急应对线上问题而疲于奔命。 |
33
Icezers 2018-02-28 17:31:35 +08:00
@soli 我觉得他们的本质问题是 UploadToS3 速率过慢的问题,就算『削峰』+开线程池控制协程,出水口依然没有变大,最后池子还是会有问题的
|
34
soli 2018-02-28 17:44:06 +08:00
|
35
AckywOw 2018-02-28 17:44:23 +08:00
mark
|
36
hugodotlau 2018-02-28 18:06:18 +08:00
马克华菲
|
37
jinya 2018-02-28 19:01:39 +08:00
m
|
38
feverzsj 2018-02-28 19:24:44 +08:00
还以为是百万每秒,原来是每分钟,这也值得拿来说事?
|
39
angelshq 2018-02-28 19:35:21 +08:00
mark,学习完 go 在回头看看。
|
40
rayjoy 2018-02-28 22:26:56 +08:00
mark,go 学习中
|
41
sen506 2018-02-28 22:40:05 +08:00 via iPhone
1 秒 1 万多真心不高。。
|
42
kunluanbudang 2018-02-28 23:08:54 +08:00 via Android
|
43
searene 2018-03-01 09:01:47 +08:00
说句题外话,翻译口味有点浓,一看就能看出来
|
44
puperSB 2018-03-01 10:21:40 +08:00
mark
|