Routing模式
===
> 一個消息可以被多個消費者獲取.并且消息的目標隊列可以被生產者指定

~~~
// 路由模式: 創建RabbitMQ實例
func NewRabbitMQPubRoutiong(exchangeName,routingKey string) *RabbitMQ {
mq := NewRabbitMQ("", exchangeName, routingKey)
return mq
}
// 路由模式: 發送消息
func (r *RabbitMQ) PublishRouting(message string) error {
// 1.嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange, // 交換機名稱
"direct",// 交換機類型 [路由模式下 交換機類型 direct ]
true, // 是否持久化
false, // 是否自動刪除
false, // 如果是true表示這個exchange不可以被client用來推送exchange和exchange之間綁定
false, //
nil, //其他參數
)
if err != nil {
return err
}
// 2. 發送消息
err = r.channel.Publish(
r.Exchange,
r.Key,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
return err
}
// 路由模式模式: 消費消息
func (r *RabbitMQ) ConsumptionRouting() {
// 1.嘗試創建交換機
err := r.channel.ExchangeDeclare(
r.Exchange, // 交換機名稱
"direct",// 交換機類型 [路由模式下 交換機類型 direct ]
true, // 是否持久化
false, // 是否自動刪除
false, // 如果是true表示這個exchange不可以被client用來推送exchange和exchange之間綁定
false, //
nil, //其他參數
)
if err != nil {
log.Print(err.Error())
return
}
// 2.試探性創建隊列,注意隊列名稱不要寫
queue, err := r.channel.QueueDeclare(
"", //隨機生產隊列名稱
false,
false,
true, // 排他
false,
nil,
)
if err != nil {
log.Print(err.Error())
return
}
// 3.綁定隊列到exchange中
err = r.channel.QueueBind(
queue.Name, // 隊列名稱
r.Key, // 訂閱寫key必須為空
r.Exchange,// 交換機
false,
nil,
)
if err != nil {
log.Print(err.Error())
return
}
// 4. 消費消息
msgch, err := r.channel.Consume(
queue.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for {
select {
case data := <-msgch:
fmt.Printf("%s\n",data.Body)
}
}
}()
<-forever
}
~~~
### 生產者:
~~~
func main() {
one := RabbitMQ.NewRabbitMQPubRoutiong("exOne","ones")
two := RabbitMQ.NewRabbitMQPubRoutiong("exOne","twos")
for i:=0 ;i<30;i++ {
one.PublishRouting("Hello one!" + strconv.Itoa(i))
two.PublishRouting("Hello two!" + strconv.Itoa(i))
time.Sleep(time.Second)
}
}
~~~
### 消費者
~~~
package main
import "High-concurrent-spike-system/RabbitMQ"
func main() {
one := RabbitMQ.NewRabbitMQPubRoutiong("exOne","ones")
one.ConsumptionRouting()
}
~~~