<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] # RabbitMQ必備核心知識 現在很多知名的互聯網公司都有用到RabbitMQ,其性能,可擴展性讓很多大公司青睞于使用它,不過想要完全使用好RabbitMQ需要掌握其核心的一些概念,這里就說說掌握RabbitMQ所需的必要知識 ## 生產者與消費者 生產者: 創建消息,然后發送到代理服務器(RabbitMQ)的程序 消費者:連接到代理服務器,并訂閱到隊列上接收消息 ## 消息流程 AMQP協議規定,AMQP消息必須有三部分,交換機,隊列和綁定。生產者把消息發送到交換機,交換機與隊列的綁定關系決定了消息如何路由到特定的隊列,最終被消費者接收。 ![如圖](https://user-gold-cdn.xitu.io/2019/2/14/168ea816e2437865?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) **Note:**消息是不能直接到達隊列(Queue)的 ### 交換機 消息實際上投遞到的是交換機,具體路由到那個隊列由交換機根據路由鍵(routing key)完成。 * 當你發消息到代理服務器時,即便路由鍵是空的,RabbitMQ也會將其和使用的路由鍵進行匹配。如果路由的消息不匹配任何綁定模式,消息將會進入黑洞。 交換機在隊列與消息中間起到了中間層的作用,有了交換機我們可以實現更靈活的功能,RabbitMQ中有三種常用的交換機類型: * direct: 如果路由鍵匹配,消息就投遞到對應的隊列 * fanout:投遞消息給所有綁定在當前交換機上面的隊列 * topic:允許實現有趣的消息通信場景,使得5不同源頭的消息能夠達到同一個隊列。topic隊列名稱有兩個特殊的關鍵字。 * `*`可以替換一個單詞 * `#`可以替換所有的單詞 可以理解,direct為1v1, fanout為1v所有,topic比較靈活,可以1v任意。 ![image.png](https://user-gold-cdn.xitu.io/2019/2/14/168ea816e2cedd45?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) ## 虛擬主機 每一個虛擬主機(vhost)相當于mini版的RabbitMQ服務器,擁有自己的隊列,交換機和綁定,權限... 這使得一個RabbitMQ服務眾多的應用程序,而不會互相沖突。 rabbitMQ默認的虛擬主機為: "/" ,一般我們在創建Rabbit的用戶時會再給用戶分配一個虛擬主機。 操作虛擬主機,除了命令行之外還有一個web管理頁面 ~~~ #創建虛擬主機 rabbitmqctl add vhost [vhost_name] #刪除虛擬主機 rabbitmqctl delete vhost [vhost_name] #列出虛擬主機 rabbitmqctl list_vhosts 復制代碼 ~~~ ![image.png](https://user-gold-cdn.xitu.io/2019/2/14/168ea816e32b1191?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) ## 消息投遞策略 默認情況下RabbitMQ的隊列和交換機在RabbitMQ服務器重啟之后會消失,原因在于隊列和交換機的durable屬性,該屬性默認情況下為false. 能從AMQP服務器崩潰中恢復的消息稱為持久化消息,如果想要從崩潰中恢復那么消息必須 * 投遞模式設置2,來標記消息為持久化 * 發送到持久化的交換機 * 到到持久化的隊列 缺點:消息寫入磁盤性能差很多。除非特別關鍵的消息會使用 ## 關鍵API 以上都是概念性的內容,實際我們還是要通過編程來實現我們的目的,RabbitMQ的客戶端api提供了很多功能,通過看代碼,來了解它的強大之處。 基本步驟之前的[RabbitMQ快速入門](https://link.juejin.im/?target=https%3A%2F%2Fwww.jianshu.com%2Fp%2F7f47bd851c9a)已經提過了,Channel類是關鍵的部分:包含了很多我們想要的功能 ![image.png](https://user-gold-cdn.xitu.io/2019/2/14/168ea816e31bc376?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) ### 消息確認 生成端可以添加監聽事件: ~~~ channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } }); 復制代碼 ~~~ 消費端可以確認消息狀態: ~~~ public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if((Integer)properties.getHeaders().get("num") == 0) { channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } } 復制代碼 ~~~ channel.basicAck與basicNack最后一個參數指定消息是否重回隊列。 ### 監聽不可達消息 我們的消息生產者通過指定交換機和路由鍵來把消息送到隊列中,但有時候指定的路由鍵不存在,或者交換機不存在,那么消息就會return,我們可以通過添加return listener來實現: ~~~ channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); 復制代碼 ~~~ 在basicPublish中的Mandatory要設置為true才會生效,否則broker會刪除該消息 ### 消費端限流 假設MQ服務器上面囤積了成千上萬條的消息的時候,這個時候突然連接消費端,那么巨量的消息全部推過來,但是客戶端無法一次性處理這么多的數據。 在高并發的時候,瞬間產生的流量很大,消息很大,而MQ有個重要的作用就是限流,限流則是消費端做的。 RabbitMQ提供了一種Qos(服務質量保證)功能,即在非自動確認消息的前提下,在一定數量的消息未被消費前,不進行消費新的消息。 ~~~ // prefetchSize消息的限制大小,一般設置為0,在生產端限制 // prefetchCount 我們一次最多消費多少條消息,一般設置為1 // global,一般設置為false,在消費端進行限制 channel.basicQos(int prefetchSize, int prefetchCount, boolean global) // 使用 channel.basicQos(0, 1, false); channel.basicConsume(queueName, false, new MyConsumer(channel)); 復制代碼 ~~~ **Note:**autoAck設置為false, 一定要手工簽收消息 ### 死信隊列(DLX) 當消息在隊列中變成死信,沒有消費者進行消費的時候,消息可能會被重新發布到另外一個隊列中,這個隊列就是死信隊列。 以下情況會導致消息進入死信隊列: * basic.reject/basic.nack 并且 requeue為false(不重回隊列)的時候,消息就是死信 * 消息TTL過期 * 隊列達到最大的長度 死信隊列也是正常的Exchange,和一般的Exchange沒什么區別,不過要做一點操作。 設置死信隊列包括: * 設置Exchange(dlx.exchange名稱隨意),設置Queue(dlx.queue),設置RoutingKey(#) * 創建正常的交換機,隊列,綁定,只不過加上一個參數 arguments.put("x-dead-letter-exchange","dlx.exchange") ~~~ // 這就是一個普通的交換機 和 隊列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //這個agruments屬性,要設置到聲明隊列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要進行死信隊列的聲明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); channel.queueBind("dlx.queue", "dlx.exchange", "#"); 復制代碼 ~~~ ## 最后 這里主要講了一些使用RabbitMQ中經常涉及到的概念,懂了概念,在進行應用的時候才不至于糊涂。然后列舉了MQ的Java客戶端重要的幾個API。 ## 參考 * 《RabbitMQ實戰》 * RabbitMQ消息中間件繼續精講
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看