## 編寫一個 WorkerPool 隊列消費
> 隊列消費是高并發系統中最常用的異步處理模型,通常我們是編寫一個 Console 命令行程序在后臺執行 Redis、RabbitMQ 等 MQ 的隊列消費,并將處理結果落地到 mysql 等數據庫中,由于這類需求的標準化比較容易,因此我們開發了 [workerpool](https://github.com/mix-go/workerpool) 庫來處理這類需求,基本上大部分異步處理類需求都可使用
首先我們使用 `mix` 命令創建一個 Console 項目骨架:
~~~
mix new --name=hello
~~~
然后我們在 `commands` 新建 `commands/workerpool.go` 文件:
- 定義一個新的命令結構體 `WorkerPoolDaemonCommand`
- `WorkerPoolDaemonCommand.Main` 方法為默認執行的入口方法
- 代碼中 `workerpool.NewDispatcher(jobQueue, 15, NewWorker)` 創建了一個調度器
- `NewWorker` 負責初始化執行任務的工作協程
- 任務數據會在 `worker.Do` 方法中觸發,我們只需要將我們的業務邏輯寫到該方法中即可
- 當程序接收到進程信號時,調度器能平滑控制所有的 Worker 在執行完隊列里全部的任務后再退出調度,保證數據的完整性
~~~
package commands
import (
"context"
"fmt"
"github.com/mix-go/console-skeleton/globals"
"github.com/mix-go/console/catch"
"github.com/mix-go/workerpool"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
type worker struct {
workerpool.WorkerTrait
}
func (t *worker) Do(data interface{}) {
defer func() {
if err := recover(); err != nil {
catch.Error(err)
}
}()
// 執行業務處理
// ...
// 將處理結果落地到數據庫
// ...
}
func NewWorker() workerpool.Worker {
return &worker{}
}
type WorkerPoolDaemonCommand struct {
}
func (t *WorkerPoolDaemonCommand) Main() {
redis := globals.Redis()
jobQueue := make(chan interface{}, 50)
d := workerpool.NewDispatcher(jobQueue, 15, NewWorker)
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
d.Stop()
}()
go func() {
for {
res, err := redis.BRPop(context.Background(), 3*time.Second, "foo").Result()
if err != nil {
if strings.Contains(err.Error(), "redis: nil") {
continue
}
fmt.Println(fmt.Sprintf("Redis Error: %s", err))
d.Stop();
return
}
// brPop命令最后一個鍵才是值
jobQueue <- res[1]
}
}()
d.Run() // 阻塞代碼,直到任務全部執行完成并且全部 Worker 停止
}
~~~
然后我們把上面的 `WorkerPoolDaemonCommand` 在 `manifest/commands` 中注冊,新增一個命令配置:
- 新建 `manifest/commands/workerpool.go` 文件
~~~
package commands
import (
"github.com/mix-go/console"
"github.com/mix-go/console-skeleton/commands"
)
func init() {
Commands = append(Commands,
console.CommandDefinition{
Name: "wpd",
Usage: "\tWorker pool daemon demo",
Options: []console.OptionDefinition{
{
Names: []string{"d", "daemon"},
Usage: "Run in the background",
},
},
Command: &commands.WorkerPoolDaemonCommand{},
},
)
}
~~~
## 編譯與測試
> 也可以在 Goland Run 里配置 Program arguments 直接編譯執行,[Goland 使用] 章節有詳細介紹
接下來我們編譯上面的程序:
~~~
// linux & macOS
go build -o bin/go_build_main_go main.go
// win
go build -o bin/go_build_main_go.exe main.go
~~~
執行 `wpd` 命令:
~~~
$ cd bin
$ ./go_build_main_go wpd
~~~
當我們想在服務器后臺執行時,只需增加 `-d/--daemon` 參數:
~~~
$ ./go_build_main_go wpd -d
~~~