# 7. redis 實現消息隊列
#### 1. 介紹
redis有一個數據類型叫list(列表),它的每個子元素都是 string 類型的雙向鏈表。我們可以通過 push,pop 操作從鏈表的頭部或者尾部添加刪除元素。這使得 list 既可以用作棧,也可以用作隊列。
假如,我們有一個隊列系統,把一個個任務放到隊列中,另一個進程就把隊列中的任務取出來執行。
放到隊列我們使用`LPUSH`,也就是往雙向鏈表的尾部填充一個元素,這一端也叫生產者,是產生內容的一端。
另一個進程使用`RPOP`往頭部取出元素來執行,這一端也叫消費者。
如果僅僅是這種方式來實現隊列,它就是需要進程不斷地循環隊列,判斷隊列是不是有新元素,有的話就取出來執行,沒有的話,就繼續循環,但是這個總有一個時間間隔,你總得規定每隔一段時間去循環,雖然這個時間很小,但總有延遲,這種方式叫作輪循。有沒有一種方式就是讓不斷執行一個redis命令,而redis中的列隊有值就會通過命令通知程序呢?有的,那就是阻塞操作的`RPOP`,它叫作`BRPOP`。
官方文檔有一篇文章[An introduction to Redis data types and abstractions](http://redis.io/topics/data-types-intro#lists)是介紹了redis的各種數據結構,其中談到了list。list部分談到"Blocking operations on lists",這種就是阻塞版本的list,通常就是用它來實現消息隊列的。
#### 2. 實現
我們來演示一下它是如何實現的。
```
$ redis-cli
127.0.0.1:6379> BRPOP list1 0
```
先執行BRPOP,假如隊列list1沒有值,它會返回nil,并且阻塞在那,在等另一個程序或進程往list1中填值。
我們開啟另一個`redis`端終。
```
$ redis-cli
127.0.0.1:6379> LPUSH list1 a
(integer) 1
```
我們再來看之前的結果。
```
127.0.0.1:6379> BRPOP list1 0
1) "list1"
2) "a"
(16.99s)
```
這樣就能把列表的值給取到了。
#### 3. ost
下面我們通過這個叫[ost](https://github.com/soveran/ost)的ruby gem來實現消息隊列,并來分析它的源碼,來了解redis是如何結合編程語言來實現消息隊列的。
先把下面一行添加到Gemfile文件中。
```
gem 'ost'
```
接著在`config/initializers`添加一個文件叫ost.rb,內容如下。
```
require "ost"
Ost.redis = Redic.new("redis://127.0.0.1:6379")
```
ost是使用一個輕量級的ruby客戶端[redic](https://github.com/amakawa/redic)來連接redis的。
消息隊列的模型生成兩個部分,分別是生產者和消費者,生產者部分就是把訪問的文章放到隊列中,那就把文章的唯一標識id放到隊列就好了。
```
class ArticlesController < ApplicationController
def show
@article = Article.find(params[:id])
Ost[:article] << @article.id
end
end
```
`Ost[:article] << @article.id`這一部分就相當于上文提到的`LPUSH`。
現在可以打開`redis-cli`,運行監控命令來查看redis中的狀態。
```
$ redis-cli
127.0.0.1:6379> monitor
OK
```
我們在頁面上隨便刷新一篇文章,然后可以在`monitor`中看到類似下面的信息。
```
1446890795.971666 [0 127.0.0.1:53622] "LPUSH" "ost:article" "21"
```
現在生產者好了,要來處理消費者部分。
一般來說我們是要開啟另一個進程,但現在我們的重點不在這,我們就用`rails console`來摸似就好了。
在`console`中運行下面的命令。
```
Ost[:article].each do |article_id|
@article = Article.find(article_id)
@article.visit_count += 1
@article.save!(validate: false)
end
```
現在到頁面上刷新,再觀察visit\_count的變化,會發現文章的visit\_count會加1的。
而且在`monitor`中出不斷地出現下面的字樣。
```
1446891057.725337 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2"
1446891059.807253 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2"
1446891061.827532 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2"
1446891063.881999 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2"
1446891065.897304 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2"
```
`console`中的執令就是調用redis的阻塞式的`RPOP`。
現在整個流程已經完成了,ost是怎么實現的呢,這就需要來分析它的源碼。
#### 4. ost源碼分析
ost這個gem只有一個[源文件](https://github.com/soveran/ost/blob/master/lib/ost.rb),總共有77行代碼。
其中最主要的有下面的部分。
```
def push(value)
redis.call("LPUSH", @key, value)
end
def each(&block)
loop do
item = redis.call("BRPOPLPUSH", @key, @backup, TIMEOUT)
if item
block.call(item)
redis.call("LPOP", @backup)
end
break if @stopping
end
end
alias << push
alias pop each
```
`Ost[:article] << @article.id`就對應上面的`push`方法,而`Ost[:article].each`部分就對應`each`方法。
從`each`方法的代碼中可以看到調用了loop循環`BRPOPLPUSH`,`BRPOPLPUSH`是另一個阻塞版本的`RPOP`,它可以接超時的時間。
完結。