<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                ## 5) 存儲線程池及消息隊列 ? 我們現在的reporter_service的io入庫操作,完全是在消息的callback中進行的,那么實際上,這回占用我們server的工作線程的阻塞時間,從而浪費cpu。所以我們應該將io的入庫操作,交給一個專門做入庫的消息隊列線程池來做,這樣我們的callback就會立刻返回該業務,從而可以繼續處理下一個conn鏈接的消息事件業務。 ? 所以我們就要在此給reporter_service設計一個存儲數據的線程池及配套的消息隊列。當然這里面我們還是直接用寫好的`lars_reactor`框架里的接口即可。 > lars_reporter/src/reporter_service.cpp ```c #include "lars_reactor.h" #include "lars.pb.h" #include "store_report.h" #include <string> thread_queue<lars::ReportStatusRequest> **reportQueues = NULL; int thread_cnt = 0; void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { lars::ReportStatusRequest req; req.ParseFromArray(data, len); //將上報數據存儲到db StoreReport sr; sr.store(req); //輪詢將消息平均發送到每個線程的消息隊列中 static int index = 0; //將消息發送給某個線程消息隊列 reportQueues[index]->send(req); index ++; index = index % thread_cnt; } void create_reportdb_threads() { thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3); //開線程池的消息隊列 reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt]; if (reportQueues == NULL) { fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ; exit(1); } for (int i = 0; i < thread_cnt; i++) { //給當前線程創建一個消息隊列queue reportQueues[i] = new thread_queue<lars::ReportStatusRequest>(); if (reportQueues == NULL) { fprintf(stderr, "create thread_queue error\n"); exit(1); } pthread_t tid; int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]); if (ret == -1) { perror("pthread_create"); exit(1); } pthread_detach(tid); } } int main(int argc, char **argv) { event_loop loop; //加載配置文件 config_file::setPath("./conf/lars_reporter.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 7779); //創建tcp server tcp_server server(&loop, ip.c_str(), port); //添加數據上報請求處理的消息分發處理業務 server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status); //為了防止在業務中出現io阻塞,那么需要啟動一個線程池對IO進行操作的,接受業務的請求存儲消息 create_reportdb_threads(); //啟動事件監聽 loop.event_process(); return 0; } ``` ? 這里主線程啟動了線程池,根據配置文件的`db_thread_cnt`數量來開辟。每個線程都會執行`store_main`方法,我們來看一下實現 > lars_reporter/src/store_thread.cpp ```c #include "lars.pb.h" #include "lars_reactor.h" #include "store_report.h" struct Args { thread_queue<lars::ReportStatusRequest>* first; StoreReport *second; }; //typedef void io_callback(event_loop *loop, int fd, void *args); void thread_report(event_loop *loop, int fd, void *args) { //1. 從queue里面取出需要report的數據(需要thread_queue) thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first; StoreReport *sr = ((Args*)args)->second; std::queue<lars::ReportStatusRequest> report_msgs; //1.1 從消息隊列中取出全部的消息元素集合 queue->recv(report_msgs); while ( !report_msgs.empty() ) { lars::ReportStatusRequest msg = report_msgs.front(); report_msgs.pop(); //2. 將數據存儲到DB中(需要StoreReport) sr->store(msg); } } void *store_main(void *args) { //得到對應的thread_queue thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args; //定義事件觸發機制 event_loop loop; //定義一個存儲對象 StoreReport sr; Args callback_args; callback_args.first = queue; callback_args.second = &sr; queue->set_loop(&loop); queue->set_callback(thread_report, &callback_args); //啟動事件監聽 loop.event_process(); return NULL; } ``` ? 每個線程都會綁定一個`thread_queue<lars::ReportStatusRequest>`,然后一個線程里面有一個loop,來監控消息隊列是否有消息事件過來,如果有消息實現過來,針對每個消息會觸發`thread_report()`方法, 在`thread_report()`中,我們就直接將`lars::ReportStatusRequest`消息存儲到db中。 ? 那么,由誰來給每個線程的`thread_queue`發送消息呢,就是agent/客戶端發送的請求,我們在處理`lars::ID_ReportStatusRequest` 消息分發業務的時候調用`get_report_status()`來觸發。 > lars_reporter/src/reporter_service.cpp ```c void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { lars::ReportStatusRequest req; req.ParseFromArray(data, len); //將上報數據存儲到db StoreReport sr; sr.store(req); //輪詢將消息平均發送到每個線程的消息隊列中 static int index = 0; //將消息發送給某個線程消息隊列 reportQueues[index]->send(req); index ++; index = index % thread_cnt; } ``` ? 這里的分發機制,是采用最輪詢的方式,是每個線程依次分配,去調用`thread_queue`的`send()`方法,將消息發送給消息隊列。 ? 最后我們進行測試,效果跟之前的效果是一樣的。我們現在已經集成進來了存儲線程池,現在就不用擔心在處理業務的時候,因為DB等的io阻塞,使cpu得不到充分利用了。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看