## 6) Route訂閱模式
### 6.1 訂閱模塊的設計與實現
? 訂閱模式整體的設計.
> lars_dns/include/subscribe.h
```c
#pragma once
#include <vector>
#include <pthread.h>
#include <ext/hash_set>
#include <ext/hash_map>
#include "lars_reactor.h"
#include "lars.pb.h"
#include "dns_route.h"
using namespace __gnu_cxx;
//定義訂閱列表數據關系類型,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定義發布列表的數據關系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
class SubscribeList {
public:
//設計單例
static void init() {
_instance = new SubscribeList();
}
static SubscribeList *instance() {
//保證init方法在這個進程執行中,只執行一次
pthread_once(&_once, init);
return _instance;
}
//訂閱
void subscribe(uint64_t mod, int fd);
//取消訂閱
void unsubscribe(uint64_t mod, int fd);
//發布
void publish(std::vector<uint64_t> &change_mods);
//根據在線用戶fd得到需要發布的列表
void make_publish_map(listen_fd_set &online_fds,
publish_map &need_publish);
private:
//設計單例
SubscribeList();
SubscribeList(const SubscribeList &);
const SubscribeList& operator=(const SubscribeList);
static SubscribeList *_instance;
static pthread_once_t _once;
subscribe_map _book_list; //訂閱清單
pthread_mutex_t _book_list_lock;
publish_map _push_list; //發布清單
pthread_mutex_t _push_list_lock;
};
```
? 首先`SubscribeList`采用單例設計。這里面定義了兩種數據類型
```c
//定義訂閱列表數據關系類型,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定義發布列表的數據關系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
```
? `subscribe_map`是目前dns系統的總體訂閱列表,記錄了訂閱的modid/cmdid都有哪些fds已經訂閱了,其實一個fd就代表一個客戶端。
? `publish_map`是即將發布的表,其實這里面是subscribe_map的一個反表,key是訂閱的客戶端fd,而value是該客戶端需要接收的訂閱modid/cmdid數據。
**屬性**:
`_book_list`:目前dns已經全部的訂閱信息清單。
`_push_list`:目前dns即將發布的客戶端及訂閱信息清單。
**方法**
`void subscribe(uint64_t mod, int fd)`: 加入modid/cmdid 和訂閱的客戶端fd到_book_list中。
`void unsubscribe(uint64_t mod, int fd)`:取消一條訂閱數據。
`void publish(std::vector<uint64_t> &change_mods)`: 發布訂閱數據,其中change_mods是需要發布的那些modid/cmdid組合。
`void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)`: 根據目前在線的訂閱用戶,得到需要通信的發布訂閱列表。
具體實現如下:
> lars_dns/src/subscribe.cpp
```c
#include "subscribe.h"
extern tcp_server *server;
//單例對象
SubscribeList *SubscribeList::_instance = NULL;
//用于保證創建單例的init方法只執行一次的鎖
pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT;
SubscribeList::SubscribeList()
{
}
//訂閱
void SubscribeList::subscribe(uint64_t mod, int fd)
{
//將mod->fd的關系加入到_book_list中
pthread_mutex_lock(&_book_list_lock);
_book_list[mod].insert(fd);
pthread_mutex_unlock(&_book_list_lock);
}
//取消訂閱
void SubscribeList::unsubscribe(uint64_t mod, int fd)
{
//將mod->fd關系從_book_list中刪除
pthread_mutex_lock(&_book_list_lock);
if (_book_list.find(mod) != _book_list.end()) {
_book_list[mod].erase(fd);
if (_book_list[mod].empty() == true) {
_book_list.erase(mod);
}
}
pthread_mutex_unlock(&_book_list_lock);
}
void push_change_task(event_loop *loop, void *args)
{
SubscribeList *subscribe = (SubscribeList*)args;
//1 獲取全部的在線客戶端fd
listen_fd_set online_fds;
loop->get_listen_fds(online_fds);
//2 從subscribe的_push_list中 找到與online_fds集合匹配,放在一個新的publish_map里
publish_map need_publish;
subscribe->make_publish_map(online_fds, need_publish);
//3 依次從need_publish取出數據 發送給對應客戶端鏈接
publish_map::iterator it;
for (it = need_publish.begin(); it != need_publish.end(); it++) {
int fd = it->first; //fd
//遍歷 fd對應的 modid/cmdid集合
hash_set<uint64_t>::iterator st;
for (st = it->second.begin(); st != it->second.end(); st++) {
//一個modid/cmdid
int modid = int((*st) >> 32);
int cmdid = int(*st);
//組裝pb消息,發送給客戶
lars::GetRouteResponse rsp;
rsp.set_modid(modid);
rsp.set_cmdid(cmdid);
//通過route查詢對應的host ip/port信息 進行組裝
host_set hosts = Route::instance()->get_hosts(modid, cmdid) ;
for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) {
uint64_t ip_port_pair = *hit;
lars::HostInfo host_info;
host_info.set_ip((uint32_t)(ip_port_pair >> 32));
host_info.set_port((int)ip_port_pair);
//添加到rsp中
rsp.add_host()->CopyFrom(host_info);
}
//給當前fd 發送一個更新消息
std::string responseString;
rsp.SerializeToString(&responseString);
//通過fd取出鏈接信息
net_connection *conn = tcp_server::conns[fd];
if (conn != NULL) {
conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse);
}
}
}
}
//根據在線用戶fd得到需要發布的列表
void SubscribeList::make_publish_map(
listen_fd_set &online_fds,
publish_map &need_publish)
{
publish_map::iterator it;
pthread_mutex_lock(&_push_list_lock);
//遍歷_push_list 找到 online_fds匹配的數據,放到need_publish中
for (it = _push_list.begin(); it != _push_list.end(); it++) {
//it->first 是 fd
//it->second 是 modid/cmdid
if (online_fds.find(it->first) != online_fds.end()) {
//匹配到
//當前的鍵值對移動到need_publish中
need_publish[it->first] = _push_list[it->first];
//當該組數據從_push_list中刪除掉
_push_list.erase(it);
}
}
pthread_mutex_unlock(&_push_list_lock);
}
//發布
void SubscribeList::publish(std::vector<uint64_t> &change_mods)
{
//1 將change_mods已經修改的mod->fd
// 放到 發布清單_push_list中
pthread_mutex_lock(&_book_list_lock);
pthread_mutex_lock(&_push_list_lock);
std::vector<uint64_t>::iterator it;
for (it = change_mods.begin(); it != change_mods.end(); it++) {
uint64_t mod = *it;
if (_book_list.find(mod) != _book_list.end()) {
//將mod下面的fd set集合拷遷移到 _push_list中
hash_set<int>::iterator fds_it;
for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
int fd = *fds_it;
_push_list[fd].insert(mod);
}
}
}
pthread_mutex_unlock(&_push_list_lock);
pthread_mutex_unlock(&_book_list_lock);
//2 通知各個線程去執行推送任務
server->thread_poll()->send_task(push_change_task, this);
}
```
? 這里需要注意的是`publish()`里的server變量是全局變量,全局唯一的server句柄。
### 6.2 開啟訂閱
? 那么訂閱功能實現了,該如何是調用觸發訂閱功能能,我們可以在一個客戶端建立連接成功之后來調用.
> lars_dns/src/dns_service.cpp
```c
#include <ext/hash_set>
#include "lars_reactor.h"
#include "subscribe.h"
#include "dns_route.h"
#include "lars.pb.h"
tcp_server *server;
using __gnu_cxx::hash_set;
typedef hash_set<uint64_t> client_sub_mod_list;
// ...
//訂閱route 的modid/cmdid
void create_subscribe(net_connection * conn, void *args)
{
conn->param = new client_sub_mod_list;
}
//退訂route 的modid/cmdid
void clear_subscribe(net_connection * conn, void *args)
{
client_sub_mod_list::iterator it;
client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;
for (it = sub_list->begin(); it != sub_list->end(); it++) {
uint64_t mod = *it;
SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
}
delete sub_list;
conn->param = NULL;
}
int main(int argc, char **argv)
{
event_loop loop;
//加載配置文件
config_file::setPath("conf/lars_dns.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7778);
//創建tcp服務器
server = new tcp_server(&loop, ip.c_str(), port);
//==========注冊鏈接創建/銷毀Hook函數============
server->set_conn_start(create_subscribe);
server->set_conn_close(clear_subscribe);
//============================================
//注冊路由業務
server->add_msg_router(lars::ID_GetRouteRequest, get_route);
//開始事件監聽
printf("lars dns service ....\n");
loop.event_process();
return 0;
}
```
? 這里注冊了兩個鏈接Hook。`create_subscribe()`和`clear_subscribe()`。
`client_sub_mod_list`為當前客戶端鏈接所訂閱的route信息列表。主要存放當前客戶訂閱的modid/cmdid的集合。因為不同的客戶端訂閱的信息不同,所以要將該列表與每個conn進行綁定。
---
### 關于作者:
作者:`Aceld(劉丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld)

>**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
- 一、Lars系統概述
- 第1章-概述
- 第2章-項目目錄構建
- 二、Reactor模型服務器框架
- 第1章-項目結構與V0.1雛形
- 第2章-內存管理與Buffer封裝
- 第3章-事件觸發EventLoop
- 第4章-鏈接與消息封裝
- 第5章-Client客戶端模型
- 第6章-連接管理及限制
- 第7章-消息業務路由分發機制
- 第8章-鏈接創建/銷毀Hook機制
- 第9章-消息任務隊列與線程池
- 第10章-配置文件讀寫功能
- 第11章-udp服務與客戶端
- 第12章-數據傳輸協議protocol buffer
- 第13章-QPS性能測試
- 第14章-異步消息任務機制
- 第15章-鏈接屬性設置功能
- 三、Lars系統之DNSService
- 第1章-Lars-dns簡介
- 第2章-數據庫創建
- 第3章-項目目錄結構及環境構建
- 第4章-Route結構的定義
- 第5章-獲取Route信息
- 第6章-Route訂閱模式
- 第7章-Backend Thread實時監控
- 四、Lars系統之Report Service
- 第1章-項目概述-數據表及proto3協議定義
- 第2章-獲取report上報數據
- 第3章-存儲線程池及消息隊列
- 五、Lars系統之LoadBalance Agent
- 第1章-項目概述及構建
- 第2章-主模塊業務結構搭建
- 第3章-Report與Dns Client設計與實現
- 第4章-負載均衡模塊基礎設計
- 第5章-負載均衡獲取Host主機信息API
- 第6章-負載均衡上報Host主機信息API
- 第7章-過期窗口清理與過載超時(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-負載均衡獲取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能測試工具
- 第12章- Lars啟動工具腳本