如果你曾經使用過API來獲取服務,那么你可能經受過與速率限制相抗衡。速率限制使得某種資源每次訪問的次數受限。資源可以是任何東西:API連接,磁盤I/O,網絡包,錯誤。
你有沒有想過為什么會需要制定速率限制?為什么不允許無限制地訪問系統?最明顯的答案是,通過對系統進行速率限制,可以防止整個系統遭受攻擊。如果惡意用戶可以在他們的資源允許的情況下極快地訪問你的系統,那么他們可以做各種事情。
例如,他們可以用日志消息或有效的請求填滿服務器的磁盤。如果你的日志配置錯誤,他們甚至可能會執行惡意的操作,然后提交足夠的請求,將任何活動記錄從日志中移出并放入/dev/null中導致日志系統完全崩潰。他們可能試圖暴力訪問資源,或者執行分布式拒絕服務攻擊。如果你沒有對系統進行請求限制,你的系統就成了一個走在街上不穿衣服的大美女。
更糟糕的是,非惡意請求有時也會造成上述結果。惡意使用并不是唯一的原因。 在分布式系統中,如果合法用戶正在以足夠高的速度執行操作,或者他們正在運行的代碼有問題,則合法用戶可能會降低系統的性能。這甚至可能導致我們之前討論的死亡螺旋。 從產品的角度來看,這太糟糕了!通常情況下,你希望向用戶提供某種類型的性能保證,以確保他們可以在一致的基礎上達到不錯的性能。如果一個用戶可以影響該協議,那么你的日子就不好過了。用戶對系統的訪問通常被沙盒化,既不會影響其他用戶的活動也不會受其他用戶影響。如果你打破了這種思維模式,你的系統會表現出糟糕的設計使用感,甚至會導致用戶生氣或離開。
>我曾經在一個分布式系統上工作,通過啟動新的進程來并行擴展(這允許它水平擴展到多臺機器)。每個進程都會打開數據庫連接,讀取一些數據并進行一些計算。一段時間以來,我們在以這種方式擴展系統以滿足客戶需求方面取得了巨大成功。 但是,一段時間后,我們發現大量的數據庫中讀取數據超時。
>我們的數據庫管理員仔細查看了日志,試圖找出問題所在。最后他們發現,由于系統中沒有設定任何速率限制,所以進程彼此重疊。由于不同的進程試圖從磁盤的不同部分讀取數據,因此磁盤競爭將達到100%并一直保持在這個水平。這反過來導致了一系列的循環——超時——重試——循環。工作永遠不會完成。
>我們設計了一個系統來限制數據庫上可能的連接數量,并且速率限制被放在連接可以讀取的秒級單位,問題消失了。雖然客戶不得不等待更長的時間,但畢竟他們的工作完成了,我們能夠在接下來的時間里進行適當的容量規劃,以系統化的方式擴展容量。
速率限制使得你可以通過某個界限來推斷系統的性能和穩定性。如果你需要擴展這些邊界,可以在大量測試后以可控的方式進行。
在密集用戶操作的情況下,速率限制可以使你的系統與用戶之間保持良好的關系。你可以允許他們在嚴格限制的速率下嘗試系統的特性。Google的云產品證明了這一點,這種限制在某種意義上是可以保護用戶的。
速率限制通常由資源的構建者角度來考慮,但對于用戶來說,能夠減少速率限制的影響會讓其感到非常欣慰。
那么我們怎樣用Go來實現呢?
大多數速率限制是通過使用稱為令牌桶的算法完成的。這很容易理解,而且相對容易實施。 我們來看看它背后的理論。
假設要使用資源,你必須擁有資源的訪問令牌。沒有令牌,請求會被拒絕。想象這些令牌存儲在等待被檢索以供使用的桶中。該桶的深度為d,表示它一次可以容納d個訪問令牌。
現在,每次你需要訪問資源時,你都會進入存儲桶并刪除令牌。如果你的存儲桶包含五個令牌,那么您可以訪問五次資源。在第六次訪問時,沒有訪問令牌可用,那么必須將請求加入隊列,直到令牌變為可用,或拒絕請求。
這里有一個時間表來幫助觀察這個概念。time表示以秒為單位的時間增量,bucket表示桶中請求令牌的數量,并且請求列中的tok表示成功的請求。(在這個和未來的時間表中,我們假設這些請求是即時的。)
:-: 
可以看到,在第一秒之前,我們能夠完成五個請求,然后我們被阻塞,因為沒有更多的令牌可供使用。
到目前為止,這些都很容易理解。那么如何補充令牌呢?在令牌桶算法中,我們將r定義為將令牌添加回桶的速率。 它可以是一納秒或一分鐘。它就是我們通常認為的速率限制:因為我們必須等待新的令牌可用,我們將操作限制為令牌的刷新率。
以下是深度為1的令牌桶示例,速率為1令牌/秒:
:-: 
可以看到我們立即可以提出請求,但是只能隔一秒鐘將請求一次。速率限制執行的非常好!
所以我們現在有兩個設置可以擺弄:有多少令牌可用于立即使用,即桶的深度d,以及它們補充的速率r。另外我們還可以考慮下當存儲桶已滿時可以進行多少次請求。
以下是深度為5的令牌桶示例,速率為0.5令牌/秒:
:-: 
在這里,我們能夠立即提出五個請求,在這之后,每兩秒限制一次請求。請求的爆發是在桶最初裝滿的時候。
請注意,用戶可能不會消耗一個長數據流中的整個令牌桶。桶的深度只控制桶的容量。這里有一個用戶爆發兩次請求的例子,然后四秒鐘后爆發了五次:
:-: 
:-: 
雖然用戶有可用的令牌,但這種爆發性僅允許根據調用者的能力限制訪問系統。對于只能間歇性訪問系統但希望盡快往返的用戶來說,這種爆發性的存在是很好的。你只需要確保系統能夠同時處理所有用戶的爆發操作,或者確保操作上不可能有足夠的用戶同時爆發操作以影響你的系統。無論哪種方式,速率限制都可以讓你規避風險。
讓我們使用這個算法來看看一個Go程序在執行令牌桶算法的時如何表現。
假設我們可以訪問API,并且已經提供了客戶端來使用它。該API有兩個操作:一個用于讀取文件,另一個用于將域名解析為IP地址。為了簡單起見,我將忽略任何參數并返回實際訪問服務所需的值。 所以這是我們的客戶端:
```
func Open() *APIConnection {
return &APIConnection{}
}
type APIConnection struct{}
func (a *APIConnection) ReadFile(ctx context.Context) error {
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
// Pretend we do work here
return nil
}
```
由于理論上這個請求正在通過網絡傳遞,所以我們將context.Context作為第一個參數,以防我們需要取消請求或將值傳遞給服務器。 通過前面的章節討論,想必大家已經熟悉了這樣的用法。
現在我們將創建一個簡單的程序來訪問這個API。程序需要讀取10個文件并解析10個地址,但這些文件和地址彼此沒有關系,因此程序可以并發調用。稍后這將有助于添加利率限制。
```
func main() {
defer log.Printf("Done.")
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
apiConnection := Open()
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ReadFile(context.Background())
if err != nil {
log.Printf("cannot ReadFile: %v", err)
}
}()
}
log.Printf("ReadFile")
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ResolveAddress(context.Background())
if err != nil {
log.Printf("cannot ResolveAddress: %v", err)
}
}()
}
log.Printf("ResolveAddress")
wg.Wait()
}
```
這會輸出:
```
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 Done.
```
我們可以看到所有的API請求幾乎同時進行。由于沒有設定速率限制,所以用戶可以隨意隨意訪問系統。現在我需要提醒你,當前個可能導致無限循環的錯誤。
好的,讓我們來介紹一個限速器。我打算在APIConnection中這樣做,但通常會在服務器上運行速率限制器,這樣用戶就無法繞過它。生產系統還可能包括一個客戶端速率限制器,以防止用戶因不必要的調用而被拒絕,但這是一種優化,并不是必需的。就我們的目的而言,限速器使事情變得簡單。
我們將看看golang.org/x/time/rate 中的令牌桶速率限制器實現的示例。我選擇了這個包,因為這跟我所能得到的標準庫較為相近。當然還有其他的軟件包可以做更多的花里胡哨的工作。 golang.org/x/time/rate 非常簡單,而且它適用于我們的目的。
我們將與這個包交互的前兩種方法是Limit類型和NewLimiter函數,在這里定義:
```
// Limit 定義了事件的最大頻率。
// Limit 被表示為每秒事件的數量。
// 值為0的Limit不允許任何事件。
type Limit float64
// ewLimiter返回一個新的Limiter實例,
// 件發生率為r,并允許至多b個令牌爆發。
func NewLimiter(r Limit, b int) *Limiter
```
在NewLimiter中,我們看到了兩個熟悉的參數:r ,以及 b.r。b是我們之前討論過的桶的深度。
rate 包還定義了一個有用的函數,Every,幫助將time.Duration轉換為Limit:
```
// Every將事件之間的最小時間間隔轉換為Limit。
func Every(interval time.Duration) Limit
```
Every函數是有意義的,但我想討論每次rate限制了操作次數,而不是請求之間的時間間隔。 我們可以將其表述如下:
```
rate.Limit(events/timePeriod.Seconds())
```
但是我不想每次都輸入這個值,Every函數有一些特殊的邏輯會返回rate.Inf——表示沒有限制——如果提供的時間間隔為零。 正因為如此,我們將用這個詞來表達我們的幫助函數:
```
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration/time.Duration(eventCount))
}
```
在我們建立rate.Limiter后,我們希望使用它來阻塞我們的請求,直到獲得訪問令牌。 我們可以用Wait方法來做到這一點,它只是用參數1來調用WaitN:
```
// Wait 是 WaitN(ctx, 1)的簡寫。
func (lim *Limiter) Wait(ctx context.Context)
// WaitN 會發生阻塞直到 lim 允許的 n 個事件執行。
// 它返回一個 error 如果 n 超過了 Limiter的桶大小,
// Context會被取消, 或等待的時間超過了 Context 的 Deadline。
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
```
我們已經集齊了限制API請求的所有要素。讓我們修改APIConnection類型并嘗試一下:
```
func Open() *APIConnection {
return &APIConnection{
rateLimiter: rate.NewLimiter(rate.Limit(1), 1) //1
}
}
type APIConnection struct {
rateLimiter *rate.Limiter
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil { //2
return err
}
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil { //2
return err
}
// Pretend we do work here
return nil
}
```
1. 我們將所有API連接的速率限制設置為每秒一個事件。
2. 我們等待速率限制器有足夠的權限來完成我們的請求。
這會輸出:
```
22:08:30 ResolveAddress
22:08:31 ReadFile
22:08:32 ReadFile
22:08:33 ReadFile
22:08:34 ResolveAddress
22:08:35 ResolveAddress
22:08:36 ResolveAddress
22:08:37 ResolveAddress
22:08:38 ResolveAddress
22:08:39 ReadFile
22:08:40 ResolveAddress
22:08:41 ResolveAddress
22:08:42 ResolveAddress
22:08:43 ResolveAddress
22:08:44 ReadFile
22:08:45 ReadFile
22:08:46 ReadFile
22:08:47 ReadFile
22:08:48 ReadFile
22:08:49 ReadFile
22:08:49 Done.
```
在生產中,我們可能會想要一些更復雜的東西。我們可能希望建立多層限制:細粒度控制以限制每秒請求數,粗粒度控制限制每分鐘,每小時或每天的請求數。
在某些情況下,可以通過速率限制器來實現; 然而,在所有情況下都這么干是不可能的,并且通過嘗試將單位時間的限制語義轉換為單一層,你會因速率限制器而丟失大量信息。出于這些原因,我發現將限制器分開并將它們組合成一個限速器來管理交互更容易。 為此,我創建了一個簡單的聚合速率限制器multiLimiter。 這是定義:
```
type RateLimiter interface { //1
Wait(context.Context) error
Limit() rate.Limit
}
func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
byLimit := func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
}
sort.Slice(limiters, byLimit) //2
return &multiLimiter{limiters: limiters}
}
type multiLimiter struct {
limiters []RateLimiter
}
func (l *multiLimiter) Wait(ctx context.Context) error {
for _, l := range l.limiters {
if err := l.Wait(ctx); err != nil {
return err
}
}
return nil
}
func (l *multiLimiter) Limit() rate.Limit {
return l.limiters[0].Limit() //3
}
```
1. 這里我們定義一個RateLimiter接口,以便MultiLimiter可以遞歸地定義其他MultiLimiter實例。
2. 這里我們實現一個優化,并按照每個RateLimiter的Limit()行排序。
3. 因為我們在multiLimiter實例化時對子RateLimiter實例進行排序,所以我們可以簡單地返回限制性最高的limit,這將是切片中的第一個元素。
Wait方法遍歷所有子限制器,并在每一個子限制器上調用Wait。這些調用可能會也可能不會阻塞,但我們需要通知每個子限制器,以便減少令牌桶內的令牌數量。通過等待每個限制器,我們保證最長的等待時間。這是因為如果我們執行時間較小的等待,那么最長的等待時間將被重新計算為剩余時間。在較早的等待被阻塞時,后者會等待令牌桶的填充。
經過思考,讓我們重新定義APIConnection,對每秒和每分鐘都進行限制:
```
func Open() *APIConnection {
secondLimit := rate.NewLimiter(Per(2, time.Second), 1) //1
minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10) //2
return &APIConnection{
rateLimiter: MultiLimiter(secondLimit, minuteLimit) //3
}
}
type APIConnection struct {
rateLimiter RateLimiter
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
// Pretend we do work here
return nil
}
```
1. 我們定義了每秒的極限。
2. 我們定義每分鐘的突發極限為10,為用戶提供初始池。每秒的限制將確保我們不會因請求而使系統過載。
3. 我們結合這兩個限制,并將其設置為APIConnection的主限制器。
這會輸出:
```
22:46:10 ResolveAddress
22:46:10 ReadFile
22:46:11 ReadFile
22:46:11 ReadFile
22:46:12 ReadFile
22:46:12 ReadFile
22:46:13 ReadFile
22:46:13 ReadFile
22:46:14 ReadFile
22:46:14 ReadFile
22:46:16 ResolveAddress
22:46:22 ResolveAddress
22:46:28 ReadFile
22:46:34 ResolveAddress
22:46:40 ResolveAddress
22:46:46 ResolveAddress
22:46:52 ResolveAddress
22:46:58 ResolveAddress
22:47:04 ResolveAddress
22:47:10 ResolveAddress
22:47:10 Done.
```
正如您所看到的,我們每秒發出兩個請求,直到請求#11,此時我們開始每隔六秒發出一次請求。 這是因為我們耗盡了我們可用的每分鐘請求令牌池,并受此限制。
為什么請求#11在兩秒內發生而不是像其他請求那樣發生,這可能有點違反直覺。 請記住,盡管我們將API請求限制為每分鐘10個,但一分鐘是一個動態的時間窗口。 當我們達到第十一個要求時,我們的每分鐘限制器已經累積了另一個令牌。
通過定義這樣的限制,我們可以清楚地表達了粗粒度限制,同時仍然以更詳細的細節水平限制請求數量。
這種技術也使我們能夠開始思考除時間之外的其他維度。 當你對系統進行限制時,你可能會限制不止一件事。 也可能對API請求的數量有一些限制,也會對其他資源(如磁盤訪問,網絡訪問等)有限制。讓我們稍微充實一下示例并設置磁盤和網絡限制
```
func Open() *APIConnection {
return &APIConnection{
apiLimit: MultiLimiter( //1
rate.NewLimiter(Per(2, time.Second), 2),
rate.NewLimiter(Per(10, time.Minute), 10)),
diskLimit: MultiLimiter(rate.NewLimiter(rate.Limit(1), 1)), //2
networkLimit: MultiLimiter(rate.NewLimiter(Per(3, time.Second), 3)) //3
}
}
type APIConnection struct {
networkLimit,
diskLimit,
apiLimit RateLimiter
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx) //4
if err != nil {
return err
}
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx) //5
if err != nil {
return err
}
// Pretend we do work here
return nil
}
```
1. 我們為API調用設置了一個限制器。 每秒請求數和每分鐘請求數都有限制
2. 我們為磁盤讀取設置一個限制器。將其限制為每秒一次讀取。
3. 對于網絡,我們將設置每秒三個請求的限制。
4. 當我們讀取文件時,將結合來自API限制器和磁盤限制器的限制。
5. 當我們需要網絡訪問時,將結合來自API限制器和網絡限制器的限制。
這會輸出:
```
01:40:15 ResolveAddress
01:40:15 ReadFile
01:40:16 ReadFile
01:40:17 ResolveAddress
01:40:17 ResolveAddress
01:40:17 ReadFile
01:40:18 ResolveAddress
01:40:18 ResolveAddress
01:40:19 ResolveAddress
01:40:19 ResolveAddress
01:40:21 ResolveAddress
01:40:27 ResolveAddress
01:40:33 ResolveAddress
01:40:39 ReadFile
01:40:45 ReadFile
01:40:51 ReadFile
01:40:57 ReadFile
01:41:03 ReadFile
01:41:09 ReadFile
01:41:15 ReadFile
01:41:15 Done.
```
讓我們關注一下這樣的事實,即我們可以將限制器組合成對每個調用都有意義的組,并且讓APIClient執行正確的操作。如果我們想隨便觀察一下它是如何工作的,會注意到涉及網絡訪問的API調用似乎以更加規律的方式發生,并在前三分之二的調用中完成。這可能與goroutine調度有關,也是我們的限速器正在執行各自的工作。
我還應該提到的是,rate.Limiter類型有一些其他的技巧來優化不同的用例。這里只討論了它等待令牌桶接收另一個令牌的能力,但如果你有興趣使用它,可以查閱文檔。
在本節中,我們考察了使用速率限制的理由,構建了令牌桶算法的Go實現,以及如何將令牌桶限制器組合為更大,更復雜的速率限制器。這應該能讓你很好地了解速率限制,并幫助你開始使用它們。
* * * * *
學識淺薄,錯誤在所難免。我是長風,歡迎來Golang中國的群(211938256)就本書提出修改意見。
- 前序
- 誰適合讀這本書
- 章節導讀
- 在線資源
- 第一章 并發編程介紹
- 摩爾定律,可伸縮網絡和我們所處的困境
- 為什么并發編程如此困難
- 數據競爭
- 原子性
- 內存訪問同步
- 死鎖,活鎖和鎖的饑餓問題
- 死鎖
- 活鎖
- 饑餓
- 并發安全性
- 優雅的面對復雜性
- 第二章 代碼建模:序列化交互處理
- 并發與并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并發哲學
- 第三章 Go的并發構建模塊
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select語句
- GOMAXPROCS
- 結論
- 第四章 Go的并發編程范式
- 訪問范圍約束
- fo-select循環
- 防止Goroutine泄漏
- or-channel
- 錯誤處理
- 管道
- 構建管道的最佳實踐
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 隊列
- context包
- 小結
- 第五章 可伸縮并發設計
- 錯誤傳遞
- 超時和取消
- 心跳
- 請求并發復制處理
- 速率限制
- Goroutines異常行為修復
- 本章小結
- 第六章 Goroutines和Go運行時
- 任務調度