## 15) 異步消息任務機制
? 我們之前在`include/task_msg.h`中, 其中task的消息類型我們只是實現了`NEW_CONN`,目的是`thread_pool`選擇一個線程,讓一個線程里的`thread_queue`去創建一個連接對象。但是并沒有對`NEW_TASK`的任務類型進行定義。這種類型是允許服務端去執行某項具體的業務。并不是根據客戶端來消息去被動回復的業務,而是服務端主動發送的業務給到客戶端。
### 15.1 任務函數類型
? 我們先定義task的回調函數類型
> lars_reactor/include/event_loop.h
```c
//...
//定義異步任務回調函數類型
typedef void (*task_func)(event_loop *loop, void *args);
//...
```
? 為了防止循環頭文件引用,我們把typedef定義在`event_loop.h`中。
> lars_reactor/include/task_msg.h
```c
#pragma once
#include "event_loop.h"
//定義異步任務回調函數類型
typedef void (*task_func)(event_loop *loop, void *args);
struct task_msg
{
enum TASK_TYPE
{
NEW_CONN, //新建鏈接的任務
NEW_TASK, //一般的任務
};
TASK_TYPE type; //任務類型
//任務的一些參數
union {
//針對 NEW_CONN新建鏈接任務,需要傳遞connfd
int connfd;
//針對 NEW_TASK 新建任務,
//可以給一個任務提供一個回調函數
struct {
task_func task_cb; //注冊的任務函數
void *args; //任務函數對應的形參
};
};
};
```
? `task_func`是我們定義的一個任務的回調函數類型,第一個參數當然就是讓哪個loop機制去執行這個task任務。很明顯,一個loop是對應一個thread線程的。也就是讓哪個thread去執行這個task任務。args是`task_func`的函數形參。
?
### 15.2 event_loop模塊添加task任務機制
? 我們知道,task綁定一個loop,很明顯,一個`event_loop`應該擁有需要被執行的task集合。
? 在這里,我們將event_loop加上已經就緒的task任務的屬性
> lars_reactor/include/event_loop.h
```c
#pragma once
/*
*
* event_loop事件處理機制
*
* */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include <vector>
#include "event_base.h"
#include "task_msg.h"
#define MAXEVENTS 10
// map: fd->io_event
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定義指向上面map類型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在監聽的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;
//定義異步任務回調函數類型
typedef void (*task_func)(event_loop *loop, void *args);
class event_loop
{
public:
//構造,初始化epoll堆
event_loop();
//阻塞循環處理事件
void event_process();
//添加一個io事件到loop中
void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
//刪除一個io事件從loop中
void del_io_event(int fd);
//刪除一個io事件的EPOLLIN/EPOLLOUT
void del_io_event(int fd, int mask);
// ===========================================
//獲取全部監聽事件的fd集合
void get_listen_fds(listen_fd_set &fds) {
fds = listen_fds;
}
//=== 異步任務task模塊需要的方法 ===
//添加一個任務task到ready_tasks集合中
void add_task(task_func func, void *args);
//執行全部的ready_tasks里面的任務
void execute_ready_tasks();
// ===========================================
private:
int _epfd; //epoll fd
//當前event_loop 監控的fd和對應事件的關系
io_event_map _io_evs;
//當前event_loop 一共哪些fd在監聽
listen_fd_set listen_fds;
//一次性最大處理的事件
struct epoll_event _fired_evs[MAXEVENTS];
// ===========================================
//需要被執行的task集合
typedef std::pair<task_func, void*> task_func_pair;
std::vector<task_func_pair> _ready_tasks;
// ===========================================
};
```
添加了兩個屬性:
`task_func_pair`: 回調函數和參數的鍵值對.
`_ready_tasks`: 所有已經就緒的待執行的任務集合。
同時添加了兩個主要方法:
`void add_task(task_func func, void *args)`: 添加一個任務到_ready_tasks中.
`void execute_ready_tasks()`:執行全部的_ready_tasks任務。
將這兩個方法實現如下:
> lars_reactor/src/event_loop.cpp
```c
//...
//添加一個任務task到ready_tasks集合中
void event_loop::add_task(task_func func, void *args)
{
task_func_pair func_pair(func, args);
_ready_tasks.push_back(func_pair);
}
//執行全部的ready_tasks里面的任務
void event_loop::execute_ready_tasks()
{
std::vector<task_func_pair>::iterator it;
for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) {
task_func func = it->first;//任務回調函數
void *args = it->second;//回調函數形參
//執行任務
func(this, args);
}
//全部執行完畢,清空當前的_ready_tasks
_ready_tasks.clear();
}
//...
```
? 那么`execute_ready_tasks()`函數需要在一個恰當的時候被執行,我們這里就放在每次event_loop一次`epoll_wait()`處理完一組fd事件之后,觸發一次額外的task任務。
> lars_reactor/src/event_loop.cpp
```c
//阻塞循環處理事件
void event_loop::event_process()
{
while (true) {
io_event_map_it ev_it;
int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
for (int i = 0; i < nfds; i++) {
//...
//...
}
//每次處理完一組epoll_wait觸發的事件之后,處理異步任務
this->execute_ready_tasks();
}
}
```
? 這里補充一下,因為在task的回調函數中,有形參`event_loop *loop`,可能會使用當前loop中監控的fd信息,所以我們應該給event_loop補充一個獲取當前loop監控的全部fd信息的方法
```c
class event_loop{
//...
//獲取全部監聽事件的fd集合
void get_listen_fds(listen_fd_set &fds) {
fds = listen_fds;
}
//...
};
```
### 15.3 thread_pool模塊添加task任務機制
? 接下來我們就要用thread_pool來想每個thread所綁定的event_pool中去發送task任務,很明顯thread_pool應該具備能夠將task加入到event_pool中的_ready_task集合的功能。
> lars_reactor/include/thread_pool.h
```c
#pragma once
#include <pthread.h>
#include "task_msg.h"
#include "thread_queue.h"
class thread_pool
{
public:
//構造,初始化線程池, 開辟thread_cnt個
thread_pool(int thread_cnt);
//獲取一個thead
thread_queue<task_msg>* get_thread();
//發送一個task任務給thread_pool里的全部thread
void send_task(task_func func, void *args = NULL);
private:
//_queues是當前thread_pool全部的消息任務隊列頭指針
thread_queue<task_msg> ** _queues;
//當前線程池中的線程個數
int _thread_cnt;
//已經啟動的全部therad編號
pthread_t * _tids;
//當前選中的線程隊列下標
int _index;
};
```
? `send_task()`方法就是發送給線程池中全部的thread去執行task任務.
> lars_reactor/src/thread_pool.cpp
```c
void thread_pool::send_task(task_func func, void *args)
{
task_msg task;
//給當前thread_pool中的每個thread里的pool添加一個task任務
for (int i = 0; i < _thread_cnt; i++) {
//封裝一個task消息
task.type = task_msg::NEW_TASK;
task.task_cb = func;
task.args = args;
//取出第i個thread的消息隊列
thread_queue<task_msg> *queue = _queues[i];
//發送task消息
queue->send(task);
}
}
```
? `send_task()`的實現實際上是告知全部的thread,封裝一個`NEW_TASK`類型的消息,通過`task_queue`告知對應的thread.很明顯當我們進行 `queue->send(task)`的時候,當前的thread綁定的loop,就會觸發`deal_task_message()`回調了。
> lars_reactor/src/thread_pool.cpp
```c
/*
* 一旦有task消息過來,這個業務是處理task消息業務的主流程
*
* 只要有人調用 thread_queue:: send()方法就會觸發次函數
*/
void deal_task_message(event_loop *loop, int fd, void *args)
{
//得到是哪個消息隊列觸發的
thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
//將queue中的全部任務取出來
std::queue<task_msg> tasks;
queue->recv(tasks);
while (tasks.empty() != true) {
task_msg task = tasks.front();
//彈出一個元素
tasks.pop();
if (task.type == task_msg::NEW_CONN) {
//是一個新建鏈接的任務
//并且將這個tcp_conn加入當當前線程的loop中去監聽
tcp_conn *conn = new tcp_conn(task.connfd, loop);
if (conn == NULL) {
fprintf(stderr, "in thread new tcp_conn error\n");
exit(1);
}
printf("[thread]: get new connection succ!\n");
}
else if (task.type == task_msg::NEW_TASK) {
//===========是一個新的普通任務===============
//當前的loop就是一個thread的事件監控loop,讓當前loop觸發task任務的回調
loop->add_task(task.task_cb, task.args);
//==========================================
}
else {
//其他未識別任務
fprintf(stderr, "unknow task!\n");
}
}
}
```
? 我們判斷task.type如果是`NEW_TASK`就將該task加入到當前loop中去.
通過上面的設計,可以看出來,thread_pool的`send_task()`應該是一個對外的開發者接口,所以我們要讓服務器的`tcp_server`能夠獲取到`thread_pool`屬性.
> lars_reactor/include/tcp_server.h
```c
class tcp_server {
//...
//獲取當前server的線程池
thread_pool *thread_poll() {
return _thread_pool;
}
//...
};
```
? ok,這樣我們基本上完成的task異步處理業務的機制. 下面我們來測試一下這個功能.
### 15.4 完成Lars Reactor V0.11開發
> server.cpp
```c
#include "tcp_server.h"
#include <string>
#include <string.h>
#include "config_file.h"
tcp_server *server;
void print_lars_task(event_loop *loop, void *args)
{
printf("======= Active Task Func! ========\n");
listen_fd_set fds;
loop->get_listen_fds(fds);//不同線程的loop,返回的fds是不同的
//可以向所有fds觸發
listen_fd_set::iterator it;
//遍歷fds
for (it = fds.begin(); it != fds.end(); it++) {
int fd = *it;
tcp_conn *conn = tcp_server::conns[fd]; //取出fd
if (conn != NULL) {
int msgid = 101;
const char *msg = "Hello I am a Task!";
conn->send_message(msg, strlen(msg), msgid);
}
}
}
//回顯業務的回調函數
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback_busi ...\n");
//直接回顯
conn->send_message(data, len, msgid);
}
//打印信息回調函數
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("recv client: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//新客戶端創建的回調
void on_client_build(net_connection *conn, void *args)
{
int msgid = 101;
const char *msg = "welcome! you online..";
conn->send_message(msg, strlen(msg), msgid);
//創建鏈接成功之后觸發任務
server->thread_poll()->send_task(print_lars_task);
}
//客戶端銷毀的回調
void on_client_lost(net_connection *conn, void *args)
{
printf("connection is lost !\n");
}
int main()
{
event_loop loop;
//加載配置文件
config_file::setPath("./serv.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 8888);
printf("ip = %s, port = %d\n", ip.c_str(), port);
server = new tcp_server(&loop, ip.c_str(), port);
//注冊消息業務路由
server->add_msg_router(1, callback_busi);
server->add_msg_router(2, print_busi);
//注冊鏈接hook回調
server->set_conn_start(on_client_build);
server->set_conn_close(on_client_lost);
loop.event_process();
return 0;
}
```
? 我們在每次建立連接成功之后,觸發任務機制。其中`print_lars_task()`方法就是我們的異步任務。由于是全部thead都出發,所以該方法會被每個thread執行。但是不同的thread中的pool所返回的fd是不一樣的,這里在`print_lars_task()`中,我們給對應的客戶端做了一個簡單的消息發送。
?
> client.cpp
```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客戶端業務
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//得到服務端回執的數據
char *str = NULL;
str = (char*)malloc(len+1);
memset(str, 0, len+1);
memcpy(str, data, len);
printf("recv server: [%s]\n", str);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//客戶端銷毀的回調
void on_client_build(net_connection *conn, void *args)
{
int msgid = 1;
const char *msg = "Hello Lars!";
conn->send_message(msg, strlen(msg), msgid);
}
//客戶端銷毀的回調
void on_client_lost(net_connection *conn, void *args)
{
printf("on_client_lost...\n");
printf("Client is lost!\n");
}
int main()
{
event_loop loop;
//創建tcp客戶端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
//注冊消息路由業務
client.add_msg_router(1, busi);
client.add_msg_router(101, busi);
//設置hook函數
client.set_conn_start(on_client_build);
client.set_conn_close(on_client_lost);
//開啟事件監聽
loop.event_process();
return 0;
}
```
? 客戶端代碼無差別。
編譯并運行
服務端:
```bash
$ ./server
msg_router init...
ip = 127.0.0.1, port = 7777
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
callback_busi ...
======= Active Task Func! ========
======= Active Task Func! ========
======= Active Task Func! ========
======= Active Task Func! ========
======= Active Task Func! ========
```
客戶端:
```c
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
recv server: [welcome! you online..]
msgid: [101]
len: [21]
recv server: [Hello Lars!]
msgid: [1]
len: [11]
recv server: [Hello I am a Task!]
msgid: [101]
len: [18]
```
? task機制已經集成完畢,lars_reactor功能更加強大了。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本