[TOC]
### rabbitMQ 工作隊列 輪詢分發

>[danger] 簡單隊列是一對一的關系,一個生成者對應一個消費者,實際開發中,一般消費者是以業務相結合的,需要時間去處理業務,如果只有一個消費者,那么生產者就會積壓很多消息,消費不出去
*****
代碼演示:
```
'use strict';
const Controller = require('egg').Controller;
/**
* 隊列一對多演示
* 生產者 ----> 隊列 ----> 消費者
* ----> 消費者
----> 消費者
*/
// 頻道名稱
const queueName = 'hasMany'
class UserController extends Controller {
// 生成者
async send() {
const { msg } = this.ctx.query;
//1. 創建頻道
const ch = await this.app.amqplib.createChannel();
// 2. 創建隊列 開啟持久化存儲
await ch.assertQueue(queueName, { durable: true });
// 3. 發送消息
let ok = null;
for(let i=0; i<50; i++) {
// 此時我們確信即使RabbitMQ重新啟動,task_queue隊列也不會丟失。現在我們需要將消息標記為持久性 - 通過使用持久性選項Channel.sendToQueue。
ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true });
}
//4. 關閉連接
await ch.close();
this.ctx.body = ok;
this.ctx.status = 200;
}
// 消費者
async work1() {
// 1. 創建頻道
const ch = await this.app.amqplib.createChannel();
//2. 選擇隊列
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 關閉消息自動確認模式
,需要手動 ack
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 500)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
//消費者發回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work1: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消費者1號失敗'
this.ctx.status = 500
}
}
async work2() {
// 1. 創建頻道
const ch = await this.app.amqplib.createChannel();
//2. 選擇隊列 RabbitMQ永遠不會丟失我們的隊列。為此,我們需要聲明它是持久的
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 開啟自動確認模式
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 1000)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work2: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消費者2號失敗'
this.ctx.status = 500
}
}
async work3() {
// 1. 創建頻道
const ch = await this.app.amqplib.createChannel();
//2. 選擇隊列
await ch.assertQueue(queueName, { durable: true });
// 3. 接收消息 noAck 開啟自動確認模式
const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
setTimeout(() => {
resolve(msg)
}, 1500)
}, { noAck: false }) );
if (resultMsg !== null) {
const { content } = resultMsg;
//消費者發回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它
ch.ack(resultMsg);
await ch.close();
this.ctx.body = { work3: content.toString() };
this.ctx.status = 200;
} else {
this.ctx.body = '消費者3號失敗'
this.ctx.status = 500
}
}
}
module.exports = UserController;
```
- 概述
- 起步
- 跨域配置
- 路徑別名
- 路由
- api版本控制
- 錯誤和異常
- 全局異常處理
- 數據庫
- 創建遷移文件
- sequelize數據類型
- 配置
- 新增
- 查詢
- 條件查詢
- 模糊查詢
- 排序查詢
- 聚合查詢
- 分組查詢
- 分頁查詢
- 修改
- 刪除
- 獲取器
- 修改器
- 靜態屬性
- 字段驗證
- 外鍵約束
- 關聯模型
- 一對一
- 一對多
- 左外連接
- 多對多
- 字段顯示隱藏
- 事務
- 字段自增
- 驗證層
- egg-validate
- indicative驗證器
- egg-validate-plus
- betterValidate
- 校驗規則
- 中間件
- 安全
- 數據加密
- 單向加密
- 示例代碼
- 封裝egg加密
- 上傳
- path模塊
- 單文件上傳
- 多文件上傳
- 按照日期存儲
- 工具函數
- egg常用工具函數
- 緩存
- 配置緩存插件
- 設置緩存
- 獲取緩存
- 刪除緩存
- 消息隊列
- rabbitMQ
- 安裝
- 簡單隊列
- 工作隊列
- 工作隊列(dispach分發)
- 消息應答和持久化
- redis
- 數據類型
- 字符串類型(String)
- 哈希類型(Hash)
- 列表(List)
- 無序集合(Set)
- 可排序集合(Zset)
- 郵件系統
- nodeMailer
- 第三方模塊
- 生成隨機數
- JWT
- JWT鑒權
- 生成Token
- 短信服務
- 阿里大魚短信驗證碼
- 發送短信邏輯
- 阿里短信Node類