### 訂閱模式
消息被路透投遞給多個隊列,一個消息被多個消費者獲取.

### 特點
1. 需要聲明交換機.
2. 交換機類型為fanout.
2. 不需要routingKey.
3. 需要匿名隊列.(是否需要自動刪除)
4. 將匿名隊列和交換機綁定.
### 實例
~~~
const MQURL = "amqp://guest:guest@localhost:5672"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
Exchange string //交換機名
Queue string //隊列名
Key string //路由名
AMQPUrl string //連接URL
}
//創建rabbit實例
func New(exchange, queue, key string) *RabbitMQ {
r := &RabbitMQ{
Exchange: exchange,
Queue: queue,
Key: key,
AMQPUrl: MQURL,
}
var err error
r.conn, err = amqp.Dial(r.AMQPUrl) // :=不能用于結構體賦值
r.FailOnErr(err, "創建Connection失敗")
r.channel, err = r.conn.Channel()
r.FailOnErr(err, "創建channel失敗")
return r
}
//處理錯誤
func (r *RabbitMQ) FailOnErr(err error, msg string) {
if err != nil {
fmt.Printf("%s : %s\n", err, msg)
return
}
}
//創建訂閱模式的實例
func NewRabbitMQPubSub(exchange string) *RabbitMQ {
r := New(exchange, "", "") //隊列名稱為空
var err error
r.conn, err = amqp.Dial(r.AMQPUrl)
r.FailOnErr(err, "創建connection失敗")
r.channel, err = r.conn.Channel()
r.FailOnErr(err, "創建channel失敗")
return r
}
//訂閱模式發布
func (r *RabbitMQ) PublishPub(message string) {
//嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange,
amqp.ExchangeFanout, //訂閱類型的交換機是fanout
true,
false,
false,
false,
nil,
)
r.FailOnErr(err, "創建路由器失敗")
r.channel.Publish(
r.Exchange,
"", //key是為空的
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//消費
func (r *RabbitMQ) ConsumePub() {
err := r.channel.ExchangeDeclare(
r.Exchange,
amqp.ExchangeFanout,
true,
false,
false,
false,
nil,
)
if err != nil {
r.FailOnErr(err, "創建交換機失敗")
}
//創建隊列
q, err := r.channel.QueueDeclare(
"", //隊列名稱為空
true,
false,
false,
false,
nil,
)
r.FailOnErr(err, "創建隊列失敗")
r.channel.QueueBind(
q.Name, //綁定隨機生成的隊列名稱
"", //pubsub模式下,這里的key為空
r.Exchange,
false,
nil,
)
msg, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan struct{})
go func() {
for v := range msg {
fmt.Println(string(v.Body))
}
}()
fmt.Println("等待接受消息")
<-forever
}
~~~
消費端
~~~
r :=rabbitmq.NewRabbitMQPubSub("test")
r.ConsumePub()
~~~
產生端
~~~
r := rabbitmq.NewRabbitMQPubSub("test")
for i := 0; i < 100; i++ {
r.PublishPub("hello: " + strconv.Itoa(i))
}
~~~
同樣的消息,多個消費端都可以收到.


- 定義和特征
- 安裝
- 基本概念
- 插件管理
- 核心概念
- virtual hosts
- connextion
- exchange
- channel
- queue
- binding
- 工作模式
- simple模式
- work模式
- 訂閱模式
- routing模式
- topic模式
- QOS服務質量
- =====分割線=====
- RabbitMQ核心概念
- 初識RabbitMQ
- 什么是AMQP高級消息隊列協議
- AMQP核心概念
- RabbitMQ整體架構模型
- 命令行與管控臺操作
- RabbitMQ消息生產與消費
- RabbitMQ交換機詳解
- 什么是exchange
- direct
- topic
- fanout
- headers
- RabbitMQ綁定,隊列,虛擬主機,消息
- RabbitMQ高級特性
- 消息保障100%投遞成功
- 冪等性概念及業界主流解決方案
- confirm確認消息
- return返回消息
- 自定義消費者
- 消費端限流策略
- 消費端ack與重回隊列機制
- TTL消息
- 死信隊列
- RabbitMQ集群架構