<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ### MQ與DB一致性原理(兩方事務) Apache RocketMQ在4.3.0版中已經支持分布式事務消息,這里RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息 ![](https://img.kancloud.cn/3d/92/3d9251b1eb747ae04f4a06ba620a9ea6_1006x477.png) RocketMQ事務消息流程概要 上圖說明了事務消息的大致方案,其中分為兩個流程:正常事務消息的發送及提交、事務消息的補償流程。 1.事務消息發送及提交: (1) 發送消息(half消息)。 (2) 服務端響應消息寫入結果。 (3) 根據發送結果執行本地事務(如果寫入失敗,此時half消息對業務不可見,本地邏輯不執行)。 (4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成消息索引,消息對消費者可見) 2.補償流程: (1) 對沒有Commit/Rollback的事務消息(pending狀態的消息),從服務端發起一次“回查” (2) Producer收到回查消息,檢查回查消息對應的本地事務的狀態 (3) 根據本地事務狀態,重新Commit或者Rollback 補償階段用于解決消息Commit或者Rollback發生超時或者失敗的情況,MQ內部提供一個名為“事務狀態服務”的服務,此服務會檢查事務消息的狀態,如果發現消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調業務系統,業務系統在checkLocalTransactionState方法中檢查DB事務狀態,如果成功,則回復COMMIT_MESSAGE,否則回復ROLLBACK_MESSAGE > 以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀態,在MQ服務器內部是一個數字 TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會回調(上圖中灰色部分)。這種消息丟失只存在于斷網或者rocketmq集群掛了的情況下。當rocketmq集群掛了,如果采用異步刷盤,存在1s內數據丟失風險,異步刷盤場景下保障事務沒有意義。所以如果要核心業務用Rocketmq解決分布式事務問題,建議選擇同步刷盤模式 1. 事務消息在一階段對用戶不可見 在RocketMQ事務消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務消息相對普通消息最大的特點就是一階段發送的消息對用戶是不可見的。那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ\_SYS\_TRANS\_HALF\_TOPIC。由于消費組未訂閱該主題,故消費端無法消費half類型的消息,然后RocketMQ會開啟一個定時任務,從Topic為RMQ\_SYS\_TRANS\_HALF\_TOPIC中拉取消息進行消費,根據生產者組獲取一個服務提供者發送回查事務狀態請求,根據事務狀態來決定是提交或回滾消息。 在RocketMQ中,消息在服務端的存儲結構如下,每條消息都會有對應的索引信息,Consumer通過ConsumeQueue這個二級索引來讀取消息實體內容,其流程如下: ![](https://img.kancloud.cn/74/19/74198d2ce0de3ca61f4642ae5aeda1db_1146x631.png) RocketMQ的具體實現策略是:寫入的如果事務消息,對消息的Topic和Queue等屬性進行替換,同時將原來的Topic和Queue信息存儲到消息的屬性中,正因為消息主題被替換,故消息并不會轉發到該原主題的消息消費隊列,消費者無法感知消息的存在,不會消費。其實改變消息主題是RocketMQ的常用“套路”,回想一下延時消息的實現機制 2.Commit和Rollback操作以及Op消息的引入 在完成一階段寫入一條對用戶不可見的消息后,二階段如果是Commit操作,則需要讓消息對用戶可見;如果是Rollback則需要撤銷一階段的消息。先說Rollback的情況。對于Rollback,本身一階段的消息對用戶是不可見的,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息,因為是順序寫文件的)。但是區別于這條消息沒有確定狀態(Pending狀態,事務懸而未決),需要一個操作來標識這條消息的最終狀態。RocketMQ事務消息方案中引入了Op消息的概念,用Op消息標識事務消息已經確定的狀態(Commit或者Rollback)。如果一條事務消息沒有對應的Op消息,說明這個事務的狀態還無法確定(可能是二階段失敗了)。引入Op消息后,事務消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對于Rollback只是在寫入Op消息前創建Half消息的索引 3.Op消息的存儲和對應關系 RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—TransactionalMessageUtil.buildOpTopic();這個Topic是一個內部的Topic(像Half消息的Topic一樣),不會被用戶消費。Op消息的內容為對應的Half消息的存儲的Offset,這樣通過Op消息能索引到Half消息進行后續的回查操作 ![](https://img.kancloud.cn/44/99/4499058a4335f02d66da2c9d762d16cd_774x522.png) 4.Half消息的索引構建 在執行二階段Commit操作時,需要構建出Half消息的索引。一階段的Half消息由于是寫到一個特殊的Topic,所以二階段構建索引時需要讀取出Half消息,并將Topic和Queue替換成真正的目標的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息。所以RocketMQ事務消息二階段其實是利用了一階段存儲的消息的內容,在二階段時恢復出一條完整的普通消息,然后走一遍消息寫入流程 5.如何處理二階段失敗的消息? 如果在RocketMQ事務消息的二階段過程中失敗了,例如在做Commit操作時,出現網絡問題導致Commit失敗,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機制,稱為“回查”。Broker端對未確定狀態的消息發起回查,將消息發送到對應的Producer端(同一個Group的Producer),由Producer根據消息來檢查本地事務的狀態,進而執行Commit或者Rollback。Broker端通過對比Half消息和Op消息進行事務消息的回查并且推進CheckPoint(記錄那些事務消息的狀態是確定的)。 值得注意的是,rocketmq并不會無休止的的信息事務狀態回查,默認回查15次,如果15次回查還是無法得知事務狀態,rocketmq默認回滾該消息
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看