<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                案例代碼:https://gitee.com/flymini/codes01/tree/master/rabbitmq_/com-learn-rabbitmq01 **** ![](https://img.kancloud.cn/2e/67/2e6750ce18e53e926d773feefc21ba2f_1391x536.jpg) 將通過不同的死信來源演示上圖的消息消費過程。 [TOC] # 1. 消息 TTL 過期 **1. 生產者** ```java public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //設置消息的 TTL 時間為 10000 ms AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); //生產者生產10條消息 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); System.out.println(Producer.class.getSimpleName() + "[生產者發出消息]: " + message); } } } } ``` **2. 兩個消費者** (1)C1消費者 ```java public class Consumer01 { //普通交換機 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //聲明普通交換機和死信交換機 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //聲明死信隊列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信隊列綁定死信交換機 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常隊列綁定死信隊列信息 Map<String, Object> params = new HashMap<>(16); //正常隊列設置死信交換機。x-dead-letter-exchange是固定值,不可以隨便寫 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常隊列設置死信routing-key。x-dead-letter-routing-key是固定值,不可以隨便寫 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } } ``` (2)C2消費者 ```java public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信隊列消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer02.class.getSimpleName() + "[接收到死信隊列的消息]: " + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } } ``` **3. 測試** (1)先啟動C1消費者,然后將其關閉。 (2)啟動生產者生產10條消息,就得到如下信息。 ![](https://img.kancloud.cn/e6/b5/e6b5f292d5c347c2a4ddffd8d459260b_1194x619.jpg) (3)啟動C2消費者,C2消費了死信隊列的消息。 ``` -----------Consumer02消費者收到了死信隊列的消息----------- Consumer02[接收到死信隊列的消息]: info1 Consumer02[接收到死信隊列的消息]: info2 Consumer02[接收到死信隊列的消息]: info3 Consumer02[接收到死信隊列的消息]: info4 Consumer02[接收到死信隊列的消息]: info5 Consumer02[接收到死信隊列的消息]: info6 Consumer02[接收到死信隊列的消息]: info7 Consumer02[接收到死信隊列的消息]: info8 Consumer02[接收到死信隊列的消息]: info9 Consumer02[接收到死信隊列的消息]: info10 ``` <br/> # 2. 隊列達到最大長度 **1. 生產者** ```java public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //生產者生產10條消息 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println(Producer.class.getSimpleName() + "[生產者發出消息]: " + message); } } } } ``` **2. 兩個消費者** (1)C1消費者 ```java public class Consumer01 { //普通交換機名稱 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機名稱 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //聲明死信和普通交換機 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //聲明死信隊列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信隊列綁定死信交換機 channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常隊列綁定死信隊列信息 Map<String, Object> params = new HashMap<>(16); //正常隊列設置死信交換機。x-dead-letter-exchange是固定寫法,不可以自定義 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常隊列設置死信routing-key。x-dead-letter-routing-key是固定寫法,不可以自定義 params.put("x-dead-letter-routing-key", "lisi"); //設置正常隊列長度為6 params.put("x-max-length", 6); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } } ``` (2)C2消費者 ```java public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信隊列消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer02.class.getSimpleName() + "[接收到死信隊列的消息]: " + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } } ``` **3. 測試** (1)刪除隊列。 ```shell [root@localhost ~]# rabbitmqctl delete_queue dead-queue [root@localhost ~]# rabbitmqctl delete_queue normal-queue ``` (2)先啟動C1消費者,然后將其關閉。 (3)啟動生產者生產10條消息,就得到如下信息。 ![](https://img.kancloud.cn/51/98/5198102296c93acef9f6c98af8cdab36_1458x138.jpg) 可以看到因為在C1中設置隊列最大長度為6(`params.put("x-max-length", 6)`),所以正常隊列只能存放6條消息,超過6條消息的被存儲到了死信隊列中。 (4)啟動C2消費者,C2消費了死信隊列的消息。 ``` -----------Consumer02消費者收到了死信隊列的消息----------- Consumer02[接收到死信隊列的消息]: info1 Consumer02[接收到死信隊列的消息]: info2 Consumer02[接收到死信隊列的消息]: info3 Consumer02[接收到死信隊列的消息]: info4 ``` (5)啟動C1消費者,消費正常隊列中的6條消息。 ``` -----------Consumer01消費者收到了死信隊列的消息----------- Consumer01[接收到的消息]: info5 Consumer01[接收到的消息]: info6 Consumer01[接收到的消息]: info7 Consumer01[接收到的消息]: info8 Consumer01[接收到的消息]: info9 Consumer01[接收到的消息]: info10 ``` <br/> # 3. 消息被拒 消息被拒只需要在消費端調用下面的方法即可。 ```java channel.basicReject(long var1, boolean var3) ``` **1. 生產者** ```java public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMQUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //生產者生產10條消息 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes()); System.out.println(Producer.class.getSimpleName() + "[生產者發出消息]: " + message); } } } } ``` **2. 兩個消費者** (1)C1消費者 ```java public class Consumer01 { //普通交換機名稱 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交換機名稱 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); //聲明死信和普通交換機 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //聲明死信隊列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信隊列綁定死信交換機與 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常隊列綁定死信隊列信息 Map<String, Object> params = new HashMap<>(); //正常隊列設置死信交換機。x-dead-letter-exchange是固定寫法,不可以隨便定義 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常隊列設置死信。x-dead-letter-routing-key是固定寫法,不可以隨便定義。 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println(Consumer01.class.getSimpleName() + "[等待接收消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); if (message.equals("info5")) { System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message + ", 但拒絕簽收該消息."); //拒絕消費info5消息 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); } else { System.out.println(Consumer01.class.getSimpleName() + "[接收到的消息]: " + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> { }); } } ``` (2)C2消費者 ```java public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println(Consumer02.class.getSimpleName() + "[等待接收死信隊列消息...]"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(Consumer02.class.getSimpleName() + "[接收到死信隊列的消息]: " + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } } ``` **3. 測試** (1)刪除隊列。 ```shell [root@localhost ~]# rabbitmqctl delete_queue dead-queue [root@localhost ~]# rabbitmqctl delete_queue normal-queue ``` (2)啟動C1消費者,C2消費者,最后啟動生產者。 ``` -----------Producer生產者生產了10條消息----------- Producer[生產者發出消息]: info1 Producer[生產者發出消息]: info2 Producer[生產者發出消息]: info3 Producer[生產者發出消息]: info4 Producer[生產者發出消息]: info5 Producer[生產者發出消息]: info6 Producer[生產者發出消息]: info7 Producer[生產者發出消息]: info8 Producer[生產者發出消息]: info9 Producer[生產者發出消息]: info10 -----------Consumer01消費者拒絕簽收info5消息----------- Consumer01[接收到的消息]: info1 Consumer01[接收到的消息]: info2 Consumer01[接收到的消息]: info3 Consumer01[接收到的消息]: info4 Consumer01[接收到的消息]: info5, 但拒絕簽收該消息. Consumer01[接收到的消息]: info6 Consumer01[接收到的消息]: info7 Consumer01[接收到的消息]: info8 Consumer01[接收到的消息]: info9 Consumer01[接收到的消息]: info10 -----------Consumer02消費者收到了死信隊列的消息----------- Consumer02[接收到死信隊列的消息]: info5 ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看