<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ### 10.1 消息任務類型 > lars_reactor/include/task_msg.h ```c #pragma once #include "event_loop.h" struct task_msg { enum TASK_TYPE { NEW_CONN, //新建鏈接的任務 NEW_TASK, //一般的任務 }; TASK_TYPE type; //任務類型 //任務的一些參數 union { //針對 NEW_CONN新建鏈接任務,需要傳遞connfd int connfd; /*==== 暫時用不上 ==== */ //針對 NEW_TASK 新建任務, //那么可以給一個任務提供一個回調函數 struct { void (*task_cb)(event_loop*, void *args); void *args; }; }; }; ``` ? 這里面task_msg一共有兩個類型的type,一個是新鏈接的任務,一個是普通任務。兩個任務所攜帶的參數不同,所以用了一個union。 ? ### 10.2 消息任務隊列 > lars_reactor/include/thread_queue.h ```c #pragma once #include <queue> #include <pthread.h> #include <sys/eventfd.h> #include <stdio.h> #include <unistd.h> #include "event_loop.h" /* * * 每個thread對應的 消息任務隊列 * * */ template <typename T> class thread_queue { public: thread_queue() { _loop = NULL; pthread_mutex_init(&_queue_mutex, NULL); _evfd = eventfd(0, EFD_NONBLOCK); if (_evfd == -1) { perror("evenfd(0, EFD_NONBLOCK)"); exit(1); } } ~thread_queue() { pthread_mutex_destroy(&_queue_mutex); close(_evfd); } //向隊列添加一個任務 void send(const T& task) { //觸發消息事件的占位傳輸內容 unsigned long long idle_num = 1; pthread_mutex_lock(&_queue_mutex); //將任務添加到隊列 _queue.push(task); //向_evfd寫,觸發對應的EPOLLIN事件,來處理該任務 int ret = write(_evfd, &idle_num, sizeof(unsigned long long)); if (ret == -1) { perror("_evfd write"); } pthread_mutex_unlock(&_queue_mutex); } //獲取隊列,(當前隊列已經有任務) void recv(std::queue<T>& new_queue) { unsigned int long long idle_num = 1; pthread_mutex_lock(&_queue_mutex); //把占位的數據讀出來,確保底層緩沖沒有數據存留 int ret = read(_evfd, &idle_num, sizeof(unsigned long long)); if (ret == -1) { perror("_evfd read"); } //將當前的隊列拷貝出去,將一個空隊列換回當前隊列,同時清空自身隊列,確保new_queue是空隊列 std::swap(new_queue, _queue); pthread_mutex_unlock(&_queue_mutex); } //設置當前thead_queue是被哪個事件觸發event_loop監控 void set_loop(event_loop *loop) { _loop = loop; } //設置當前消息任務隊列的 每個任務觸發的回調業務 void set_callback(io_callback *cb, void *args = NULL) { if (_loop != NULL) { _loop->add_io_event(_evfd, cb, EPOLLIN, args); } } //得到當前loop event_loop * get_loop() { return _loop; } private: int _evfd; //觸發消息任務隊列讀取的每個消息業務的fd event_loop *_loop; //當前消息任務隊列所綁定在哪個event_loop事件觸發機制中 std::queue<T> _queue; //隊列 pthread_mutex_t _queue_mutex; //進行添加任務、讀取任務的保護鎖 }; ``` ? 一個模板類,主要是消息任務隊列里的元素類型未必一定是`task_msg`類型。 `thread_queue`需要綁定一個`event_loop`。來觸發消息到達,捕獲消息并且觸發處理消息業務的動作。 ? 這里面有個`_evfd`是為了觸發消息隊列消息到達,處理該消息作用的,將`_evfd`加入到對應線程的`event_loop`中,然后再通過`set_callback`設置一個通用的該queue全部消息所觸發的處理業務call_back,在這個call_back里開發者可以自定義實現一些處理業務流程。 1. 通過`send`將任務發送給消息隊列。 2. 通過`event_loop`觸發注冊的io_callback得到消息隊列里的任務。 3. 在io_callback中調用`recv`取得`task`任務,根據任務的不同類型,處理自定義不同業務流程。 ### 10.3 線程池 ? 接下來,我們定義線程池,將`thread_queue`和`thread_pool`進行關聯。 > lars_reactor/include/thread_pool.h ```c #pragma once #include <pthread.h> #include "task_msg.h" #include "thread_queue.h" class thread_pool { public: //構造,初始化線程池, 開辟thread_cnt個 thread_pool(int thread_cnt); //獲取一個thead thread_queue<task_msg>* get_thread(); private: //_queues是當前thread_pool全部的消息任務隊列頭指針 thread_queue<task_msg> ** _queues; //當前線程池中的線程個數 int _thread_cnt; //已經啟動的全部therad編號 pthread_t * _tids; //當前選中的線程隊列下標 int _index; }; ``` **屬性**: `_queues`:是`thread_queue`集合,和當前線程數量一一對應,每個線程對應一個queue。里面存的元素是`task_msg`。 `_tids`:保存線程池中每個線程的ID。 `_thread_cnt`:當前線程的個數. `_index`:表示外層在選擇哪個thead處理任務時的一個下標,因為是輪詢處理,所以需要一個下標記錄。 **方法**: `thread_pool()`:構造函數,初始化線程池。 `get_thread()`:通過輪詢方式,獲取一個線程的thread_queue. > lars_reactor/src/thread_pool.cpp ```c #include "thread_pool.h" #include "event_loop.h" #include "tcp_conn.h" #include <unistd.h> #include <stdio.h> /* * 一旦有task消息過來,這個業務是處理task消息業務的主流程 * * 只要有人調用 thread_queue:: send()方法就會觸發次函數 */ void deal_task_message(event_loop *loop, int fd, void *args) { //得到是哪個消息隊列觸發的 thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args; //將queue中的全部任務取出來 std::queue<task_msg> tasks; queue->recv(tasks); while (tasks.empty() != true) { task_msg task = tasks.front(); //彈出一個元素 tasks.pop(); if (task.type == task_msg::NEW_CONN) { //是一個新建鏈接的任務 //并且將這個tcp_conn加入當當前線程的loop中去監聽 tcp_conn *conn = new tcp_conn(task.connfd, loop); if (conn == NULL) { fprintf(stderr, "in thread new tcp_conn error\n"); exit(1); } printf("[thread]: get new connection succ!\n"); } else if (task.type == task_msg::NEW_TASK) { //是一個新的普通任務 //TODO } else { //其他未識別任務 fprintf(stderr, "unknow task!\n"); } } } //一個線程的主業務main函數 void *thread_main(void *args) { thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args; //每個線程都應該有一個event_loop來監控客戶端鏈接的讀寫事件 event_loop *loop = new event_loop(); if (loop == NULL) { fprintf(stderr, "new event_loop error\n"); exit(1); } //注冊一個觸發消息任務讀寫的callback函數 queue->set_loop(loop); queue->set_callback(deal_task_message, queue); //啟動阻塞監聽 loop->event_process(); return NULL; } thread_pool::thread_pool(int thread_cnt) { _index = 0; _queues = NULL; _thread_cnt = thread_cnt; if (_thread_cnt <= 0) { fprintf(stderr, "_thread_cnt < 0\n"); exit(1); } //任務隊列的個數和線程個數一致 _queues = new thread_queue<task_msg>*[thread_cnt]; _tids = new pthread_t[thread_cnt]; int ret; for (int i = 0; i < thread_cnt; ++i) { //創建一個線程 printf("create %d thread\n", i); //給當前線程創建一個任務消息隊列 _queues[i] = new thread_queue<task_msg>(); ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]); if (ret == -1) { perror("thread_pool, create thread"); exit(1); } //將線程脫離 pthread_detach(_tids[i]); } } thread_queue<task_msg>* thread_pool::get_thread() { if (_index == _thread_cnt) { _index = 0; } return _queues[_index]; } ``` ? 這里主要看`deal_task_message()`方法,是處理收到的task任務的。目前我們只對`NEW_CONN`類型的任務進行處理,一般任務先不做處理,因為暫時用不上。 ? `NEW_CONN`的處理主要是讓當前線程創建鏈接,并且將該鏈接由當前線程的event_loop接管。 ? 接下來我們就要將線程池添加到reactor框架中去。 ### 10.4 reactor線程池關聯 ? 將線程池添加到`tcp_server`中。 > lars_reactor/include/tcp_server.h ```c #pragma once #include <netinet/in.h> #include "event_loop.h" #include "tcp_conn.h" #include "message.h" #include "thread_pool.h" class tcp_server { public: // ... // ... private: // ... //線程池 thread_pool *_thread_pool; }; ``` 在構造函數中,添加_thread_pool的初始化工作。并且在accept成功之后交給線程處理客戶端的讀寫事件。 ```c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <unistd.h> #include <signal.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h> #include "tcp_server.h" #include "tcp_conn.h" #include "reactor_buf.h" //server的構造函數 tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port) { // ... //6 創建鏈接管理 _max_conns = MAX_CONNS; //創建鏈接信息數組 conns = new tcp_conn*[_max_conns+3];//3是因為stdin,stdout,stderr 已經被占用,再新開fd一定是從3開始,所以不加3就會棧溢出 if (conns == NULL) { fprintf(stderr, "new conns[%d] error\n", _max_conns); exit(1); } //7 =============創建線程池================= int thread_cnt = 3;//TODO 從配置文件中讀取 if (thread_cnt > 0) { _thread_pool = new thread_pool(thread_cnt); if (_thread_pool == NULL) { fprintf(stderr, "tcp_server new thread_pool error\n"); exit(1); } } // ======================================== //8 注冊_socket讀事件-->accept處理 _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this); } //開始提供創建鏈接服務 void tcp_server::do_accept() { int connfd; while(true) { //accept與客戶端創建鏈接 printf("begin accept\n"); connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen); if (connfd == -1) { if (errno == EINTR) { fprintf(stderr, "accept errno=EINTR\n"); continue; } else if (errno == EMFILE) { //建立鏈接過多,資源不夠 fprintf(stderr, "accept errno=EMFILE\n"); } else if (errno == EAGAIN) { fprintf(stderr, "accept errno=EAGAIN\n"); break; } else { fprintf(stderr, "accept error\n"); exit(1); } } else { //accept succ! int cur_conns; get_conn_num(&cur_conns); //1 判斷鏈接數量 if (cur_conns >= _max_conns) { fprintf(stderr, "so many connections, max = %d\n", _max_conns); close(connfd); } else { // ========= 將新連接由線程池處理 ========== if (_thread_pool != NULL) { //啟動多線程模式 創建鏈接 //1 選擇一個線程來處理 thread_queue<task_msg>* queue = _thread_pool->get_thread(); //2 創建一個新建鏈接的消息任務 task_msg task; task.type = task_msg::NEW_CONN; task.connfd = connfd; //3 添加到消息隊列中,讓對應的thread進程event_loop處理 queue->send(task); // ===================================== } else { //啟動單線程模式 tcp_conn *conn = new tcp_conn(connfd, _loop); if (conn == NULL) { fprintf(stderr, "new tcp_conn error\n"); exit(1); } printf("[tcp_server]: get new connection succ!\n"); break; } } } } } ``` ### 10.5 完成Lars ReactorV0.8開發 ? 0.8版本的server.cpp和client.cpp是不用改變的。開啟服務端和客戶端觀察執行結果即可。 服務端: ```bash $ ./server msg_router init... create 0 thread create 1 thread create 2 thread add msg cb msgid = 1 add msg cb msgid = 2 begin accept begin accept [thread]: get new connection succ! read data: Hello Lars! call msgid = 1 call data = Hello Lars! call msglen = 11 callback_busi ... ======= ``` 客戶端 ```bash $ ./client msg_router init... do_connect EINPROGRESS add msg cb msgid = 1 add msg cb msgid = 101 connect 127.0.0.1:7777 succ! do write over, del EPOLLOUT call msgid = 101 call data = welcome! you online.. call msglen = 21 recv server: [welcome! you online..] msgid: [101] len: [21] ======= call msgid = 1 call data = Hello Lars! call msglen = 11 recv server: [Hello Lars!] msgid: [1] len: [11] ======= ``` ? 我們會發現,鏈接已經成功創建成功,并且是由于線程處理的讀寫任務。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看