<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                Backend Thread的后臺總業務流程如下: ![](https://img.kancloud.cn/9c/da/9cdaea769064e2aecced69365f720928_839x796.png) ### 7.1 數據庫表相關查詢方法實現 ? 我們先實現一些基本的數據表達查詢方法: > lars_dns/src/dns_route.cpp ```c /* * return 0, 表示 加載成功,version沒有改變 * 1, 表示 加載成功,version有改變 * -1 表示 加載失敗 * */ int Route::load_version() { //這里面只會有一條數據 snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;"); int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret) { fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn)); return -1; } MYSQL_RES *result = mysql_store_result(&_db_conn); if (!result) { fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn)); return -1; } long line_num = mysql_num_rows(result); if (line_num == 0) { fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn)); return -1; } MYSQL_ROW row = mysql_fetch_row(result); //得到version long new_version = atol(row[0]); if (new_version == this->_version) { //加載成功但是沒有修改 return 0; } this->_version = new_version; printf("now route version is %ld\n", this->_version); mysql_free_result(result); return 1; } //加載RouteData到_temp_pointer int Route::load_route_data() { _temp_pointer->clear(); snprintf(_sql, 100, "SELECT * FROM RouteData;"); int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret) { fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn)); return -1; } MYSQL_RES *result = mysql_store_result(&_db_conn); if (!result) { fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn)); return -1; } long line_num = mysql_num_rows(result); MYSQL_ROW row; for (long i = 0;i < line_num; ++i) { row = mysql_fetch_row(result); int modid = atoi(row[1]); int cmdid = atoi(row[2]); unsigned ip = atoi(row[3]); int port = atoi(row[4]); uint64_t key = ((uint64_t)modid << 32) + cmdid; uint64_t value = ((uint64_t)ip << 32) + port; (*_temp_pointer)[key].insert(value); } printf("load data to tmep succ! size is %lu\n", _temp_pointer->size()); mysql_free_result(result); return 0; } //將temp_pointer的數據更新到data_pointer void Route::swap() { pthread_rwlock_wrlock(&_map_lock); route_map *temp = _data_pointer; _data_pointer = _temp_pointer; _temp_pointer = temp; pthread_rwlock_unlock(&_map_lock); } //加載RouteChange得到修改的modid/cmdid //將結果放在vector中 void Route::load_changes(std::vector<uint64_t> &change_list) { //讀取當前版本之前的全部修改 snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version); int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret) { fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn)); return ; } MYSQL_RES *result = mysql_store_result(&_db_conn); if (!result) { fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn)); return ; } long lineNum = mysql_num_rows(result); if (lineNum == 0) { fprintf(stderr, "No version in table ChangeLog: %s\n", mysql_error(&_db_conn)); return ; } MYSQL_ROW row; for (long i = 0;i < lineNum; ++i) { row = mysql_fetch_row(result); int modid = atoi(row[0]); int cmdid = atoi(row[1]); uint64_t key = (((uint64_t)modid) << 32) + cmdid; change_list.push_back(key); } mysql_free_result(result); } //將RouteChange //刪除RouteChange的全部修改記錄數據,remove_all為全部刪除 //否則默認刪除當前版本之前的全部修改 void Route::remove_changes(bool remove_all) { if (remove_all == false) { snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version); } else { snprintf(_sql, 1000, "DELETE FROM RouteChange;"); } int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret != 0) { fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn)); return ; } return; } ``` 這里面提供了基本的對一些表的加載和刪除操作: `load_version()`:加載當前route信息版本號。 `load_route_data()`:加載`RouteData`信息表,到_temp_pointer中。 `swap()`:將__temp_pointer的表數據同步到_data_temp表中. `load_changes()`:加載RouteChange得到修改的modid/cmdid,將結果放在vector中 `remove_changes()`:清空之前的修改記錄。 ### 7.2 Backend Thread業務流程實現 > lars_dns/src/dns_route.cpp ```c //周期性后端檢查db的route信息的更新變化業務 //backend thread完成 void *check_route_changes(void *args) { int wait_time = 10;//10s自動修改一次,也可以從配置文件讀取 long last_load_time = time(NULL); //清空全部的RouteChange Route::instance()->remove_changes(true); //1 判斷是否有修改 while (true) { sleep(1); long current_time = time(NULL); //1.1 加載RouteVersion得到當前版本號 int ret = Route::instance()->load_version(); if (ret == 1) { //version改版 有modid/cmdid修改 //2 如果有修改 //2.1 將最新的RouteData加載到_temp_pointer中 if (Route::instance()->load_route_data() == 0) { //2.2 更新_temp_pointer數據到_data_pointer map中 Route::instance()->swap(); last_load_time = current_time;//更新最后加載時間 } //2.3 獲取被修改的modid/cmdid對應的訂閱客戶端,進行推送 std::vector<uint64_t> changes; Route::instance()->load_changes(changes); //推送 SubscribeList::instance()->publish(changes); //2.4 刪除當前版本之前的修改記錄 Route::instance()->remove_changes(); } else { //3 如果沒有修改 if (current_time - last_load_time >= wait_time) { //3.1 超時,加載最新的temp_pointer if (Route::instance()->load_route_data() == 0) { //3.2 _temp_pointer數據更新到_data_pointer map中 Route::instance()->swap(); last_load_time = current_time; } } } } return NULL; } ``` ? 該實現與上面流程圖描述的過程一樣。那么`check_route_changes()`我們可以讓一個后臺線程進行承載。 > lars_dns/src/dns_service.cpp ```c int main(int argc, char **argv) { event_loop loop; //加載配置文件 config_file::setPath("conf/lars_dns.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 7778); //創建tcp服務器 server = new tcp_server(&loop, ip.c_str(), port); //注冊鏈接創建/銷毀Hook函數 server->set_conn_start(create_subscribe); server->set_conn_close(clear_subscribe); //注冊路由業務 server->add_msg_router(lars::ID_GetRouteRequest, get_route); // ================================================= //開辟backend thread 周期性檢查db數據庫route信息的更新狀態 pthread_t tid; int ret = pthread_create(&tid, NULL, check_route_changes, NULL); if (ret == -1) { perror("pthread_create backendThread"); exit(1); } //設置分離模式 pthread_detach(tid); // ================================================= //開始事件監聽 printf("lars dns service ....\n"); loop.event_process(); return 0; } ``` ### 7.3 完成dns模塊的訂閱功能測試V0.3 ? 我們提供一個修改一個modid/cmdid的sql語句來觸發訂閱條件,并且讓dns service服務器主動給訂閱的客戶端發送該訂閱消息。 > lars_dns/test/test_insert_dns_route.sql ```sql USE lars_dns; SET @time = UNIX_TIMESTAMP(NOW()); INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999); UPDATE RouteVersion SET version = @time WHERE id = 1; INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time); ``` 客戶端代碼: > lars_dns/test/lars_dns_test1.cpp ```c #include <string.h> #include <unistd.h> #include <string> #include "lars_reactor.h" #include "lars.pb.h" //命令行參數 struct Option { Option():ip(NULL),port(0) {} char *ip; short port; }; Option option; void Usage() { printf("Usage: ./lars_dns_test -h ip -p port\n"); } //解析命令行 void parse_option(int argc, char **argv) { for (int i = 0; i < argc; i++) { if (strcmp(argv[i], "-h") == 0) { option.ip = argv[i + 1]; } else if (strcmp(argv[i], "-p") == 0) { option.port = atoi(argv[i + 1]); } } if ( !option.ip || !option.port ) { Usage(); exit(1); } } //typedef void (*conn_callback)(net_connection *conn, void *args); void on_connection(net_connection *conn, void *args) { //發送Route信息請求 lars::GetRouteRequest req; req.set_modid(1); req.set_cmdid(1); std::string requestString; req.SerializeToString(&requestString); conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest); } void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data) { //解包得到數據 lars::GetRouteResponse rsp; rsp.ParseFromArray(data, len); //打印數據 printf("modid = %d\n", rsp.modid()); printf("cmdid = %d\n", rsp.cmdid()); printf("host_size = %d\n", rsp.host_size()); for (int i = 0; i < rsp.host_size(); i++) { printf("-->ip = %u\n", rsp.host(i).ip()); printf("-->port = %d\n", rsp.host(i).port()); } } int main(int argc, char **argv) { parse_option(argc, argv); event_loop loop; tcp_client *client; //創建客戶端 client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test"); if (client == NULL) { fprintf(stderr, "client == NULL\n"); exit(1); } //客戶端成功建立連接,首先發送請求包 client->set_conn_start(on_connection); //設置服務端回應包處理業務 client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route); loop.event_process(); return 0; } ``` 啟動dns_server: ```bash $ ./bin/lars_dns msg_router init... create 0 thread create 1 thread create 2 thread create 3 thread create 4 thread add msg cb msgid = 1 lars dns service .... now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999 ``` 啟動客戶端: ```bash $ ./lars_dns_test1 -h 127.0.0.1 -p 7778 msg_router init... do_connect EINPROGRESS add msg cb msgid = 2 connect 127.0.0.1:7778 succ! modid = 1 cmdid = 1 host_size = 1 -->ip = 3232235953 -->port = 9999 ``` 我們知道,第一請求modid/cmdid就會訂閱該Route模塊。 然后我們通過外界修改modid=1,cmdid=1的模塊,新開一個終端,執行test_insert_dns_route.sql ```bash Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> use lars_dns; Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Database changed mysql> \. test_insert_dns_route.sql Database changed Query OK, 0 rows affected (0.00 sec) Query OK, 1 row affected (0.01 sec) Query OK, 1 row affected (0.01 sec) Rows matched: 1 Changed: 1 Warnings: 0 Query OK, 1 row affected (0.02 sec) mysql> ``` 然后我會會發現客戶端已經得到一個新的消息,就是最新的route數據過來。是由dns_service主動推送過來的訂閱消息. 客戶端: ```bash $ ./lars_dns_test1 -h 127.0.0.1 -p 7778 msg_router init... do_connect EINPROGRESS add msg cb msgid = 2 connect 127.0.0.1:7778 succ! modid = 1 cmdid = 1 host_size = 1 -->ip = 3232235953 -->port = 9999 modid = 1 cmdid = 1 host_size = 1 -->ip = 3232235953 -->port = 9999 ``` ? 這樣我們的訂閱功能就完成了,整體的lars_dns模塊的工作到此的基本需求全部也已經滿足。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看