<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                  消息隊列產品有很多,比如 ActiveMQ、RabbitMQ ,ZeroMQ ,RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種,本文先講 RabbitMQ ,在此之前先看下消息隊列的相關概念。 ## 什么是消息隊列   消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。   消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。 ## RabbitMQ特點   RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括: 1. 可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。 2. 靈活的路由(Flexible Routing) 在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。 3. 消息集群(Clustering) 多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。 4. 高可用(Highly Available Queues) 隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。 5. 多種協議(Multi-protocol) RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。 6. 多語言客戶端(Many Clients) RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。 7. 管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。 8. 跟蹤機制(Tracing) 如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。 9. 插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。 RabbitMQ的安裝可以參考博客:   https://www.cnblogs.com/ericli-ericli/p/5902270.html ## RabbitMQ的概念模型 ### 基本模型 所有 MQ 產品從模型抽象上來說都是一樣的過程:   消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。 ![](https://img.kancloud.cn/db/90/db90c08390ac0568025060d70c760174_518x143.png) ### 基本概念 ![](https://img.kancloud.cn/36/6b/366b54b07a20a1297509c1d54916af20_624x178.png) * Message 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。 * Publisher 消息的生產者,也是一個向交換器發布消息的客戶端應用程序。 * Exchange 交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。 * Binding 綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。 * Queue 消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。 * Connection 網絡連接,比如一個TCP連接。 * Channel 信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。 * Consumer 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。 * Virtual Host 虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。 * Broker 表示消息隊列服務器實體。 ## RabbitMQ實現消息傳遞實例 **最簡單的隊列通信** 接收端 ~~~ import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello1') def callback(ch, method, properties, body): print('>>',ch,method,properties) time.sleep(30) print(" [x] Received %r" % body) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello1', no_ack=True ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() ~~~ 發送端 ~~~ import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello1') channel.basic_publish(exchange='', routing_key='hello1', body='Hello World!') connection.close() ~~~ PS:**遠程連接rabbitmq server的話,需要配置權限** 可參考博客:https://www.cnblogs.com/alex3714/articles/5248247.html ![](https://img.kancloud.cn/65/1d/651d05999521403668e08bc865722547_523x121.png)   在這種工作模式下,如果啟動了多個接收端,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),相當于輪詢。   但是上面這樣寫的話,隊列里面的消息如果沒有被全部取出來,而且同時RabbitMQ又down機了,那這樣的話隊列里面的消息就都沒有了,所以我們需要做一個**消息的持久化**。 ``` import pika,time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello',durable=True) def callback(ch, method, properties, body): print('>>',ch,method,properties) time.sleep(30) print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', ) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming() receive端 ``` ``` import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 聲明queue channel.queue_declare(queue='hello',durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent 'Hello World!'") connection.close() send端 ``` **消息公平分發**   如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,**可以在各個消費者端,配置perfetch=1**,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。 ~~~ channel.basic_qos(prefetch_count=1) ~~~ ``` import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(60) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming() 接收端 ``` ``` import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, )) connection.close() 發送端 ``` **Publish\\Subscribe(消息發布\\訂閱) **   之前的例子都基本都是1對1的消息發送和接收,即消息只能發送到指定的queue里,但有些時候你想讓你的消息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了。 fanout:?所有bind到此exchange的queue都可以接收消息 ![](https://img.kancloud.cn/ea/56/ea5672a3851dae462a19f8dad26c5197_693x315.png) ``` import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close() publisher ``` ``` import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs',exchange_type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() subscriber ``` direct:?通過routingKey和exchange決定的那個唯一的queue可以接收消息 ![](https://img.kancloud.cn/d2/4d/d24d8f22514298b30250a0aa21242d84_494x299.png) ``` import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() publisher ``` ``` import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s info warning error\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() subscriber ``` topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息 ![](https://img.kancloud.cn/34/1c/341c4448472c9c1e09dc3180cb478d5c_659x258.png) ``` import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() publisher ``` ``` import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() subscriber ``` PS:表達式符號說明:#代表收所有,\*代表任何字符。(在終端運行,運行命令后面跟上符合條件的topic)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看