<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] ## 1. 消息出來 topic:隊列的集合稱為topic ## 1.1 Simple 消息直接發送,無法保證 ## 1.2 Order ### 1.2.1 使用場景 > 1. 一個生產者可以發送消息給多給topic > 2. 一個topic默認有4個隊列 > 3. producer以roundrobin(輪詢)的方式給多個隊列發送消息 > 4. 同一個隊列消息遵守FIFO > * 順序消費: > 例如在網購的時候,我們需要下單,那么下單需要假如有三個順序,第一、創建訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單才有意義。 > RocketMQ可以保證順序消費,他的實現是生產者將這個三個消息放在topic的一個隊列里面,單機支持上萬個持久化隊列,消費端去消費的時候也是只能有一個Consumer去取得這個隊列里面的數據,然后順序消費。 > * rocketmq的順序消息需要滿足2點: > 1.Producer端保證發送消息有序,且發送到同一個隊列。 > 2.consumer端只能讓一個consumer保證消費同一個隊列。 ### 1.2.2 使用場景 如何實現 ### 1.2.3 使用場景producer順序發送消息到同一隊列 > 1. 默認的情況下,producer會向topic(隊列的集合,默認四個隊列)中的隊列輪詢式的發生消息,這就不滿足順序消費一系列消息發送到一個隊列的要求,所以要修改向隊列發送消息的方法。 > 2. 重寫MessageQueueSelector,從字面理解就是消息隊列選擇器,非常的貼切!原理就是在隊列數量不變的情況下,通過一系列事務的編號(訂單id)和隊列叔取模 ~~~ SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 3); # 這里的3用于取模運算,相同編號的數據會路由到同一個隊列當中去 ~~~ 這里設置編號為1 ![](https://box.kancloud.cn/e55d1a164eae168f29585825ddc278a9_1737x549.png) 這里設置編號為3,驗證了topic默認四個隊列,且可以指定消息用于取模的id ![](https://box.kancloud.cn/f01f5866332953af0a98498f32d1f5de_1139x484.png) 以上可以保證同一系列事務被發送到了一個隊列當中。 ### 1.2.4 使用場景 某一個Consumer順序消費同一個隊列 通過設置Listener實現 1. MessageListenerOrderly(有序的) 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到,第二個線程無法訪問這個隊列 自動實現順序消費 ~~~ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設置自動提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); ~~~ 2. MessageListenerConcurrently(無序的) 需要把線程池改為單線程模式。 > 1. ConsumeMessageOrderlyService類的start()方法,如果是集群消費,則啟動定時任務,定時向broker發送批量鎖住當前正在消費的隊列集合的消息,具體是consumer端拿到正在消費的隊列集合,發送鎖住隊列的消息至broker,broker端返回鎖住成功的隊列集合。 > consumer收到后,設置是否鎖住標志位。 > 這里注意2個變量: > consumer端的RebalanceImpl里的ConcurrentHashMap processQueueTable,是否鎖住設置在ProcessQueue里。 > broker端的RebalanceLockManager里的ConcurrentHashMap> mqLockTable,這里維護著全局隊列鎖。 > 2. ConsumeMessageOrderlyService.ConsumeRequest的run方法是消費消息,這里還有個MessageQueueLock messageQueueLock,維護當前consumer端的本地隊列鎖。保證當前只有一個線程能夠進行消費。 > 3. 拉到消息存入ProcessQueue,然后判斷,本地是否獲得鎖,全局隊列是否被鎖住,然后從ProcessQueue里取出消息,用MessageListenerOrderly進行消費。 > 拉到消息后調用ProcessQueue.putMessage(final List msgs) 存入,具體是存入TreeMap msgTreeMap。 > 然后是調用ProcessQueue.takeMessags(final int batchSize)消費,具體是把msgTreeMap里消費過的消息,轉移到TreeMap msgTreeMapTemp。 > 4. 本地消費的事務控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(掛起一會再消費),在此之前還有一個變量ConsumeOrderlyContext context的setAutoCommit()是否自動提交。 > 當SUSPEND_CURRENT_QUEUE_A_MOMENT時,autoCommit設置為true或者false沒有區別,本質跟消費相反,把消息從msgTreeMapTemp轉移回msgTreeMap,等待下次消費。 > 當SUCCESS時,autoCommit設置為true時比設置為false多做了2個動作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); > ProcessQueue.commit() :本質是刪除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消費時從msgTreeMap轉移過來的。 > this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本質是把拉消息的偏移量更新到本地,然后定時更新到broker。 > 那么少了這2個動作會怎么樣呢,隨著消息的消費進行,msgTreeMapTemp里的消息堆積越來越多,消費消息的偏移量一直沒有更新到broker導致consumer每次重新啟動后都要從頭開始重復消費。 > 就算更新了offset到broker,那么msgTreeMapTemp里的消息堆積呢?不知道這算不算bug。 > 所以,還是把autoCommit設置為true比較好。 ## 2. 生產中的使用 ### 2.1 使用注意事項 > 1. 消費者處理MQ消息時必須冪等性(即無論接收到多少相同的消息,執行后的結果一致),如果不具有冪等性,則轉換成冪等性處理方法; > 2. 業務方自己保證每條發送到RocketMQ消息都有唯一的ID,這樣消費者根據消息的唯一ID去重,并確保消息處理成功。 ### 2.2 java 交互RocketMQ #### 2.2.1 producer ~~~ package com.aixin.lovetocar.rocketmq.util; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.util.List; /** * Created by dailin on 2018/4/25. */ public class RocketMQProducer { private static DefaultMQProducer defaultMQProducer; /** * @param groupName 指定producer組 * @param nameServer namerserver地址 */ public RocketMQProducer(String groupName, String nameServer) throws MQClientException { defaultMQProducer = new DefaultMQProducer(groupName); defaultMQProducer.setNamesrvAddr(nameServer); defaultMQProducer.start(); //producer開始 } /** * 同步發送消息 * * @param topic * @param tags * @param key * @param data * @throws Exception */ public void sentSynData(String topic, String tags, String key, String data) throws Exception { Message msg = new Message(topic, tags, key, data.getBytes()); //封裝消息 SendResult sendResult = defaultMQProducer.send(msg); //發送消息 System.out.printf("%s%n", sendResult); } /** * 同步發送消息 * * @param topic * @param tags * @param data * @throws MQClientException * @throws RemotingException * @throws InterruptedException * @throws MQBrokerException */ public void sentSynData(String topic, String tags, String data) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { Message msg = new Message(topic, tags, data.getBytes()); //封裝消息 SendResult sendResult = defaultMQProducer.send(msg); //發送消息 System.out.printf("%s%n", sendResult); } /** * 發送順序消息 * * @param topic * @param tags * @param data * @param order * @throws InterruptedException * @throws RemotingException * @throws MQClientException * @throws MQBrokerException */ public void sentOrderDate(String topic, String tags, String key, String data, Integer order) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message msg = new Message(topic, tags, key, data.getBytes()); SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, order); System.out.println(sendResult); } /** * 發送順序消息 * * @param topic * @param tags * @param data * @param order * @throws InterruptedException * @throws RemotingException * @throws MQClientException * @throws MQBrokerException */ public void sentOrderDate(String topic, String tags, String data, Integer order) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message msg = new Message(topic, tags, data.getBytes()); //隊列選擇 SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, order); System.out.println(sendResult); } public DefaultMQProducer getDefaultMQProducer() { return defaultMQProducer; } /** * 關閉producer與RocketMQ的連接 */ public void shudownProducer() { defaultMQProducer.shutdown(); } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看