<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ## 15) 異步消息任務機制 ? 我們之前在`include/task_msg.h`中, 其中task的消息類型我們只是實現了`NEW_CONN`,目的是`thread_pool`選擇一個線程,讓一個線程里的`thread_queue`去創建一個連接對象。但是并沒有對`NEW_TASK`的任務類型進行定義。這種類型是允許服務端去執行某項具體的業務。并不是根據客戶端來消息去被動回復的業務,而是服務端主動發送的業務給到客戶端。 ### 15.1 任務函數類型 ? 我們先定義task的回調函數類型 > lars_reactor/include/event_loop.h ```c //... //定義異步任務回調函數類型 typedef void (*task_func)(event_loop *loop, void *args); //... ``` ? 為了防止循環頭文件引用,我們把typedef定義在`event_loop.h`中。 > lars_reactor/include/task_msg.h ```c #pragma once #include "event_loop.h" //定義異步任務回調函數類型 typedef void (*task_func)(event_loop *loop, void *args); struct task_msg { enum TASK_TYPE { NEW_CONN, //新建鏈接的任務 NEW_TASK, //一般的任務 }; TASK_TYPE type; //任務類型 //任務的一些參數 union { //針對 NEW_CONN新建鏈接任務,需要傳遞connfd int connfd; //針對 NEW_TASK 新建任務, //可以給一個任務提供一個回調函數 struct { task_func task_cb; //注冊的任務函數 void *args; //任務函數對應的形參 }; }; }; ``` ? `task_func`是我們定義的一個任務的回調函數類型,第一個參數當然就是讓哪個loop機制去執行這個task任務。很明顯,一個loop是對應一個thread線程的。也就是讓哪個thread去執行這個task任務。args是`task_func`的函數形參。 ? ### 15.2 event_loop模塊添加task任務機制 ? 我們知道,task綁定一個loop,很明顯,一個`event_loop`應該擁有需要被執行的task集合。 ? 在這里,我們將event_loop加上已經就緒的task任務的屬性 > lars_reactor/include/event_loop.h ```c #pragma once /* * * event_loop事件處理機制 * * */ #include <sys/epoll.h> #include <ext/hash_map> #include <ext/hash_set> #include <vector> #include "event_base.h" #include "task_msg.h" #define MAXEVENTS 10 // map: fd->io_event typedef __gnu_cxx::hash_map<int, io_event> io_event_map; //定義指向上面map類型的迭代器 typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it; //全部正在監聽的fd集合 typedef __gnu_cxx::hash_set<int> listen_fd_set; //定義異步任務回調函數類型 typedef void (*task_func)(event_loop *loop, void *args); class event_loop { public: //構造,初始化epoll堆 event_loop(); //阻塞循環處理事件 void event_process(); //添加一個io事件到loop中 void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL); //刪除一個io事件從loop中 void del_io_event(int fd); //刪除一個io事件的EPOLLIN/EPOLLOUT void del_io_event(int fd, int mask); // =========================================== //獲取全部監聽事件的fd集合 void get_listen_fds(listen_fd_set &fds) { fds = listen_fds; } //=== 異步任務task模塊需要的方法 === //添加一個任務task到ready_tasks集合中 void add_task(task_func func, void *args); //執行全部的ready_tasks里面的任務 void execute_ready_tasks(); // =========================================== private: int _epfd; //epoll fd //當前event_loop 監控的fd和對應事件的關系 io_event_map _io_evs; //當前event_loop 一共哪些fd在監聽 listen_fd_set listen_fds; //一次性最大處理的事件 struct epoll_event _fired_evs[MAXEVENTS]; // =========================================== //需要被執行的task集合 typedef std::pair<task_func, void*> task_func_pair; std::vector<task_func_pair> _ready_tasks; // =========================================== }; ``` 添加了兩個屬性: `task_func_pair`: 回調函數和參數的鍵值對. `_ready_tasks`: 所有已經就緒的待執行的任務集合。 同時添加了兩個主要方法: `void add_task(task_func func, void *args)`: 添加一個任務到_ready_tasks中. `void execute_ready_tasks()`:執行全部的_ready_tasks任務。 將這兩個方法實現如下: > lars_reactor/src/event_loop.cpp ```c //... //添加一個任務task到ready_tasks集合中 void event_loop::add_task(task_func func, void *args) { task_func_pair func_pair(func, args); _ready_tasks.push_back(func_pair); } //執行全部的ready_tasks里面的任務 void event_loop::execute_ready_tasks() { std::vector<task_func_pair>::iterator it; for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) { task_func func = it->first;//任務回調函數 void *args = it->second;//回調函數形參 //執行任務 func(this, args); } //全部執行完畢,清空當前的_ready_tasks _ready_tasks.clear(); } //... ``` ? 那么`execute_ready_tasks()`函數需要在一個恰當的時候被執行,我們這里就放在每次event_loop一次`epoll_wait()`處理完一組fd事件之后,觸發一次額外的task任務。 > lars_reactor/src/event_loop.cpp ```c //阻塞循環處理事件 void event_loop::event_process() { while (true) { io_event_map_it ev_it; int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10); for (int i = 0; i < nfds; i++) { //... //... } //每次處理完一組epoll_wait觸發的事件之后,處理異步任務 this->execute_ready_tasks(); } } ``` ? 這里補充一下,因為在task的回調函數中,有形參`event_loop *loop`,可能會使用當前loop中監控的fd信息,所以我們應該給event_loop補充一個獲取當前loop監控的全部fd信息的方法 ```c class event_loop{ //... //獲取全部監聽事件的fd集合 void get_listen_fds(listen_fd_set &fds) { fds = listen_fds; } //... }; ``` ### 15.3 thread_pool模塊添加task任務機制 ? 接下來我們就要用thread_pool來想每個thread所綁定的event_pool中去發送task任務,很明顯thread_pool應該具備能夠將task加入到event_pool中的_ready_task集合的功能。 > lars_reactor/include/thread_pool.h ```c #pragma once #include <pthread.h> #include "task_msg.h" #include "thread_queue.h" class thread_pool { public: //構造,初始化線程池, 開辟thread_cnt個 thread_pool(int thread_cnt); //獲取一個thead thread_queue<task_msg>* get_thread(); //發送一個task任務給thread_pool里的全部thread void send_task(task_func func, void *args = NULL); private: //_queues是當前thread_pool全部的消息任務隊列頭指針 thread_queue<task_msg> ** _queues; //當前線程池中的線程個數 int _thread_cnt; //已經啟動的全部therad編號 pthread_t * _tids; //當前選中的線程隊列下標 int _index; }; ``` ? `send_task()`方法就是發送給線程池中全部的thread去執行task任務. > lars_reactor/src/thread_pool.cpp ```c void thread_pool::send_task(task_func func, void *args) { task_msg task; //給當前thread_pool中的每個thread里的pool添加一個task任務 for (int i = 0; i < _thread_cnt; i++) { //封裝一個task消息 task.type = task_msg::NEW_TASK; task.task_cb = func; task.args = args; //取出第i個thread的消息隊列 thread_queue<task_msg> *queue = _queues[i]; //發送task消息 queue->send(task); } } ``` ? `send_task()`的實現實際上是告知全部的thread,封裝一個`NEW_TASK`類型的消息,通過`task_queue`告知對應的thread.很明顯當我們進行 `queue->send(task)`的時候,當前的thread綁定的loop,就會觸發`deal_task_message()`回調了。 > lars_reactor/src/thread_pool.cpp ```c /* * 一旦有task消息過來,這個業務是處理task消息業務的主流程 * * 只要有人調用 thread_queue:: send()方法就會觸發次函數 */ void deal_task_message(event_loop *loop, int fd, void *args) { //得到是哪個消息隊列觸發的 thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args; //將queue中的全部任務取出來 std::queue<task_msg> tasks; queue->recv(tasks); while (tasks.empty() != true) { task_msg task = tasks.front(); //彈出一個元素 tasks.pop(); if (task.type == task_msg::NEW_CONN) { //是一個新建鏈接的任務 //并且將這個tcp_conn加入當當前線程的loop中去監聽 tcp_conn *conn = new tcp_conn(task.connfd, loop); if (conn == NULL) { fprintf(stderr, "in thread new tcp_conn error\n"); exit(1); } printf("[thread]: get new connection succ!\n"); } else if (task.type == task_msg::NEW_TASK) { //===========是一個新的普通任務=============== //當前的loop就是一個thread的事件監控loop,讓當前loop觸發task任務的回調 loop->add_task(task.task_cb, task.args); //========================================== } else { //其他未識別任務 fprintf(stderr, "unknow task!\n"); } } } ``` ? 我們判斷task.type如果是`NEW_TASK`就將該task加入到當前loop中去. 通過上面的設計,可以看出來,thread_pool的`send_task()`應該是一個對外的開發者接口,所以我們要讓服務器的`tcp_server`能夠獲取到`thread_pool`屬性. > lars_reactor/include/tcp_server.h ```c class tcp_server { //... //獲取當前server的線程池 thread_pool *thread_poll() { return _thread_pool; } //... }; ``` ? ok,這樣我們基本上完成的task異步處理業務的機制. 下面我們來測試一下這個功能. ### 15.4 完成Lars Reactor V0.11開發 > server.cpp ```c #include "tcp_server.h" #include <string> #include <string.h> #include "config_file.h" tcp_server *server; void print_lars_task(event_loop *loop, void *args) { printf("======= Active Task Func! ========\n"); listen_fd_set fds; loop->get_listen_fds(fds);//不同線程的loop,返回的fds是不同的 //可以向所有fds觸發 listen_fd_set::iterator it; //遍歷fds for (it = fds.begin(); it != fds.end(); it++) { int fd = *it; tcp_conn *conn = tcp_server::conns[fd]; //取出fd if (conn != NULL) { int msgid = 101; const char *msg = "Hello I am a Task!"; conn->send_message(msg, strlen(msg), msgid); } } } //回顯業務的回調函數 void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("callback_busi ...\n"); //直接回顯 conn->send_message(data, len, msgid); } //打印信息回調函數 void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("recv client: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } //新客戶端創建的回調 void on_client_build(net_connection *conn, void *args) { int msgid = 101; const char *msg = "welcome! you online.."; conn->send_message(msg, strlen(msg), msgid); //創建鏈接成功之后觸發任務 server->thread_poll()->send_task(print_lars_task); } //客戶端銷毀的回調 void on_client_lost(net_connection *conn, void *args) { printf("connection is lost !\n"); } int main() { event_loop loop; //加載配置文件 config_file::setPath("./serv.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 8888); printf("ip = %s, port = %d\n", ip.c_str(), port); server = new tcp_server(&loop, ip.c_str(), port); //注冊消息業務路由 server->add_msg_router(1, callback_busi); server->add_msg_router(2, print_busi); //注冊鏈接hook回調 server->set_conn_start(on_client_build); server->set_conn_close(on_client_lost); loop.event_process(); return 0; } ``` ? 我們在每次建立連接成功之后,觸發任務機制。其中`print_lars_task()`方法就是我們的異步任務。由于是全部thead都出發,所以該方法會被每個thread執行。但是不同的thread中的pool所返回的fd是不一樣的,這里在`print_lars_task()`中,我們給對應的客戶端做了一個簡單的消息發送。 ? > client.cpp ```c #include "tcp_client.h" #include <stdio.h> #include <string.h> //客戶端業務 void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { //得到服務端回執的數據 char *str = NULL; str = (char*)malloc(len+1); memset(str, 0, len+1); memcpy(str, data, len); printf("recv server: [%s]\n", str); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } //客戶端銷毀的回調 void on_client_build(net_connection *conn, void *args) { int msgid = 1; const char *msg = "Hello Lars!"; conn->send_message(msg, strlen(msg), msgid); } //客戶端銷毀的回調 void on_client_lost(net_connection *conn, void *args) { printf("on_client_lost...\n"); printf("Client is lost!\n"); } int main() { event_loop loop; //創建tcp客戶端 tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6"); //注冊消息路由業務 client.add_msg_router(1, busi); client.add_msg_router(101, busi); //設置hook函數 client.set_conn_start(on_client_build); client.set_conn_close(on_client_lost); //開啟事件監聽 loop.event_process(); return 0; } ``` ? 客戶端代碼無差別。 編譯并運行 服務端: ```bash $ ./server msg_router init... ip = 127.0.0.1, port = 7777 create 0 thread create 1 thread create 2 thread create 3 thread create 4 thread add msg cb msgid = 1 add msg cb msgid = 2 begin accept begin accept [thread]: get new connection succ! callback_busi ... ======= Active Task Func! ======== ======= Active Task Func! ======== ======= Active Task Func! ======== ======= Active Task Func! ======== ======= Active Task Func! ======== ``` 客戶端: ```c $ ./client msg_router init... do_connect EINPROGRESS add msg cb msgid = 1 add msg cb msgid = 101 connect 127.0.0.1:7777 succ! recv server: [welcome! you online..] msgid: [101] len: [21] recv server: [Hello Lars!] msgid: [1] len: [11] recv server: [Hello I am a Task!] msgid: [101] len: [18] ``` ? task機制已經集成完畢,lars_reactor功能更加強大了。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看