### 10.1 消息任務類型
> lars_reactor/include/task_msg.h
```c
#pragma once
#include "event_loop.h"
struct task_msg
{
enum TASK_TYPE
{
NEW_CONN, //新建鏈接的任務
NEW_TASK, //一般的任務
};
TASK_TYPE type; //任務類型
//任務的一些參數
union {
//針對 NEW_CONN新建鏈接任務,需要傳遞connfd
int connfd;
/*==== 暫時用不上 ==== */
//針對 NEW_TASK 新建任務,
//那么可以給一個任務提供一個回調函數
struct {
void (*task_cb)(event_loop*, void *args);
void *args;
};
};
};
```
? 這里面task_msg一共有兩個類型的type,一個是新鏈接的任務,一個是普通任務。兩個任務所攜帶的參數不同,所以用了一個union。
?
### 10.2 消息任務隊列
> lars_reactor/include/thread_queue.h
```c
#pragma once
#include <queue>
#include <pthread.h>
#include <sys/eventfd.h>
#include <stdio.h>
#include <unistd.h>
#include "event_loop.h"
/*
*
* 每個thread對應的 消息任務隊列
*
* */
template <typename T>
class thread_queue
{
public:
thread_queue()
{
_loop = NULL;
pthread_mutex_init(&_queue_mutex, NULL);
_evfd = eventfd(0, EFD_NONBLOCK);
if (_evfd == -1) {
perror("evenfd(0, EFD_NONBLOCK)");
exit(1);
}
}
~thread_queue()
{
pthread_mutex_destroy(&_queue_mutex);
close(_evfd);
}
//向隊列添加一個任務
void send(const T& task) {
//觸發消息事件的占位傳輸內容
unsigned long long idle_num = 1;
pthread_mutex_lock(&_queue_mutex);
//將任務添加到隊列
_queue.push(task);
//向_evfd寫,觸發對應的EPOLLIN事件,來處理該任務
int ret = write(_evfd, &idle_num, sizeof(unsigned long long));
if (ret == -1) {
perror("_evfd write");
}
pthread_mutex_unlock(&_queue_mutex);
}
//獲取隊列,(當前隊列已經有任務)
void recv(std::queue<T>& new_queue) {
unsigned int long long idle_num = 1;
pthread_mutex_lock(&_queue_mutex);
//把占位的數據讀出來,確保底層緩沖沒有數據存留
int ret = read(_evfd, &idle_num, sizeof(unsigned long long));
if (ret == -1) {
perror("_evfd read");
}
//將當前的隊列拷貝出去,將一個空隊列換回當前隊列,同時清空自身隊列,確保new_queue是空隊列
std::swap(new_queue, _queue);
pthread_mutex_unlock(&_queue_mutex);
}
//設置當前thead_queue是被哪個事件觸發event_loop監控
void set_loop(event_loop *loop) {
_loop = loop;
}
//設置當前消息任務隊列的 每個任務觸發的回調業務
void set_callback(io_callback *cb, void *args = NULL)
{
if (_loop != NULL) {
_loop->add_io_event(_evfd, cb, EPOLLIN, args);
}
}
//得到當前loop
event_loop * get_loop() {
return _loop;
}
private:
int _evfd; //觸發消息任務隊列讀取的每個消息業務的fd
event_loop *_loop; //當前消息任務隊列所綁定在哪個event_loop事件觸發機制中
std::queue<T> _queue; //隊列
pthread_mutex_t _queue_mutex; //進行添加任務、讀取任務的保護鎖
};
```
? 一個模板類,主要是消息任務隊列里的元素類型未必一定是`task_msg`類型。
`thread_queue`需要綁定一個`event_loop`。來觸發消息到達,捕獲消息并且觸發處理消息業務的動作。
? 這里面有個`_evfd`是為了觸發消息隊列消息到達,處理該消息作用的,將`_evfd`加入到對應線程的`event_loop`中,然后再通過`set_callback`設置一個通用的該queue全部消息所觸發的處理業務call_back,在這個call_back里開發者可以自定義實現一些處理業務流程。
1. 通過`send`將任務發送給消息隊列。
2. 通過`event_loop`觸發注冊的io_callback得到消息隊列里的任務。
3. 在io_callback中調用`recv`取得`task`任務,根據任務的不同類型,處理自定義不同業務流程。
### 10.3 線程池
? 接下來,我們定義線程池,將`thread_queue`和`thread_pool`進行關聯。
> lars_reactor/include/thread_pool.h
```c
#pragma once
#include <pthread.h>
#include "task_msg.h"
#include "thread_queue.h"
class thread_pool
{
public:
//構造,初始化線程池, 開辟thread_cnt個
thread_pool(int thread_cnt);
//獲取一個thead
thread_queue<task_msg>* get_thread();
private:
//_queues是當前thread_pool全部的消息任務隊列頭指針
thread_queue<task_msg> ** _queues;
//當前線程池中的線程個數
int _thread_cnt;
//已經啟動的全部therad編號
pthread_t * _tids;
//當前選中的線程隊列下標
int _index;
};
```
**屬性**:
`_queues`:是`thread_queue`集合,和當前線程數量一一對應,每個線程對應一個queue。里面存的元素是`task_msg`。
`_tids`:保存線程池中每個線程的ID。
`_thread_cnt`:當前線程的個數.
`_index`:表示外層在選擇哪個thead處理任務時的一個下標,因為是輪詢處理,所以需要一個下標記錄。
**方法**:
`thread_pool()`:構造函數,初始化線程池。
`get_thread()`:通過輪詢方式,獲取一個線程的thread_queue.
> lars_reactor/src/thread_pool.cpp
```c
#include "thread_pool.h"
#include "event_loop.h"
#include "tcp_conn.h"
#include <unistd.h>
#include <stdio.h>
/*
* 一旦有task消息過來,這個業務是處理task消息業務的主流程
*
* 只要有人調用 thread_queue:: send()方法就會觸發次函數
*/
void deal_task_message(event_loop *loop, int fd, void *args)
{
//得到是哪個消息隊列觸發的
thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
//將queue中的全部任務取出來
std::queue<task_msg> tasks;
queue->recv(tasks);
while (tasks.empty() != true) {
task_msg task = tasks.front();
//彈出一個元素
tasks.pop();
if (task.type == task_msg::NEW_CONN) {
//是一個新建鏈接的任務
//并且將這個tcp_conn加入當當前線程的loop中去監聽
tcp_conn *conn = new tcp_conn(task.connfd, loop);
if (conn == NULL) {
fprintf(stderr, "in thread new tcp_conn error\n");
exit(1);
}
printf("[thread]: get new connection succ!\n");
}
else if (task.type == task_msg::NEW_TASK) {
//是一個新的普通任務
//TODO
}
else {
//其他未識別任務
fprintf(stderr, "unknow task!\n");
}
}
}
//一個線程的主業務main函數
void *thread_main(void *args)
{
thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args;
//每個線程都應該有一個event_loop來監控客戶端鏈接的讀寫事件
event_loop *loop = new event_loop();
if (loop == NULL) {
fprintf(stderr, "new event_loop error\n");
exit(1);
}
//注冊一個觸發消息任務讀寫的callback函數
queue->set_loop(loop);
queue->set_callback(deal_task_message, queue);
//啟動阻塞監聽
loop->event_process();
return NULL;
}
thread_pool::thread_pool(int thread_cnt)
{
_index = 0;
_queues = NULL;
_thread_cnt = thread_cnt;
if (_thread_cnt <= 0) {
fprintf(stderr, "_thread_cnt < 0\n");
exit(1);
}
//任務隊列的個數和線程個數一致
_queues = new thread_queue<task_msg>*[thread_cnt];
_tids = new pthread_t[thread_cnt];
int ret;
for (int i = 0; i < thread_cnt; ++i) {
//創建一個線程
printf("create %d thread\n", i);
//給當前線程創建一個任務消息隊列
_queues[i] = new thread_queue<task_msg>();
ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]);
if (ret == -1) {
perror("thread_pool, create thread");
exit(1);
}
//將線程脫離
pthread_detach(_tids[i]);
}
}
thread_queue<task_msg>* thread_pool::get_thread()
{
if (_index == _thread_cnt) {
_index = 0;
}
return _queues[_index];
}
```
? 這里主要看`deal_task_message()`方法,是處理收到的task任務的。目前我們只對`NEW_CONN`類型的任務進行處理,一般任務先不做處理,因為暫時用不上。
? `NEW_CONN`的處理主要是讓當前線程創建鏈接,并且將該鏈接由當前線程的event_loop接管。
? 接下來我們就要將線程池添加到reactor框架中去。
### 10.4 reactor線程池關聯
? 將線程池添加到`tcp_server`中。
> lars_reactor/include/tcp_server.h
```c
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
#include "tcp_conn.h"
#include "message.h"
#include "thread_pool.h"
class tcp_server
{
public:
// ...
// ...
private:
// ...
//線程池
thread_pool *_thread_pool;
};
```
在構造函數中,添加_thread_pool的初始化工作。并且在accept成功之后交給線程處理客戶端的讀寫事件。
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
#include "tcp_conn.h"
#include "reactor_buf.h"
//server的構造函數
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
{
// ...
//6 創建鏈接管理
_max_conns = MAX_CONNS;
//創建鏈接信息數組
conns = new tcp_conn*[_max_conns+3];//3是因為stdin,stdout,stderr 已經被占用,再新開fd一定是從3開始,所以不加3就會棧溢出
if (conns == NULL) {
fprintf(stderr, "new conns[%d] error\n", _max_conns);
exit(1);
}
//7 =============創建線程池=================
int thread_cnt = 3;//TODO 從配置文件中讀取
if (thread_cnt > 0) {
_thread_pool = new thread_pool(thread_cnt);
if (_thread_pool == NULL) {
fprintf(stderr, "tcp_server new thread_pool error\n");
exit(1);
}
}
// ========================================
//8 注冊_socket讀事件-->accept處理
_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
}
//開始提供創建鏈接服務
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept與客戶端創建鏈接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立鏈接過多,資源不夠
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error\n");
exit(1);
}
}
else {
//accept succ!
int cur_conns;
get_conn_num(&cur_conns);
//1 判斷鏈接數量
if (cur_conns >= _max_conns) {
fprintf(stderr, "so many connections, max = %d\n", _max_conns);
close(connfd);
}
else {
// ========= 將新連接由線程池處理 ==========
if (_thread_pool != NULL) {
//啟動多線程模式 創建鏈接
//1 選擇一個線程來處理
thread_queue<task_msg>* queue = _thread_pool->get_thread();
//2 創建一個新建鏈接的消息任務
task_msg task;
task.type = task_msg::NEW_CONN;
task.connfd = connfd;
//3 添加到消息隊列中,讓對應的thread進程event_loop處理
queue->send(task);
// =====================================
}
else {
//啟動單線程模式
tcp_conn *conn = new tcp_conn(connfd, _loop);
if (conn == NULL) {
fprintf(stderr, "new tcp_conn error\n");
exit(1);
}
printf("[tcp_server]: get new connection succ!\n");
break;
}
}
}
}
}
```
### 10.5 完成Lars ReactorV0.8開發
? 0.8版本的server.cpp和client.cpp是不用改變的。開啟服務端和客戶端觀察執行結果即可。
服務端:
```bash
$ ./server
msg_router init...
create 0 thread
create 1 thread
create 2 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
read data: Hello Lars!
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======
```
客戶端
```bash
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 101
call data = welcome! you online..
call msglen = 21
recv server: [welcome! you online..]
msgid: [101]
len: [21]
=======
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
```
? 我們會發現,鏈接已經成功創建成功,并且是由于線程處理的讀寫任務。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本