<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                細心的你可能發現了,本系列課程中竟然出現了三個課時都是在說消息隊列,第 10 課時講了程序級別的消息隊列以及延遲消息隊列的實現,而第 15 課時講了常見的消息隊列中間件 RabbitMQ、Kafka 等,由此可見消息隊列在整個 Java 技術體系中的重要程度。本課時我們將重點來看一下 Redis 是如何實現消息隊列的。 我們本課時的面試題是,在 Redis 中實現消息隊列的方式有幾種? #### 典型回答 早在 Redis 2.0 版本之前使用 Redis 實現消息隊列的方式有兩種: * 使用 List 類型實現 * 使用 ZSet 類型實現 其中使用**List 類型實現的方式最為簡單和直接**,它主要是通過 lpush、rpop 存入和讀取實現消息隊列的,如下圖所示: ![](https://img.kancloud.cn/9d/8f/9d8fd89380a6778205dc6badeb06a110_898x252.png) lpush 可以把最新的消息存儲到消息隊列(List 集合)的首部,而 rpop 可以讀取消息隊列的尾部,這樣就實現了先進先出,如下圖所示: ![](https://img.kancloud.cn/06/ca/06caae0bf2383680ca923f6d026d2433_1038x252.png) 命令行的實現命令如下: ``` 127.0.0.1:6379> lpush mq "java" #推送消息 java (integer) 1 127.0.0.1:6379> lpush mq "msg" #推送消息 msg (integer) 2 127.0.0.1:6379> rpop mq #接收到消息 java "java" 127.0.0.1:6379> rpop mq #接收到消息 msg "mq" ``` 其中,mq 相當于消息隊列的名稱,而 lpush 用于生產并添加消息,而 rpop 用于拉取并消費消息。 使用 List 實現消息隊列的優點是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把數據保存至磁盤,這樣當 Redis 重啟之后,消息不會丟失。 但使用 List 同樣存在一定的問題,比如消息不支持重復消費、沒有按照主題訂閱的功能、不支持消費消息確認等。 ZSet 實現消息隊列的方式和 List 類似,它是利用 zadd 和 zrangebyscore 來實現存入和讀取消息的,這里就不重復敘述了。但 ZSet 的實現方式更為復雜一些,因為 ZSet 多了一個分值(score)屬性,我們可以使用它來實現更多的功能,比如用它來存儲時間戳,以此來實現延遲消息隊列等。 ZSet 同樣具備持久化的功能,List 存在的問題它也同樣存在,不但如此,使用 ZSet 還不能存儲相同元素的值。因為它是有序集合,有序集合的存儲元素值是不能重復的,但分值可以重復,也就是說當消息值重復時,只能存儲一條信息在 ZSet 中。 在 Redis 2.0 之后 Redis 就新增了專門的發布和訂閱的類型,Publisher(發布者)和 Subscriber(訂閱者)來實現消息隊列了,它們對應的執行命令如下: * 發布消息,publish channel "message" * 訂閱消息,subscribe channel 使用發布和訂閱的類型,我們可以實現主題訂閱的功能,也就是 Pattern Subscribe 的功能。因此我們可以使用一個消費者“queue_*”來訂閱所有以“queue_”開頭的消息隊列,如下圖所示: ![](https://img.kancloud.cn/68/75/68756af3e11f3195fc669d7825446128_790x562.png) 發布訂閱模式的優點很明顯,但同樣存在以下 3 個問題: * 無法持久化保存消息,如果 Redis 服務器宕機或重啟,那么所有的消息將會丟失; * 發布訂閱模式是“發后既忘”的工作模式,如果有訂閱者離線重連之后就不能消費之前的歷史消息; * 不支持消費者確認機制,穩定性不能得到保證,例如當消費者獲取到消息之后,還沒來得及執行就宕機了。因為沒有消費者確認機制,Redis 就會誤以為消費者已經執行了,因此就不會重復發送未被正常消費的消息了,這樣整體的 Redis 穩定性就被沒有辦法得到保障了。 然而在 Redis 5.0 之后新增了 Stream 類型,我們就可以使用 Stream 的 xadd 和 xrange 來實現消息的存入和讀取了,并且 Stream 提供了 xack 手動確認消息消費的命令,用它我們就可以實現消費者確認的功能了,使用命令如下: ``` 127.0.0.1:6379> xack mq group1 1580959593553-0 (integer) 1 ``` 相關語法如下: ``` xack key group-key ID [ID ...] ``` 消費確認增加了消息的可靠性,一般在業務處理完成之后,需要執行 ack 確認消息已經被消費完成,整個流程的執行如下圖所示: ![](https://img.kancloud.cn/f6/7a/f67aa39f5ad521d121faf54c9ced8f29_1030x372.png) 其中“Group”為群組,消費者也就是接收者需要訂閱到群組才能正常獲取到消息。 以上就 Redis 實現消息隊列的四種方式,他們分別是: * 使用 List 實現消息隊列; * 使用 ZSet 實現消息隊列; * 使用發布訂閱者模式實現消息隊列; * 使用 Stream 實現消息隊列。 #### 考點分析 本課時的題目比較全面的考察了面試者對于 Redis 整體知識框架和新版本特性的理解和領悟。早期版本中比較常用的實現消息隊列的方式是 List、ZSet 和發布訂閱者模式,使用 Stream 來實現消息隊列是近兩年才流行起來的方案,并且很多企業也沒有使用到 Redis 5.0 這么新的版本。因此只需回答出前三種就算及格了,而 Stream 方式實現消息隊列屬于附加題,如果面試中能回答上來的話就更好了,它體現了你對新技術的敏感度與對技術的熱愛程度,屬于面試中的加分項。 和此知識點相關的面試題還有以下幾個: * 在 Java 代碼中使用 List 實現消息隊列會有什么問題?應該如何解決? * 在程序中如何使用 Stream 來實現消息隊列? #### 知識擴展 * [ ] 使用 List 實現消息隊列 在 Java 程序中我們需要使用 Redis 客戶端框架來輔助程序操作 Redis,比如 Jedis 框架。 使用 Jedis 框架首先需要在 pom.xml 文件中添加 Jedis 依賴,配置如下: ``` <!-- https://mvnrepository.com/artifact/redis.clients/jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>${version}</version> </dependency> ``` List 實現消息隊列的完整代碼如下: ``` import redis.clients.jedis.Jedis; publicclass ListMQTest { public static void main(String[] args){ // 啟動一個線程作為消費者 new Thread(() -> consumer()).start(); // 生產者 producer(); } /** * 生產者 */ public static void producer() { Jedis jedis = new Jedis("127.0.0.1", 6379); // 推送消息 jedis.lpush("mq", "Hello, List."); } /** * 消費者 */ public static void consumer() { Jedis jedis = new Jedis("127.0.0.1", 6379); // 消費消息 while (true) { // 獲取消息 String msg = jedis.rpop("mq"); if (msg != null) { // 接收到了消息 System.out.println("接收到消息:" + msg); } } } } ``` 以上程序的運行結果是: ``` 接收到消息:Hello, Java. ``` 但是以上的代碼存在一個問題,可以看出以上消費者的實現是通過 while 無限循環來獲取消息,但如果消息的空閑時間比較長,一直沒有新任務,而 while 循環不會因此停止,它會一直執行循環的動作,這樣就會白白浪費了系統的資源。 此時我們可以借助 Redis 中的阻塞讀來替代 rpop 的方法就可以解決此問題,具體實現代碼如下: ``` import redis.clients.jedis.Jedis; public class ListMQExample { public static void main(String[] args) throws InterruptedException { // 消費者 new Thread(() -> bConsumer()).start(); // 生產者 producer(); } /** * 生產者 */ public static void producer() throws InterruptedException { Jedis jedis = new Jedis("127.0.0.1", 6379); // 推送消息 jedis.lpush("mq", "Hello, Java."); Thread.sleep(1000); jedis.lpush("mq", "message 2."); Thread.sleep(2000); jedis.lpush("mq", "message 3."); } /** * 消費者(阻塞版) */ public static void bConsumer() { Jedis jedis = new Jedis("127.0.0.1", 6379); while (true) { // 阻塞讀 for (String item : jedis.brpop(0,"mq")) { // 讀取到相關數據,進行業務處理 System.out.println(item); } } } } ``` 以上程序的運行結果是: ``` 接收到消息:Hello, Java. ``` 以上代碼是經過改良的,我們使用 brpop 替代 rpop 來讀取最后一條消息,就可以解決 while 循環在沒有數據的情況下,一直循環消耗系統資源的情況了。brpop 中的 b 是 blocking 的意思,表示阻塞讀,也就是當隊列沒有數據時,它會進入休眠狀態,當有數據進入隊列之后,它才會“蘇醒”過來執行讀取任務,這樣就可以解決 while 循環一直執行消耗系統資源的問題了。 使用 Stream 實現消息隊列 在開始實現消息隊列之前,我們必須先創建分組才行,因為消費者需要關聯分組信息才能正常運行,具體實現代碼如下: ``` import com.google.gson.Gson; import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; import utils.JedisUtils; import java.util.AbstractMap; import java.util.HashMap; import java.util.List; import java.util.Map; public class StreamGroupExample { private static final String _STREAM_KEY = "mq"; // 流 key private static final String _GROUP_NAME = "g1"; // 分組名稱 private static final String _CONSUMER_NAME = "c1"; // 消費者 1 的名稱 private static final String _CONSUMER2_NAME = "c2"; // 消費者 2 的名稱 public static void main(String[] args) { // 生產者 producer(); // 創建消費組 createGroup(_STREAM_KEY, _GROUP_NAME); // 消費者 1 new Thread(() -> consumer()).start(); // 消費者 2 new Thread(() -> consumer2()).start(); } /** * 創建消費分組 * @param stream 流 key * @param groupName 分組名稱 */ public static void createGroup(String stream, String groupName) { Jedis jedis = JedisUtils.getJedis(); jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true); } /** * 生產者 */ public static void producer() { Jedis jedis = JedisUtils.getJedis(); // 添加消息 1 Map<String, String> map = new HashMap<>(); map.put("data", "redis"); StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map); System.out.println("消息添加成功 ID:" + id); // 添加消息 2 Map<String, String> map2 = new HashMap<>(); map2.put("data", "java"); StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2); System.out.println("消息添加成功 ID:" + id2); } /** * 消費者 1 */ public static void consumer() { Jedis jedis = JedisUtils.getJedis(); // 消費消息 while (true) { // 讀取消息 Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY, new StreamEntryID().UNRECEIVED_ENTRY); // 阻塞讀取一條消息(最大阻塞時間120s) List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1, 120 * 1000, true, entry); if (list != null && list.size() == 1) { // 讀取到消息 Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內容 System.out.println("Consumer 1 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() + " 內容:" + new Gson().toJson(content)); } } } /** * 消費者 2 */ public static void consumer2() { Jedis jedis = JedisUtils.getJedis(); // 消費消息 while (true) { // 讀取消息 Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY, new StreamEntryID().UNRECEIVED_ENTRY); // 阻塞讀取一條消息(最大阻塞時間120s) List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1, 120 * 1000, true, entry); if (list != null && list.size() == 1) { // 讀取到消息 Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息內容 System.out.println("Consumer 2 讀取到消息 ID:" + list.get(0).getValue().get(0).getID() + " 內容:" + new Gson().toJson(content)); } } } } ``` 以上代碼運行結果如下: ``` 消息添加成功 ID:1580971482344-0 消息添加成功 ID:1580971482415-0 Consumer 1 讀取到消息 ID:1580971482344-0 內容:{"data":"redis"} Consumer 2 讀取到消息 ID:1580971482415-0 內容:{"data":"java"} ``` 其中,jedis.xreadGroup() 方法的第五個參數 noAck 表示是否自動確認消息,如果設置 true 收到消息會自動確認 (ack) 消息,否則需要手動確認。 可以看出,同一個分組內的多個 consumer 會讀取到不同消息,不同的 consumer 不會讀取到分組內的同一條消息。 > 小貼士:Jedis 框架要使用最新版,低版本 block 設置大于 0 時,會出現 bug,拋連接超時異常。 #### 小結 本課時我們講了 Redis 中消息隊列的四種實現方式:List 方式、ZSet 方式、發布訂閱者模式、Stream 方式,其中發布訂閱者模式不支持消息持久化、而其他三種方式支持持久化,并且 Stream 方式支持消費者確認。我們還使用 Jedis 框架完成了 List 和 Stream 的消息隊列功能,需要注意的是在 List 中需要使用 brpop 來讀取消息,而不是 rpop,這樣可以解決沒有任務時 ,while 一直循環浪費系統資源的問題。 #### 課后問答 * 1、Stream 如何手動確認消息呢 講師回復: Streams 有 xack key group-key ID 可以用來確認消息。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看