## 5) 存儲線程池及消息隊列
? 我們現在的reporter_service的io入庫操作,完全是在消息的callback中進行的,那么實際上,這回占用我們server的工作線程的阻塞時間,從而浪費cpu。所以我們應該將io的入庫操作,交給一個專門做入庫的消息隊列線程池來做,這樣我們的callback就會立刻返回該業務,從而可以繼續處理下一個conn鏈接的消息事件業務。
? 所以我們就要在此給reporter_service設計一個存儲數據的線程池及配套的消息隊列。當然這里面我們還是直接用寫好的`lars_reactor`框架里的接口即可。
> lars_reporter/src/reporter_service.cpp
```c
#include "lars_reactor.h"
#include "lars.pb.h"
#include "store_report.h"
#include <string>
thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
int thread_cnt = 0;
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
lars::ReportStatusRequest req;
req.ParseFromArray(data, len);
//將上報數據存儲到db
StoreReport sr;
sr.store(req);
//輪詢將消息平均發送到每個線程的消息隊列中
static int index = 0;
//將消息發送給某個線程消息隊列
reportQueues[index]->send(req);
index ++;
index = index % thread_cnt;
}
void create_reportdb_threads()
{
thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
//開線程池的消息隊列
reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];
if (reportQueues == NULL) {
fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
exit(1);
}
for (int i = 0; i < thread_cnt; i++) {
//給當前線程創建一個消息隊列queue
reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
if (reportQueues == NULL) {
fprintf(stderr, "create thread_queue error\n");
exit(1);
}
pthread_t tid;
int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
pthread_detach(tid);
}
}
int main(int argc, char **argv)
{
event_loop loop;
//加載配置文件
config_file::setPath("./conf/lars_reporter.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7779);
//創建tcp server
tcp_server server(&loop, ip.c_str(), port);
//添加數據上報請求處理的消息分發處理業務
server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);
//為了防止在業務中出現io阻塞,那么需要啟動一個線程池對IO進行操作的,接受業務的請求存儲消息
create_reportdb_threads();
//啟動事件監聽
loop.event_process();
return 0;
}
```
? 這里主線程啟動了線程池,根據配置文件的`db_thread_cnt`數量來開辟。每個線程都會執行`store_main`方法,我們來看一下實現
> lars_reporter/src/store_thread.cpp
```c
#include "lars.pb.h"
#include "lars_reactor.h"
#include "store_report.h"
struct Args
{
thread_queue<lars::ReportStatusRequest>* first;
StoreReport *second;
};
//typedef void io_callback(event_loop *loop, int fd, void *args);
void thread_report(event_loop *loop, int fd, void *args)
{
//1. 從queue里面取出需要report的數據(需要thread_queue)
thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
StoreReport *sr = ((Args*)args)->second;
std::queue<lars::ReportStatusRequest> report_msgs;
//1.1 從消息隊列中取出全部的消息元素集合
queue->recv(report_msgs);
while ( !report_msgs.empty() ) {
lars::ReportStatusRequest msg = report_msgs.front();
report_msgs.pop();
//2. 將數據存儲到DB中(需要StoreReport)
sr->store(msg);
}
}
void *store_main(void *args)
{
//得到對應的thread_queue
thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;
//定義事件觸發機制
event_loop loop;
//定義一個存儲對象
StoreReport sr;
Args callback_args;
callback_args.first = queue;
callback_args.second = &sr;
queue->set_loop(&loop);
queue->set_callback(thread_report, &callback_args);
//啟動事件監聽
loop.event_process();
return NULL;
}
```
? 每個線程都會綁定一個`thread_queue<lars::ReportStatusRequest>`,然后一個線程里面有一個loop,來監控消息隊列是否有消息事件過來,如果有消息實現過來,針對每個消息會觸發`thread_report()`方法, 在`thread_report()`中,我們就直接將`lars::ReportStatusRequest`消息存儲到db中。
? 那么,由誰來給每個線程的`thread_queue`發送消息呢,就是agent/客戶端發送的請求,我們在處理`lars::ID_ReportStatusRequest` 消息分發業務的時候調用`get_report_status()`來觸發。
> lars_reporter/src/reporter_service.cpp
```c
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
lars::ReportStatusRequest req;
req.ParseFromArray(data, len);
//將上報數據存儲到db
StoreReport sr;
sr.store(req);
//輪詢將消息平均發送到每個線程的消息隊列中
static int index = 0;
//將消息發送給某個線程消息隊列
reportQueues[index]->send(req);
index ++;
index = index % thread_cnt;
}
```
? 這里的分發機制,是采用最輪詢的方式,是每個線程依次分配,去調用`thread_queue`的`send()`方法,將消息發送給消息隊列。
? 最后我們進行測試,效果跟之前的效果是一樣的。我們現在已經集成進來了存儲線程池,現在就不用擔心在處理業務的時候,因為DB等的io阻塞,使cpu得不到充分利用了。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本