# 消息驅動
微服務的目的: 松耦合
事件驅動的優勢:高度解耦
Spring Cloud Stream 的幾個概念 Spring Cloud Stream is a framework for building message-driven microservice applications.
官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。
Spring Cloud Stream Application 應用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅動的方式
Binder Binder 是 Spring Cloud Stream 的一個抽象概念,是應用與消息中間件之間的粘合劑。目前 Spring Cloud Stream 實現了 Kafka 和 Rabbit MQ 的binder。
通過 binder ,可以很方便的連接中間件,可以動態的改變消息的 destinations(對應于 Kafka 的topic,Rabbit MQ 的 exchanges),這些都可以通過外部配置項來做到。
甚至可以任意的改變中間件的類型而不需要修改一行代碼。
Publish-Subscribe 消息的發布(Publish)和訂閱(Subscribe)是事件驅動的經典模式。Spring Cloud Stream 的數據交互也是基于這個思想。生產者把消息通過某個 topic 廣播出去(Spring Cloud Stream 中的 destinations)。其他的微服務,通過訂閱特定 topic 來獲取廣播出來的消息來觸發業務的進行。
這種模式,極大的降低了生產者與消費者之間的耦合。即使有新的應用的引入,也不需要破壞當前系統的整體結構。
Consumer Groups “Group”,如果使用過 Kafka 的童鞋并不會陌生。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一致。
微服務中動態的縮放同一個應用的數量以此來達到更高的處理能力是非常必須的。對于這種情況,同一個事件防止被重復消費,只要把這些應用放置于同一個 “group” 中,就能夠保證消息只會被其中一個應用消費一次。
Durability 消息事件的持久化是必不可少的。Spring Cloud Stream 可以動態的選擇一個消息隊列是持久化,還是 present。
Bindings bindings 是我們通過配置把應用和spring cloud stream 的 binder 綁定在一起,之后我們只需要修改 binding 的配置來達到動態修改topic、exchange、type等一系列信息而不需要修改一行代碼。
基于 RabbitMQ 使用 以下內容源碼: spring cloud demo
消息接收 Spring Cloud Stream 基本用法,需要定義一個接口,如下是內置的一個接口。
~~~
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
~~~
注釋\_\_ @Input\_\_ 對應的方法,需要返回 \_\_ SubscribableChannel \_\_ ,并且參入一個參數值。
這就接口聲明了一個\_\_ binding \_\_命名為 “input” 。
其他內容通過配置指定:
~~~
spring:
cloud:
stream:
bindings:
input:
destination: mqTestDefault
~~~
destination:指定了消息獲取的目的地,對應于MQ就是 exchange,這里的exchange就是 mqTestDefault
~~~
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
// 監聽 binding 為 Sink.INPUT 的消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("一般監聽收到:" + message.getPayload());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
~~~
定義一個 class (這里直接在啟動類),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同時定義一個方法(此處是 input)標明注解為 \_\_ @StreamListener(Processor.INPUT) \_\_,方法參數為 Message 。
啟動后,默認是會創建一個臨時隊列,臨時隊列綁定的exchange為 “mqTestDefault”,routing key為 “#”。
所有發送 exchange 為“mqTestDefault” 的MQ消息都會被投遞到這個臨時隊列,并且觸發上述的方法。
以上代碼就完成了最基本的消費者部分。
消息發送 消息的發送同消息的接受,都需要定義一個接口,不同的是接口方法的返回對象是 MessageChannel,下面是 Spring Cloud Stream 內置的接口:
~~~
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
~~~
這就接口聲明了一個 binding 命名為 “output” ,不同于上述的 “input”,這個binding 聲明了一個消息輸出流,也就是消息的生產者。
~~~
spring:
cloud:
stream:
bindings:
output:
destination: mqTestDefault
contentType: text/plain
~~~
contentType:用于指定消息的類型。具體可以參考 spring cloud stream docs
destination:指定了消息發送的目的地,對應 RabbitMQ,會發送到 exchange 是 mqTestDefault 的所有消息隊列中。
代碼中調用:
~~~
@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
@Autowired
@Qualifier("output")
MessageChannel output;
@Override
public void run(String... strings) throws Exception {
// 字符串類型發送MQ
System.out.println("字符串信息發送");
output.send(MessageBuilder.withPayload("大家好").build());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
~~~
通過注入MessageChannel的方式,發送消息。
通過注入Source 接口的方式,發送消息。 具體可以查看樣例
以上代碼就完成了最基本的生產者部分。
自定義消息發送接收 自定義接口 Spring Cloud Stream 內置了兩種接口,分別定義了 binding 為 “input” 的輸入流,和 “output” 的輸出流,而在我們實際使用中,往往是需要定義各種輸入輸出流。使用方法也很簡單。
~~~
interface OrderProcessor {
String INPUT_ORDER = "inputOrder";
String OUTPUT_ORDER = "outputOrder";
@Input(INPUT_ORDER)
SubscribableChannel inputOrder();
@Output(OUTPUT_ORDER)
MessageChannel outputOrder();
}
~~~
一個接口中,可以定義無數個輸入輸出流,可以根據實際業務情況劃分。上述的接口,定義了一個訂單輸入,和訂單輸出兩個 binding。
使用時,需要在 @EnableBinding 注解中,添加自定義的接口。 使用 @StreamListener 做監聽的時候,需要指定 OrderProcessor.INPUT\_ORDER
~~~
spring:
cloud:
stream:
defaultBinder: defaultRabbit
bindings:
inputOrder:
destination: mqTestOrder
outputOrder:
destination: mqTestOrder
~~~
如上配置,指定了 destination 為 mqTestOrder 的輸入輸出流。
分組與持久化 上述自定義的接口配置中,Spring Cloud Stream 會在 RabbitMQ 中創建一個臨時的隊列,程序關閉,對應的連接關閉的時候,該隊列也會消失。而在實際使用中,我們需要一個持久化的隊列,并且指定一個分組,用于保證應用服務的縮放。
只需要在消費者端的 binding 添加配置項 spring.cloud.stream.bindings.\[channelName\].group = XXX 。對應的隊列就是持久化,并且名稱為:mqTestOrder.XXX。
rabbitMQ routing key 綁定 用慣了 rabbitMQ 的童鞋,在使用的時候,發現 Spring Cloud Stream 的消息投遞,默認是根據 destination + group 進行區分,所有的消息都投遞到 routing key 為 “#‘’ 的消息隊列里。
如果我們需要進一步根據 routing key 來進行區分消息投遞的目的地,或者消息接受,需要進一步配,Spring Cloud Stream 也提供了相關配置:
~~~
spring:
cloud:
stream:
bindings:
inputProductAdd:
destination: mqTestProduct
group: addProductHandler # 擁有 group 默認會持久化隊列
outputProductAdd:
destination: mqTestProduct
rabbit:
bindings:
inputProductAdd:
consumer:
bindingRoutingKey: addProduct.* # 用來綁定消費者的 routing key
outputProductAdd:
producer:
routing-key-expression: '''addProduct.*''' # 需要用這個來指定 RoutingKey
~~~
spring.cloud.stream.rabbit.bindings.\[channelName\].consumer.bindingRoutingKey 指定了生成的消息隊列的routing key
spring.cloud.stream.rabbit.bindings.\[channelName\].producer.routing-key-expression 指定了生產者消息投遞的routing key
DLX 隊列 DLX 作用 DLX:Dead-Letter-Exchange(死信隊列)。利用DLX, 當消息在一個隊列中變成死信(dead message)之后,它能被重新publish到另一個Exchange,這個Exchange就是DLX。消息變成死信一向有一下幾種情況:
消息被拒絕(basic.reject/ basic.nack)并且requeue=false 消息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間)) 隊列達到最大長度
DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange上去,進而被路由到另一個隊列,可以監聽這個隊列中消息做相應的處理。
Spring Cloud Stream 中使用
~~~
spring.cloud.stream.rabbit.bindings.[channelName].consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.[channelName].consumer.republishToDlq=true
~~~
配置說明,可以參考 spring cloud stream rabbitmq consumer properties
結論 Spring Cloud Stream 最大的方便之處,莫過于抽象了事件驅動的一些概念,對于消息中間件的進一步封裝,可以做到代碼層面對中間件的無感知,甚至于動態的切換中間件,切換topic。使得微服務開發的高度解耦,服務可以關注更多自己的業務流程。
# [RabbitMq使用概況](http://localhost:8028/article/rabbitmq)
什么叫消息隊列
消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。
\*\*消息隊列(Message Queue)\*\*是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。
為何用消息隊列 從上面的描述中可以看出消息隊列是一種應用間的異步協作機制,那什么時候需要使用 MQ 呢?
以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一起同步執行,隨著業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。
以上是用于業務解耦的情況,其它常見場景包括最終一致性、廣播、錯峰流控等等。
RabbitMQ 特點 RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。
AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,為面向消息的中間件設計,基于此協議的客戶端與消息中間件可傳遞消息,并不受產品、開發語言等條件的限制。
RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:
可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。
靈活的路由(Flexible Routing) 在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。
消息集群(Clustering) 多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。
高可用(Highly Available Queues) 隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
多種協議(Multi-protocol) RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。
多語言客戶端(Many Clients) RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。
管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。
跟蹤機制(Tracing) 如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。
RabbitMQ 中的概念模型 消息模型 所有 MQ 產品從模型抽象上來說都是一樣的過程: 消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。
消息流 RabbitMQ 基本概念 上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念:
RabbitMQ 內部結構 Message 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。 Publisher 消息的生產者,也是一個向交換器發布消息的客戶端應用程序。 Exchange 交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。 Binding 綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。 Queue 消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。 Connection 網絡連接,比如一個TCP連接。 Channel 信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。 Consumer 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。 Virtual Host 虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。 Broker 表示消息隊列服務器實體。 AMQP 中的消息路由 AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產者把消息發布到 Exchange 上,消息最終到達隊列并被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。
AMQP 的消息路由過程 Exchange 類型 Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:
direct
direct 交換器
消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。
fanout
fanout 交換器
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。
topictopic 交換器
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”。#匹配0個或多個單詞,匹配不多不少一個單詞。 RabbitMQ 安裝 一般來說安裝 RabbitMQ 之前要安裝 Erlang ,可以去Erlang官網下載。接著去RabbitMQ官網下載安裝包,之后解壓縮即可。根據操作系統不同官網提供了相應的安裝說明:Windows、Debian / Ubuntu、RPM-based Linux、Mac
如果是Mac 用戶,使用 HomeBrew 來安裝,安裝前要先更新 brew:
brew update 接著安裝 rabbitmq 服務器:
brew install rabbitmq 這樣 RabbitMQ 就安裝好了,安裝過程中會自動其所依賴的 Erlang 。
docker下啟動安裝rabbitmq:
~~~
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.7.3-management
~~~
RabbitMQ 運行和管理 啟動 啟動很簡單,找到安裝后的 RabbitMQ 所在目錄下的 sbin 目錄,可以看到該目錄下有6個以 rabbitmq 開頭的可執行文件,直接執行 rabbitmq-server 即可,下面將 RabbitMQ 的安裝位置以 . 代替,啟動命令就是: ./sbin/rabbitmq-server 啟動正常的話會看到一些啟動過程信息和最后的 completed with 7 plugins,這也說明啟動的時候默認加載了7個插件。
正常啟動 后臺啟動 如果想讓 RabbitMQ 以守護程序的方式在后臺運行,可以在啟動的時候加上 -detached 參數:
~~~
./sbin/rabbitmq-server -detached
~~~
查詢服務器狀態 sbin 目錄下有個特別重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的幾乎一站式解決方案,絕大部分的運維命令它都可以提供。 查詢 RabbitMQ 服務器的狀態信息可以用參數 status :
~~~
./sbin/rabbitmqctl status
~~~
該命令將輸出服務器的很多信息,比如 RabbitMQ 和 Erlang 的版本、OS 名稱、內存等等
關閉 RabbitMQ 節點 我們知道 RabbitMQ 是用 Erlang 語言寫的,在Erlang 中有兩個概念:節點和應用程序。節點就是 Erlang 虛擬機的每個實例,而多個 Erlang 應用程序可以運行在同一個節點之上。節點之間可以進行本地通信(不管他們是不是運行在同一臺服務器之上)。比如一個運行在節點A上的應用程序可以調用節點B上應用程序的方法,就好像調用本地函數一樣。如果應用程序由于某些原因奔潰,Erlang 節點會自動嘗試重啟應用程序。 如果要關閉整個 RabbitMQ 節點可以用參數 stop :
~~~
./sbin/rabbitmqctl stop
~~~
它會和本地節點通信并指示其干凈的關閉,也可以指定關閉不同的節點,包括遠程節點,只需要傳入參數 -n :
~~~
./sbin/rabbitmqctl -n rabbit@server.example.com stop
~~~
\-n node 默認 node 名稱是 rabbit@server ,如果你的主機名是 server.example.com ,那么 node 名稱就是 rabbit@server.example.com 。
關閉 RabbitMQ 應用程序 如果只想關閉應用程序,同時保持 Erlang 節點運行則可以用 stop\_app:
~~~
./sbin/rabbitmqctl stop_app
~~~
這個命令在后面要講的集群模式中將會很有用。
啟動 RabbitMQ 應用程序
~~~
./sbin/rabbitmqctl start_app
~~~
重置 RabbitMQ 節點
~~~
./sbin/rabbitmqctl reset
~~~
該命令將清除所有的隊列。
查看已聲明的隊列
~~~
./sbin/rabbitmqctl list_queues
~~~
查看交換器
~~~
./sbin/rabbitmqctl list_exchanges
~~~
該命令還可以附加參數,比如列出交換器的名稱、類型、是否持久化、是否自動刪除:
~~~
./sbin/rabbitmqctl list_exchanges name type durable auto_delete
~~~
查看綁定
~~~
./sbin/rabbitmqctl list_bindings
~~~
Java 客戶端訪問 RabbitMQ 支持多種語言訪問,以 Java 為例看下一般使用 RabbitMQ 的步驟。
maven工程的pom文件中添加依賴
~~~
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
消息生產者
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//設置 RabbitMQ 地址
factory.setHost("localhost");
//建立到代理服務器到連接
Connection conn = factory.newConnection();
//獲得信道
Channel channel = conn.createChannel();
//聲明交換器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola";
//發布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
~~~
消息消費者
~~~
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
//建立到代理服務器到連接
Connection conn = factory.newConnection();
//獲得信道
final Channel channel = conn.createChannel();
//聲明交換器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//聲明隊列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola";
//綁定隊列,通過鍵 hola 將隊列和交換器綁定起來
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
//消費消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消費的路由鍵:" + routingKey);
System.out.println("消費的內容類型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//確認消息
channel.basicAck(deliveryTag, false);
System.out.println("消費的消息體內容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
~~~
啟動 RabbitMQ 服務器
~~~
./sbin/rabbitmq-server
~~~
運行 Consumer 先運行 Consumer ,這樣當生產者發送消息的時候能在消費者后端看到消息記錄。 運行 Producer 接著運行 Producer ,發布一條消息,在 Consumer 的控制臺能看到接收的消息:
Consumer 控制臺 RabbitMQ 集群 RabbitMQ 最優秀的功能之一就是內建集群,這個功能設計的目的是允許消費者和生產者在節點崩潰的情況下繼續運行,以及通過添加更多的節點來線性擴展消息通信吞吐量。RabbitMQ 內部利用 Erlang 提供的分布式通信框架 OTP 來滿足上述需求,使客戶端在失去一個 RabbitMQ 節點連接的情況下,還是能夠重新連接到集群中的任何其他節點繼續生產、消費消息。
RabbitMQ 集群中的一些概念 RabbitMQ 會始終記錄以下四種類型的內部元數據:
隊列元數據 包括隊列名稱和它們的屬性,比如是否可持久化,是否自動刪除 交換器元數據 交換器名稱、類型、屬性 綁定元數據 內部是一張表格記錄如何將消息路由到隊列 vhost 元數據 為 vhost 內部的隊列、交換器、綁定提供命名空間和安全屬性 在單一節點中,RabbitMQ 會將所有這些信息存儲在內存中,同時將標記為可持久化的隊列、交換器、綁定存儲到硬盤上。存到硬盤上可以確保隊列和交換器在節點重啟后能夠重建。而在集群模式下同樣也提供兩種選擇:存到硬盤上(獨立節點的默認設置),存在內存中。
如果在集群中創建隊列,集群只會在單個節點而不是所有節點上創建完整的隊列信息(元數據、狀態、內容)。結果是只有隊列的所有者節點知道有關隊列的所有信息,因此當集群節點崩潰時,該節點的隊列和綁定就消失了,并且任何匹配該隊列的綁定的新消息也丟失了。還好RabbitMQ 2.6.0之后提供了鏡像隊列以避免集群節點故障導致的隊列內容不可用。
RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的數據和狀態都是必須在所有節點上復制的,例外就是上面所說的消息隊列。RabbitMQ 節點可以動態的加入到集群中。
當在集群中聲明隊列、交換器、綁定的時候,這些操作會直到所有集群節點都成功提交元數據變更后才返回。集群中有內存節點和磁盤節點兩種類型,內存節點雖然不寫入磁盤,但是它的執行比磁盤節點要好。內存節點可以提供出色的性能,磁盤節點能保障配置信息在節點重啟后仍然可用,那集群中如何平衡這兩者呢?
RabbitMQ 只要求集群中至少有一個磁盤節點,所有其他節點可以是內存節點,當節點加入火離開集群時,它們必須要將該變更通知到至少一個磁盤節點。如果只有一個磁盤節點,剛好又是該節點崩潰了,那么集群可以繼續路由消息,但不能創建隊列、創建交換器、創建綁定、添加用戶、更改權限、添加或刪除集群節點。換句話說集群中的唯一磁盤節點崩潰的話,集群仍然可以運行,但知道該節點恢復,否則無法更改任何東西。
RabbitMQ 集群配置和啟動 如果是在一臺機器上同時啟動多個 RabbitMQ 節點來組建集群的話,只用上面介紹的方式啟動第二、第三個節點將會因為節點名稱和端口沖突導致啟動失敗。所以在每次調用 rabbitmq-server 命令前,設置環境變量 RABBITMQ\_NODENAME 和 RABBITMQ\_NODE\_PORT 來明確指定唯一的節點名稱和端口。下面的例子端口號從5672開始,每個新啟動的節點都加1,節點也分別命名為test\_rabbit\_1、test\_rabbit\_2、test\_rabbit\_3。
啟動第1個節點:
~~~
RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
~~~
啟動第2個節點:
~~~
RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
~~~
啟動第2個節點前建議將 RabbitMQ 默認激活的插件關掉,否則會存在使用了某個插件的端口號沖突,導致節點啟動不成功。
現在第2個節點和第1個節點都是獨立節點,它們并不知道其他節點的存在。集群中除第一個節點外后加入的節點需要獲取集群中的元數據,所以要先停止 Erlang 節點上運行的 RabbitMQ 應用程序,并重置該節點元數據,再加入并且獲取集群的元數據,最后重新啟動 RabbitMQ 應用程序。
停止第2個節點的應用程序:
~~~
./sbin/rabbitmqctl -n test_rabbit_2 stop_app
~~~
重置第2個節點元數據:
~~~
./sbin/rabbitmqctl -n test_rabbit_2 reset
~~~
第2節點加入第1個節點組成的集群:
~~~
./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
~~~
啟動第2個節點的應用程序
~~~
./sbin/rabbitmqctl -n test_rabbit_2 start_app
~~~
第3個節點的配置過程和第2個節點類似:
~~~
RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached
./sbin/rabbitmqctl -n test_rabbit_3 stop_app
./sbin/rabbitmqctl -n test_rabbit_3 reset
./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost
./sbin/rabbitmqctl -n test_rabbit_3 start_app
~~~
RabbitMQ 集群運維 停止某個指定的節點,比如停止第2個節點:
~~~
RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
~~~
查看節點3的集群狀態:
~~~
./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
~~~