<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] 我們平時習慣于使用 Rabbitmq 和 Kafka 作為消息隊列中間件,來給應用程序之間增加異步消息傳遞功能。這兩個中間件都是專業的消息隊列中間件,特性之多超出了大多數人的理解能力。 使用過 Rabbitmq 的同學知道它使用起來有多復雜,發消息之前要創建 Exchange,再創建 Queue,還要將 Queue 和 Exchange 通過某種規則綁定起來,發消息的時候要指定 routing-key,還要控制頭部信息。消費者在消費消息之前也要進行上面一系列的繁瑣過程。但是絕大多數情況下,雖然我們的消息隊列只有一組消費者,但還是需要經歷上面這些繁瑣的過程。 有了 Redis,它就可以讓我們解脫出來,對于那些只有一組消費者的消息隊列,使用 Redis 就可以非常輕松的搞定。Redis 的消息隊列不是專業的消息隊列,它沒有非常多的高級特性,沒有 ack 保證,如果對消息的可靠性有著極致的追求,那么它就不適合使用。 ## 異步消息隊列 Redis 的 list(列表) 數據結構常用來作為異步消息隊列使用,使用`rpush/lpush`操作入隊列,使用`lpop 和 rpop`來出隊列。 ![](https://user-gold-cdn.xitu.io/2018/7/10/1648229e1dbfd776?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) ~~~ > rpush notify-queue apple banana pear (integer) 3 > llen notify-queue (integer) 3 > lpop notify-queue "apple" > llen notify-queue (integer) 2 > lpop notify-queue "banana" > llen notify-queue (integer) 1 > lpop notify-queue "pear" > llen notify-queue (integer) 0 > lpop notify-queue (nil) ~~~ 上面是 rpush 和 lpop 結合使用的例子。還可以使用 lpush 和 rpop 結合使用,效果是一樣的。這里不再贅述。 ## 隊列空了怎么辦? 客戶端是通過隊列的 pop 操作來獲取消息,然后進行處理。處理完了再接著獲取消息,再進行處理。如此循環往復,這便是作為隊列消費者的客戶端的生命周期。 可是如果隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接著再 pop,又沒有數據。這就是浪費生命的空輪詢。空輪詢不但拉高了客戶端的 CPU,redis 的 QPS 也會被拉高,如果這樣空輪詢的客戶端有幾十來個,Redis 的慢查詢可能會顯著增多。 通常我們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鐘就可以了。不但客戶端的 CPU 能降下來,Redis 的 QPS 也降下來了。 ~~~ time.sleep(1) # python 睡 1s Thread.sleep(1000) # java 睡 1s ~~~ ![](https://user-gold-cdn.xitu.io/2018/7/10/164822ccec7ccb85?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) ## 隊列延遲 用上面睡眠的辦法可以解決問題。但是有個小問題,那就是睡眠會導致消息的延遲增大。如果只有 1 個消費者,那么這個延遲就是 1s。如果有多個消費者,這個延遲會有所下降,因為每個消費者的睡覺時間是岔開來的。 有沒有什么辦法能顯著降低延遲呢?你當然可以很快想到:那就把睡覺的時間縮短點。這種方式當然可以,不過有沒有更好的解決方案呢?當然也有,那就是 blpop/brpop。 這兩個指令的前綴字符`b`代表的是`blocking`,也就是阻塞讀。 阻塞讀在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零。用`blpop/brpop`替代前面的`lpop/rpop`,就完美解決了上面的問題。 ## 空閑連接自動斷開 你以為上面的方案真的很完美么?先別急著開心,其實他還有個問題需要解決。 什么問題?——**空閑連接**的問題。 如果線程一直阻塞在哪里,Redis 的客戶端連接就成了閑置連接,閑置過久,服務器一般會主動斷開連接,減少閑置資源占用。這個時候`blpop/brpop`會拋出異常來。 所以編寫客戶端消費者的時候要小心,注意捕獲異常,還要重試。 ## 鎖沖突處理 上節課我們講了分布式鎖的問題,但是沒有提到客戶端在處理請求時加鎖沒加成功怎么辦。一般有 3 種策略來處理加鎖失敗: 1. 直接拋出異常,通知用戶稍后重試; 2. sleep 一會再重試; 3. 將請求轉移至延時隊列,過一會再試; **直接拋出特定類型的異常** 這種方式比較適合由用戶直接發起的請求,用戶看到錯誤對話框后,會先閱讀對話框的內容,再點擊重試,這樣就可以起到人工延時的效果。如果考慮到用戶體驗,可以由前端的代碼替代用戶自己來進行延時重試控制。它本質上是對當前請求的放棄,由用戶決定是否重新發起新的請求。 **sleep** sleep 會阻塞當前的消息處理線程,會導致隊列的后續消息處理出現延遲。如果碰撞的比較頻繁或者隊列里消息比較多,sleep 可能并不合適。如果因為個別死鎖的 key 導致加鎖不成功,線程會徹底堵死,導致后續消息永遠得不到及時處理。 **延時隊列** 這種方式比較適合異步消息處理,將當前沖突的請求扔到另一個隊列延后處理以避開沖突。 ## 延時隊列的實現 延時隊列可以通過 Redis 的 zset(有序列表) 來實現。我們將消息序列化成一個字符串作為 zset 的`value`,這個消息的到期處理時間作為`score`,然后用多個線程輪詢 zset 獲取到期的任務進行處理,多個線程是為了保障可用性,萬一掛了一個線程還有其它線程可以繼續處理。因為有多個線程,所以需要考慮并發爭搶任務,確保任務不能被多次執行。 ~~~ def delay(msg): msg.id = str(uuid.uuid4()) # 保證 value 值唯一 value = json.dumps(msg) retry_ts = time.time() + 5 # 5 秒后重試 redis.zadd("delay-queue", retry_ts, value) def loop(): while True: # 最多取 1 條 values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1) if not values: time.sleep(1) # 延時隊列空的,休息 1s continue value = values[0] # 拿第一條,也只有一條 success = redis.zrem("delay-queue", value) # 從消息隊列中移除該消息 if success: # 因為有多進程并發的可能,最終只會有一個進程可以搶到消息 msg = json.loads(value) handle_msg(msg) ~~~ Redis 的 zrem 方法是多線程多進程爭搶任務的關鍵,它的返回值決定了當前實例有沒有搶到任務,因為 loop 方法可能會被多個線程、多個進程調用,同一個任務可能會被多個進程線程搶到,通過 zrem 來決定唯一的屬主。 同時,我們要注意一定要對 handle\_msg 進行異常捕獲,避免因為個別任務處理問題導致循環異常退出。以下是 Java 版本的延時隊列實現,因為要使用到 Json 序列化,所以還需要 fastjson 庫的支持。 ~~~ import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import redis.clients.jedis.Jedis; public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg; } // fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid task.msg = msg; String s = JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 后再試 } public void loop() { while (!Thread.interrupted()) { // 只取一條 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); // 歇會繼續 } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 搶到了 TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis(); RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread() { public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; Thread consumer = new Thread() { public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { } } } ~~~ ## 進一步優化 上面的算法中同一個任務可能會被多個進程取到之后再使用 zrem 進行爭搶,那些沒搶到的進程都是白取了一次任務,這是浪費。可以考慮使用 lua scripting 來優化一下這個邏輯,將 zrangebyscore 和 zrem 一同挪到服務器端進行原子化操作,這樣多個進程之間爭搶任務時就不會出現這種浪費了。 ## 思考 1. Redis 作為消息隊列為什么不能保證 100% 的可靠性? 因為沒有ask機制,當消費端崩潰后消息丟失。pop出消息后,list 中就沒這個消息了,如果處理消息的程序拿到消息還未處理就掛掉了,那消息就丟失了,所以是不可靠隊列。[https://redis.io/commands/rpoplpush](https://link.juejin.im/?target=https%3A%2F%2Fredis.io%2Fcommands%2Frpoplpush) 這個可以實現可靠隊列 > Rpoplpush 命令用于移除列表的最后一個元素,并將該元素添加到另一個列表并返回。 2. 使用 Lua Scripting 來優化延時隊列的邏輯。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看