<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                [TOC] # 1 單機版消息中心 一個消息中心,最基本的需要支持多生產者、多消費者,例如下: ```java class Scratch { public static void main(String[] args) { // 實際中會有 nameserver 服務來找到 broker 具體位置以及 broker 主從信息 Broker broker = new Broker(); Producer producer1 = new Producer(); producer1.connectBroker(broker); Producer producer2 = new Producer(); producer2.connectBroker(broker); Consumer consumer1 = new Consumer(); consumer1.connectBroker(broker); Consumer consumer2 = new Consumer(); consumer2.connectBroker(broker); for (int i = 0; i < 2; i++) { producer1.asyncSendMsg("producer1 send msg" + i); producer2.asyncSendMsg("producer2 send msg" + i); } System.out.println("broker has msg:" + broker.getAllMagByDisk()); for (int i = 0; i < 1; i++) { System.out.println("consumer1 consume msg:" + consumer1.syncPullMsg()); } for (int i = 0; i < 3; i++) { System.out.println("consumer2 consume msg:" + consumer2.syncPullMsg()); } } } class Producer { private Broker broker; public void connectBroker(Broker broker) { this.broker = broker; } public void asyncSendMsg(String msg) { if (broker == null) { throw new RuntimeException("please connect broker first"); } new Thread(() -> { broker.sendMsg(msg); }).start(); } } class Consumer { private Broker broker; public void connectBroker(Broker broker) { this.broker = broker; } public String syncPullMsg() { return broker.getMsg(); } } class Broker { // 對應 RocketMQ 中 MessageQueue,默認情況下 1 個 Topic 包含 4 個 MessageQueue private LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue(Integer.MAX_VALUE); // 實際發送消息到 broker 服務器使用 Netty 發送 public void sendMsg(String msg) { try { messageQueue.put(msg); // 實際會同步或異步落盤,異步落盤使用的定時任務定時掃描落盤 } catch (InterruptedException e) { } } public String getMsg() { try { return messageQueue.take(); } catch (InterruptedException e) { } return null; } public String getAllMagByDisk() { StringBuilder sb = new StringBuilder("\n"); messageQueue.iterator().forEachRemaining((msg) -> { sb.append(msg + "\n"); }); return sb.toString(); } } ``` 問題: 1. 沒有實現真正執行消息存儲落盤 2. 沒有實現 NameServer 去作為注冊中心,定位服務 3. 使用 LinkedBlockingQueue 作為消息隊列,注意,參數是無限大,在真正 RocketMQ 也是如此是無限大,理論上不會出現對進來的數據進行拋棄,但是會有內存泄漏問題(阿里巴巴開發手冊也因為這個問題,建議我們使用自制線程池) 4. 沒有使用多個隊列(即多個 LinkedBlockingQueue),RocketMQ 的順序消息是通過生產者和消費者同時使用同一個 MessageQueue 來實現,但是如果我們只有一個 MessageQueue,那我們天然就支持順序消息 5. 沒有使用 MappedByteBuffer 來實現文件映射從而使消息數據落盤非常的快(實際 RocketMQ 使用的是 FileChannel+DirectBuffer) # 2 分布式消息中心 ## 2.1 問題與解決 ### 2.1.1 消息丟失的問題 1. 當你系統需要保證百分百消息不丟失,你可以使用生產者每發送一個消息,Broker 同步返回一個消息發送成功的反饋消息 2. 即每發送一個消息,同步落盤后才返回生產者消息發送成功,這樣只要生產者得到了消息發送生成的返回,事后除了硬盤損壞,都可以保證不會消息丟失 3. 但是這同時引入了一個問題,同步落盤怎么才能快? ### 2.1.2 同步落盤怎么才能快 1. 使用 FileChannel + DirectBuffer 池,使用堆外內存,加快內存拷貝 2. 使用數據和索引分離,當消息需要寫入時,使用 commitlog 文件順序寫,當需要定位某個消息時,查詢index 文件來定位,從而減少文件IO隨機讀寫的性能損耗 ### 2.1.3 消息堆積的問題 1. 后臺定時任務每隔72小時,刪除舊的沒有使用過的消息信息 2. 根據不同的業務實現不同的丟棄任務,具體參考線程池的 AbortPolicy,例如FIFO/LRU等(RocketMQ沒有此策略) 3. 消息定時轉移,或者對某些重要的 TAG 型(支付型)消息真正落庫 ### 2.1.4 定時消息的實現 1. 實際 RocketMQ 沒有實現任意精度的定時消息,它只支持某些特定的時間精度的定時消息 2. 實現定時消息的原理是:創建特定時間精度的 MessageQueue,例如生產者需要定時1s之后被消費者消費,你只需要將此消息發送到特定的 Topic,例如:MessageQueue-1 表示這個 MessageQueue 里面的消息都會延遲一秒被消費,然后 Broker 會在 1s 后發送到消費者消費此消息,使用 newSingleThreadScheduledExecutor 實現 ### 2.1.5 順序消息的實現 1. 與定時消息同原理,生產者生產消息時指定特定的 MessageQueue ,消費者消費消息時,消費特定的 MessageQueue,其實單機版的消息中心在一個 MessageQueue 就天然支持了順序消息 2. 注意:同一個 MessageQueue 保證里面的消息是順序消費的前提是:消費者是串行的消費該 MessageQueue,因為就算 MessageQueue 是順序的,但是當并行消費時,還是會有順序問題,但是串行消費也同時引入了兩個問題: >1. 引入鎖來實現串行 >2. 前一個消費阻塞時后面都會被阻塞 ### 2.1.6 分布式消息的實現 1. 需要前置知識:2PC 2. RocketMQ4.3 起支持,原理為2PC,即兩階段提交,prepared->commit/rollback 3. 生產者發送事務消息,假設該事務消息 Topic 為 Topic1-Trans,Broker 得到后首先更改該消息的 Topic 為 Topic1-Prepared,該 Topic1-Prepared 對消費者不可見。然后定時回調生產者的本地事務A執行狀態,根據本地事務A執行狀態,來是否將該消息修改為 Topic1-Commit 或 Topic1-Rollback,消費者就可以正常找到該事務消息或者不執行等 >注意,就算是事務消息最后回滾了也不會物理刪除,只會邏輯刪除該消息 ### 2.1.7 消息的 push 實現 1. 注意,RocketMQ 已經說了自己會有低延遲問題,其中就包括這個消息的 push 延遲問題 2. 因為這并不是真正的將消息主動的推送到消費者,而是 Broker 定時任務每5s將消息推送到消費者 ### 2.1.8 消息重復發送的避免 1. RocketMQ 會出現消息重復發送的問題,因為在網絡延遲的情況下,這種問題不可避免的發生,如果非要實現消息不可重復發送,那基本太難,因為網絡環境無法預知,還會使程序復雜度加大,因此默認允許消息重復發送 2. RocketMQ 讓使用者在消費者端去解決該問題,即需要消費者端在消費消息時支持冪等性的去消費消息 3. 最簡單的解決方案是每條消費記錄有個消費狀態字段,根據這個消費狀態字段來是否消費或者使用一個集中式的表,來存儲所有消息的消費狀態,從而避免重復消費 4. 具體實現可以查詢關于消息冪等消費的解決方案 ### 2.1.9 廣播消費與集群消費 1. 消息消費區別:廣播消費,訂閱該 Topic 的消息者們都會消費**每個**消息。集群消費,訂閱該 Topic 的消息者們只會有一個去消費**某個**消息 2. 消息落盤區別:具體表現在消息消費進度的保存上。廣播消費,由于每個消費者都獨立的去消費每個消息,因此每個消費者各自保存自己的消息消費進度。而集群消費下,訂閱了某個 Topic,而旗下又有多個 MessageQueue,每個消費者都可能會去消費不同的 MessageQueue,因此總體的消費進度保存在 Broker 上集中的管理 ### 2.1.10 RocketMQ 不使用 ZooKeeper 作為注冊中心的原因,以及自制的 NameServer 優缺點? 1. ZooKeeper 作為支持順序一致性的中間件,在某些情況下,它為了滿足一致性,會丟失一定時間內的可用性,RocketMQ 需要注冊中心只是為了發現組件地址,在某些情況下,RocketMQ 的注冊中心可以出現數據不一致性,這同時也是 NameServer 的缺點,因為 NameServer 集群間互不通信,它們之間的注冊信息可能會不一致 2. 另外,當有新的服務器加入時,NameServer 并不會立馬通知到 Produer,而是由 Produer 定時去請求 NameServer 獲取最新的 Broker/Consumer 信息(這種情況是通過 Producer 發送消息時,負載均衡解決)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看