<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ## 簡介 顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之后就會被消費者馬上消費。 ## 和定時任務區別 >延時任務有別于定時任務,定時任務往往是固定周期的,有明確的觸發時間。 >[warning] 而延時任務一般沒有固定的開始時間,它常常是由一個事件觸發的,而在這個事件觸發之后的一段時間內觸發另一個事件。 > 任務事件生成時并不想讓消費者立即拿到,而是延遲一定時間后才接收到該事件進行消費。 ## 業務場景 - 訂單超時,用戶下單后進入支付頁面(通常會有超時限制)超過15分鐘沒有進行操作,那么這個訂單就需要作廢處理。 - 如何定期檢查處于退款狀態的訂單是否已經退款成功? - 注冊后到現在已經一周的用戶,如何發短信撩動。 - 交易信息雙重效驗防止因系統級/應用級/用戶級等各種異常情況發生后導致的全部/部分丟失的訂單信息。 - 實現重復通知,默認失敗連續通知10次(通知間隔為`n*2+1/min`),直到消費方正確響應,超出推送上限次數后標記為異常狀態,可進行恢復! ## 使用場景 > 延遲隊列多用于需要延遲工作的場景。 最常見的是以下兩種場景: ### 1、延遲消費 1. 用戶生成訂單之后,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。 2. 用戶注冊成功之后,需要過一段時間比如一周后校驗用戶的使用情況,如果發現用戶活躍度較低,則發送郵件或者短信來提醒用戶使用。 ### 2、延遲重試 比如消費者從隊列里消費消息時失敗了,但是想要延遲一段時間后自動重試。 >[warning] 如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。 ### 掃表存在的問題是 - 掃表與數據庫長時間連接,在數量量大的情況容易出現連接異常中斷,需要更多的異常處理,對程序健壯性要求高 - 在數據量大的情況下延時較高,規定內處理不完,影響業務,雖然可以啟動多個進程來處理,這樣會帶來額外的維護成本,不能從根本上解決。 - 每個業務都要維護一個自己的掃表邏輯。 當業務越來越多時,發現掃表部分的邏輯會重復開發,但是非常類似 ## 緩存隊列設計 ![](https://img.kancloud.cn/9e/bd/9ebdf012c01e3f385b8cac8c9a218cfa_1392x742.png) ## 場景設計 實際的生產場景是筆者負責的某個系統需要對接一個外部的資金方,每一筆資金下單后需要延時30分鐘推送對應的附件。 這里簡化為一個訂單信息數據延遲處理的場景,就是每一筆下單記錄一條訂單消息(暫時叫做`OrderMessage`),訂單消息需要延遲5到15秒后進行異步處理。 ![](https://img.kancloud.cn/08/68/08683eb976b047dac0d1a0b72f7ad4d8_1021x173.png) ## 延時隊列的實現 選用了基于`Redis`的有序集合`Sorted Set`和`Crontab`短輪詢進行實現。 ### 具體方案是: 1. 訂單創建的時候,訂單ID和當前時間戳分別作為`Sorted Set`的`member`和`score`添加到訂單隊列`Sorted Set`中。 2. 訂單創建的時候,訂單ID和推送內容`JSON`字符串分別作為`field`和`value`添加到訂單隊列內容`Hash`中。 3. 第1步和第2步操作的時候用`Lua`腳本保證原子性。 4. 使用一個異步線程通過`Sorted Set`的命令`ZREVRANGEBYSCORE`彈出指定數量的`訂單ID`對應的訂單隊列內容`Hash`中的訂單推送內容數據進行處理。 ### 對于第4點處理有兩種方案: > 處理方案一 彈出訂單內容數據的同時進行數據刪除,也就是`ZREVRANGEBYSCORE`、`ZREM`和`HDEL`命令要在同一個`Lua`腳本中執行,這樣的話`Lua`腳本的編寫難度大,并且由于彈出數據已經在`Redis`中刪除,如果數據處理失敗則可能需要從數據庫重新查詢補償。 > 處理方案二 彈出訂單內容數據之后,在數據處理完成的時候再主動刪除訂單隊列`Sorted Set`和訂單隊列內容`Hash`中對應的數據,這樣的話需要控制并發,有重復執行的可能性。 >[warning] 選用了方案一,也就是從`Sorted Set`彈出訂單ID并且從Hash中獲取完推送數據之后馬上刪除這兩個集合中對應的數據。 方案的流程圖大概是這樣: ![](https://img.kancloud.cn/c9/0a/c90afd71fb917ba12b8878138cd578d4_1094x565.png) ## 相關Redis命令 ### Sorted Set相關命令 >[success] `ZADD`命令 - 將一個或多個成員元素及其分數值加入到有序集當中。 ``` ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN ``` >[success] `ZREVRANGEBYSCORE`命令 - 返回有序集中指定分數區間內的所有的成員。有序集成員按分數值遞減(從大到小)的次序排列。 ``` ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count] ``` - max:分數區間 - 最大分數。 - min:分數區間 - 最小分數。 - WITHSCORES:可選參數,是否返回分數值,指定則會返回得分值。 - LIMIT:可選參數,offset和count原理和`MySQL`的`LIMIT offset,size`一致,如果不指定此參數則返回整個集合的數據。 >[success] `ZREM`命令 - 用于移除有序集中的一個或多個成員,不存在的成員將被忽略。 ``` ZREM key member [member ...] ``` ### Hash相關命令 >[success] `HMSET`命令 - 同時將多個field-value(字段-值)對設置到哈希表中。 ``` HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN ``` >[success] `HDEL`命令 - 刪除哈希表key中的一個或多個指定字段,不存在的字段將被忽略。 ``` HDEL KEY_NAME FIELD1.. FIELDN ``` ### Lua 語法 * 加載`Lua`腳本并且返回腳本的`SHA-1`字符串:`SCRIPT LOAD script`。 * 執行已經加載的`Lua`腳本:`EVALSHA sha1 numkeys key [key ...] arg [arg ...]`。 * `unpack`函數可以把`table`類型的參數轉化為可變參數,不過需要注意的是`unpack`函數必須使用在非變量定義的函數調用的最后一個參數,否則會失效,詳細見`Stackoverflow`的提問[table.unpack() only returns the first element](https://stackoverflow.com/questions/32439689/table-unpack-only-returns-the-first-element)。 >[warning] 如果不熟悉Lua語言,建議系統學習一下,因為想用好Redis,一定離不開Lua。 ## Lua 腳本 ### 入隊` enqueue.lua` ```lua local zset_key = KEYS[1] local hash_key = KEYS[2] local zset_value = ARGV[1] local zset_score = ARGV[2] local hash_field = ARGV[3] local hash_value = ARGV[4] redis.call('ZADD', zset_key, zset_score, zset_value) redis.call('HSET', hash_key, hash_field, hash_value) return nil ``` > 將任務的執行時間作為score,要執行的任務數據作為value,存放在zset中 ### 出隊 `dequeue.lua` ```lua local zset_key = KEYS[1] local hash_key = KEYS[2] local min_score = ARGV[1] local max_score = ARGV[2] local offset = ARGV[3] local limit = ARGV[4] -- TYPE命令的返回結果是{'ok':'zset'}這樣子,這里利用next做一輪迭代 local status, type = next(redis.call('TYPE', zset_key)) if status ~= nil and status == 'ok' then if type == 'zset' then local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit) if list ~= nil and #list > 0 then -- unpack函數能把table轉化為可變參數 redis.call('ZREM', zset_key, unpack(list)) local result = redis.call('HMGET', hash_key, unpack(list)) redis.call('HDEL', hash_key, unpack(list)) return result end end end return nil ``` > 如果最小的分數小于等于當前時間戳,就將該任務取出來執行,否則休眠一段時間后再查詢。 >[danger] 注意:這里其實有一個性能隱患,命令`ZREVRANGEBYSCORE`的時間復雜度可以視為為O(N),N是集合的元素個數,由于這里把所有的訂單信息都放進了同一個Sorted Set(ORDER_QUEUE)中,所以在一直有新增數據的時候,`dequeue`腳本的時間復雜度一直比較高,后續訂單量升高之后會此處一定會成為性能瓶頸,后面會給出解決的方案 這里的出隊使用`Crontab` 作為輪訓去查詢消費 ## 業務核心代碼 ### 延遲隊列類 RedisDelayQueue.php ```php <?php /** * @desc Redis 延遲任務隊列 * @author Tinywan(ShaoBo Wan) * @date 2021/03/02 11:36 */ declare(strict_types=1); namespace redis; class RedisDelayQueue { // 生產者 腳本sha值 const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA'; // 消費者 腳本sha值 const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA'; // 訂單關閉 const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE'; // 訂單關閉詳情哈希 const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH'; /** * Redis 靜態實例 * @return \Redis */ private static function _redis() { $redis = \redis\BaseRedis::server(); $redis->select(3); return $redis; } /** * @desc: 延遲隊列 生產者 * @param string $keys1 * @param string $keys2 * @param string $member * @param int $score * @param array $message * @return mixed */ public static function producer(string $keys1, string $keys2, string $member, int $score, array $message) { $redis = self::_redis(); $scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA); if (!$scriptSha) { $script = <<<luascript redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]) redis.call('HSET', KEYS[2], ARGV[2], ARGV[3]) return 1 luascript; $scriptSha = $redis->script('load', $script); $redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha); } $hashValue = json_encode($message, JSON_UNESCAPED_UNICODE); return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2); } /** * @desc: 延遲隊列 消費者 * @param string $keys1 * @param string $keys2 * @param int $maxScore * @return mixed */ public static function consumer(string $keys1, string $keys2, int $maxScore) { $redis = self::_redis(); $scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA); if (!$scriptSha) { $script = <<<luascript local status, type = next(redis.call('TYPE', KEYS[1])) if status ~= nil and status == 'ok' then if type == 'zset' then local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4]) if list ~= nil and #list > 0 then redis.call('ZREM', KEYS[1], unpack(list)) local result = redis.call('HMGET', KEYS[2], unpack(list)) redis.call('HDEL', KEYS[2], unpack(list)) return result end end end luascript; $scriptSha = $redis->script('load', $script); $redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha); } return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0, 0, 10], 2); } } ``` > 用redis來實現可以依賴于redis自身的持久化來實現持久化,redis的集群來支持高并發和高可用。因此開發成本很小,可以做到很實時。 ## 腳本命令行 #### 生產者消息 ```php private function delayQueueOrderClose() { $orderId = time(); $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE; $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH; $score = time() + 60; // 延遲60秒執行 $message = [ 'event' => RedisDelayQueue::EVENT_ORDER_CLOSE, 'order_id' => $orderId, 'create_time' => time() ]; $res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message); var_dump($res); } ``` > 如果是ThinkPHP6 框架,執行該命令則可以生產消息,`php think crontab delay-queue-order-producer` 循環 ```php private function delayOrderProducer() { $keys1 = DelayQueue::KEY_ORDER_CLOSE; $keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH; for ($i = 1; $i <= 10; $i++) { $orderId = 'S' . $i; $score = time(); // 延遲60秒執行 $message = [ 'event' => DelayQueue::EVENT_ORDER_CLOSE, 'order_id' => $orderId, 'create_time' => time() ]; $res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message); var_dump($res); } } ``` #### 消費者消息 >1、通過Crontab 輪詢執行 ```php private function delayQueueOrderConsumer() { $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE; $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH; $maxScore = time(); $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore); if (false === $queueList) { echo ' [x] Message List is Empty, Try Again ', "\n"; return; } var_dump($queueList); } ``` >[warning] 說明:如果最小的分數小于等于當前時間戳,就將該任務取出來執行,否則休眠一段時間后再查詢 > 2、阻塞執行 ```php private function delayQueueOrderConsumerWhile() { $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE; $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH; while (true) { $maxScore = time(); $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore); if (false === $queueList) { echo ' [x] Message List is Empty, Try Again ', "\n"; sleep(1); continue; } // 處理業務 foreach ($queueList as $queue) { $messageArray = json_decode($queue, true); } } } ``` ## 數據刪除為處理問題 >[danger] 方案一:彈出訂單內容數據的同時進行數據刪除,也就是ZREVRANGEBYSCORE、ZREM和HDEL命令要在同一個Lua腳本中執行,這樣的話Lua腳本的編寫難度大,并且由于彈出數據已經在Redis中刪除,如果數據處理失敗則可能需要從數據庫重新查詢補償。 針對以上的解決方案就是:**消息進入到延遲隊列后,保證至少被消費一次。** - 消費延遲隊列消息后(zset結構中掃描到期的消息),不及時消費 - 把讀取的消息放入一個 redis stream 隊列,同時加入消費組 - 通過消費組消費 redis stream 消費,處理業務邏輯 - Redis Stream 消費組,讀取消息處理并且 `ACK(將消息標記為"已處理")` - 如果消息讀取但是沒處理,則進入XPENDING 列表,進行二次消費并且 `ACK(將消息標記為"已處理")` ## Redis Stream
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看