Publish模式
===
Publish/Subscribe,訂閱模式
> 消息被路由投遞個給多個隊列,一個消息被多個消費者獲取 (可以被多個消費者重復消費)

~~~
// 訂閱模式: 創建RabbitMQ實例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
mq := NewRabbitMQ("", exchangeName, "")
return mq
}
// 訂閱模式: 發送消息
func (r *RabbitMQ) PublishPub(message string) error {
// 1.嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange, // 交換機名稱
"fanout",// 交換機類型 [發布訂閱模式下 交換機類型 fanout ]
true, // 是否持久化
false, // 是否自動刪除
false, // 如果是true表示這個exchange不可以被client用來推送exchange和exchange之間綁定
false, //
nil, //其他參數
)
if err != nil {
return err
}
// 2. 發送消息
err = r.channel.Publish(
r.Exchange,
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
return err
}
// 訂閱模式: 消費消息
func (r *RabbitMQ) ConsumptionSub() {
// 1.嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange, // 交換機名稱
"fanout",// 交換機類型 [發布訂閱模式下 交換機類型 fanout ]
true, // 是否持久化
false, // 是否自動刪除
false, // 如果是true表示這個exchange不可以被client用來推送exchange和exchange之間綁定
false, //
nil, //其他參數
)
if err != nil {
log.Print(err.Error())
return
}
// 2.試探性創建隊列,注意隊列名稱不要寫
queue, err := r.channel.QueueDeclare(
"", //隨機生產隊列名稱
false,
false,
true, // 排他
false,
nil,
)
if err != nil {
log.Print(err.Error())
return
}
// 3.綁定隊列到exchange中
err = r.channel.QueueBind(
queue.Name, // 隊列名稱
"", // 訂閱寫key必須為空
r.Exchange,// 交換機
false,
nil,
)
if err != nil {
log.Print(err.Error())
return
}
// 4. 消費消息
msgch, err := r.channel.Consume(
queue.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for {
select {
case data := <-msgch:
fmt.Printf("%s\n",data.Body)
}
}
}()
<-forever
}
~~~
### 生產者
~~~
// 生產者
func main() {
sub := RabbitMQ.NewRabbitMQPubSub("newProduct")
for i:=0;i<1000;i++ {
sub.PublishPub("訂閱模式生產第: " + strconv.Itoa(i))
}
}
~~~
### 消費者
~~~
func main() {
sub := RabbitMQ.NewRabbitMQPubSub("newProduct")
sub.ConsumptionSub()
}
~~~