Backend Thread的后臺總業務流程如下:

### 7.1 數據庫表相關查詢方法實現
? 我們先實現一些基本的數據表達查詢方法:
> lars_dns/src/dns_route.cpp
```c
/*
* return 0, 表示 加載成功,version沒有改變
* 1, 表示 加載成功,version有改變
* -1 表示 加載失敗
* */
int Route::load_version()
{
//這里面只會有一條數據
snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
return -1;
}
long line_num = mysql_num_rows(result);
if (line_num == 0)
{
fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_ROW row = mysql_fetch_row(result);
//得到version
long new_version = atol(row[0]);
if (new_version == this->_version)
{
//加載成功但是沒有修改
return 0;
}
this->_version = new_version;
printf("now route version is %ld\n", this->_version);
mysql_free_result(result);
return 1;
}
//加載RouteData到_temp_pointer
int Route::load_route_data()
{
_temp_pointer->clear();
snprintf(_sql, 100, "SELECT * FROM RouteData;");
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
return -1;
}
long line_num = mysql_num_rows(result);
MYSQL_ROW row;
for (long i = 0;i < line_num; ++i)
{
row = mysql_fetch_row(result);
int modid = atoi(row[1]);
int cmdid = atoi(row[2]);
unsigned ip = atoi(row[3]);
int port = atoi(row[4]);
uint64_t key = ((uint64_t)modid << 32) + cmdid;
uint64_t value = ((uint64_t)ip << 32) + port;
(*_temp_pointer)[key].insert(value);
}
printf("load data to tmep succ! size is %lu\n", _temp_pointer->size());
mysql_free_result(result);
return 0;
}
//將temp_pointer的數據更新到data_pointer
void Route::swap()
{
pthread_rwlock_wrlock(&_map_lock);
route_map *temp = _data_pointer;
_data_pointer = _temp_pointer;
_temp_pointer = temp;
pthread_rwlock_unlock(&_map_lock);
}
//加載RouteChange得到修改的modid/cmdid
//將結果放在vector中
void Route::load_changes(std::vector<uint64_t> &change_list)
{
//讀取當前版本之前的全部修改
snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
return ;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
return ;
}
long lineNum = mysql_num_rows(result);
if (lineNum == 0)
{
fprintf(stderr, "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
return ;
}
MYSQL_ROW row;
for (long i = 0;i < lineNum; ++i)
{
row = mysql_fetch_row(result);
int modid = atoi(row[0]);
int cmdid = atoi(row[1]);
uint64_t key = (((uint64_t)modid) << 32) + cmdid;
change_list.push_back(key);
}
mysql_free_result(result);
}
//將RouteChange
//刪除RouteChange的全部修改記錄數據,remove_all為全部刪除
//否則默認刪除當前版本之前的全部修改
void Route::remove_changes(bool remove_all)
{
if (remove_all == false)
{
snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
}
else
{
snprintf(_sql, 1000, "DELETE FROM RouteChange;");
}
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret != 0)
{
fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
return ;
}
return;
}
```
這里面提供了基本的對一些表的加載和刪除操作:
`load_version()`:加載當前route信息版本號。
`load_route_data()`:加載`RouteData`信息表,到_temp_pointer中。
`swap()`:將__temp_pointer的表數據同步到_data_temp表中.
`load_changes()`:加載RouteChange得到修改的modid/cmdid,將結果放在vector中
`remove_changes()`:清空之前的修改記錄。
### 7.2 Backend Thread業務流程實現
> lars_dns/src/dns_route.cpp
```c
//周期性后端檢查db的route信息的更新變化業務
//backend thread完成
void *check_route_changes(void *args)
{
int wait_time = 10;//10s自動修改一次,也可以從配置文件讀取
long last_load_time = time(NULL);
//清空全部的RouteChange
Route::instance()->remove_changes(true);
//1 判斷是否有修改
while (true) {
sleep(1);
long current_time = time(NULL);
//1.1 加載RouteVersion得到當前版本號
int ret = Route::instance()->load_version();
if (ret == 1) {
//version改版 有modid/cmdid修改
//2 如果有修改
//2.1 將最新的RouteData加載到_temp_pointer中
if (Route::instance()->load_route_data() == 0) {
//2.2 更新_temp_pointer數據到_data_pointer map中
Route::instance()->swap();
last_load_time = current_time;//更新最后加載時間
}
//2.3 獲取被修改的modid/cmdid對應的訂閱客戶端,進行推送
std::vector<uint64_t> changes;
Route::instance()->load_changes(changes);
//推送
SubscribeList::instance()->publish(changes);
//2.4 刪除當前版本之前的修改記錄
Route::instance()->remove_changes();
}
else {
//3 如果沒有修改
if (current_time - last_load_time >= wait_time) {
//3.1 超時,加載最新的temp_pointer
if (Route::instance()->load_route_data() == 0) {
//3.2 _temp_pointer數據更新到_data_pointer map中
Route::instance()->swap();
last_load_time = current_time;
}
}
}
}
return NULL;
}
```
? 該實現與上面流程圖描述的過程一樣。那么`check_route_changes()`我們可以讓一個后臺線程進行承載。
> lars_dns/src/dns_service.cpp
```c
int main(int argc, char **argv)
{
event_loop loop;
//加載配置文件
config_file::setPath("conf/lars_dns.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7778);
//創建tcp服務器
server = new tcp_server(&loop, ip.c_str(), port);
//注冊鏈接創建/銷毀Hook函數
server->set_conn_start(create_subscribe);
server->set_conn_close(clear_subscribe);
//注冊路由業務
server->add_msg_router(lars::ID_GetRouteRequest, get_route);
// =================================================
//開辟backend thread 周期性檢查db數據庫route信息的更新狀態
pthread_t tid;
int ret = pthread_create(&tid, NULL, check_route_changes, NULL);
if (ret == -1) {
perror("pthread_create backendThread");
exit(1);
}
//設置分離模式
pthread_detach(tid);
// =================================================
//開始事件監聽
printf("lars dns service ....\n");
loop.event_process();
return 0;
}
```
### 7.3 完成dns模塊的訂閱功能測試V0.3
? 我們提供一個修改一個modid/cmdid的sql語句來觸發訂閱條件,并且讓dns service服務器主動給訂閱的客戶端發送該訂閱消息。
> lars_dns/test/test_insert_dns_route.sql
```sql
USE lars_dns;
SET @time = UNIX_TIMESTAMP(NOW());
INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
UPDATE RouteVersion SET version = @time WHERE id = 1;
INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);
```
客戶端代碼:
> lars_dns/test/lars_dns_test1.cpp
```c
#include <string.h>
#include <unistd.h>
#include <string>
#include "lars_reactor.h"
#include "lars.pb.h"
//命令行參數
struct Option
{
Option():ip(NULL),port(0) {}
char *ip;
short port;
};
Option option;
void Usage() {
printf("Usage: ./lars_dns_test -h ip -p port\n");
}
//解析命令行
void parse_option(int argc, char **argv)
{
for (int i = 0; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0) {
option.ip = argv[i + 1];
}
else if (strcmp(argv[i], "-p") == 0) {
option.port = atoi(argv[i + 1]);
}
}
if ( !option.ip || !option.port ) {
Usage();
exit(1);
}
}
//typedef void (*conn_callback)(net_connection *conn, void *args);
void on_connection(net_connection *conn, void *args)
{
//發送Route信息請求
lars::GetRouteRequest req;
req.set_modid(1);
req.set_cmdid(1);
std::string requestString;
req.SerializeToString(&requestString);
conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
}
void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
{
//解包得到數據
lars::GetRouteResponse rsp;
rsp.ParseFromArray(data, len);
//打印數據
printf("modid = %d\n", rsp.modid());
printf("cmdid = %d\n", rsp.cmdid());
printf("host_size = %d\n", rsp.host_size());
for (int i = 0; i < rsp.host_size(); i++) {
printf("-->ip = %u\n", rsp.host(i).ip());
printf("-->port = %d\n", rsp.host(i).port());
}
}
int main(int argc, char **argv)
{
parse_option(argc, argv);
event_loop loop;
tcp_client *client;
//創建客戶端
client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test");
if (client == NULL) {
fprintf(stderr, "client == NULL\n");
exit(1);
}
//客戶端成功建立連接,首先發送請求包
client->set_conn_start(on_connection);
//設置服務端回應包處理業務
client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route);
loop.event_process();
return 0;
}
```
啟動dns_server:
```bash
$ ./bin/lars_dns
msg_router init...
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
lars dns service ....
now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999
```
啟動客戶端:
```bash
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
```
我們知道,第一請求modid/cmdid就會訂閱該Route模塊。
然后我們通過外界修改modid=1,cmdid=1的模塊,新開一個終端,執行test_insert_dns_route.sql
```bash
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use lars_dns;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> \. test_insert_dns_route.sql
Database changed
Query OK, 0 rows affected (0.00 sec)
Query OK, 1 row affected (0.01 sec)
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Query OK, 1 row affected (0.02 sec)
mysql>
```
然后我會會發現客戶端已經得到一個新的消息,就是最新的route數據過來。是由dns_service主動推送過來的訂閱消息.
客戶端:
```bash
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
```
? 這樣我們的訂閱功能就完成了,整體的lars_dns模塊的工作到此的基本需求全部也已經滿足。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本