<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                ## 6) Route訂閱模式 ### 6.1 訂閱模塊的設計與實現 ? 訂閱模式整體的設計. > lars_dns/include/subscribe.h ```c #pragma once #include <vector> #include <pthread.h> #include <ext/hash_set> #include <ext/hash_map> #include "lars_reactor.h" #include "lars.pb.h" #include "dns_route.h" using namespace __gnu_cxx; //定義訂閱列表數據關系類型,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符) typedef hash_map<uint64_t, hash_set<int>> subscribe_map; //定義發布列表的數據關系類型, key->fd(訂閱客戶端的文件描述符), value->modids typedef hash_map<int, hash_set<uint64_t>> publish_map; class SubscribeList { public: //設計單例 static void init() { _instance = new SubscribeList(); } static SubscribeList *instance() { //保證init方法在這個進程執行中,只執行一次 pthread_once(&_once, init); return _instance; } //訂閱 void subscribe(uint64_t mod, int fd); //取消訂閱 void unsubscribe(uint64_t mod, int fd); //發布 void publish(std::vector<uint64_t> &change_mods); //根據在線用戶fd得到需要發布的列表 void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish); private: //設計單例 SubscribeList(); SubscribeList(const SubscribeList &); const SubscribeList& operator=(const SubscribeList); static SubscribeList *_instance; static pthread_once_t _once; subscribe_map _book_list; //訂閱清單 pthread_mutex_t _book_list_lock; publish_map _push_list; //發布清單 pthread_mutex_t _push_list_lock; }; ``` ? 首先`SubscribeList`采用單例設計。這里面定義了兩種數據類型 ```c //定義訂閱列表數據關系類型,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符) typedef hash_map<uint64_t, hash_set<int>> subscribe_map; //定義發布列表的數據關系類型, key->fd(訂閱客戶端的文件描述符), value->modids typedef hash_map<int, hash_set<uint64_t>> publish_map; ``` ? `subscribe_map`是目前dns系統的總體訂閱列表,記錄了訂閱的modid/cmdid都有哪些fds已經訂閱了,其實一個fd就代表一個客戶端。 ? `publish_map`是即將發布的表,其實這里面是subscribe_map的一個反表,key是訂閱的客戶端fd,而value是該客戶端需要接收的訂閱modid/cmdid數據。 **屬性**: `_book_list`:目前dns已經全部的訂閱信息清單。 `_push_list`:目前dns即將發布的客戶端及訂閱信息清單。 **方法** `void subscribe(uint64_t mod, int fd)`: 加入modid/cmdid 和訂閱的客戶端fd到_book_list中。 `void unsubscribe(uint64_t mod, int fd)`:取消一條訂閱數據。 `void publish(std::vector<uint64_t> &change_mods)`: 發布訂閱數據,其中change_mods是需要發布的那些modid/cmdid組合。 `void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)`: 根據目前在線的訂閱用戶,得到需要通信的發布訂閱列表。 具體實現如下: > lars_dns/src/subscribe.cpp ```c #include "subscribe.h" extern tcp_server *server; //單例對象 SubscribeList *SubscribeList::_instance = NULL; //用于保證創建單例的init方法只執行一次的鎖 pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT; SubscribeList::SubscribeList() { } //訂閱 void SubscribeList::subscribe(uint64_t mod, int fd) { //將mod->fd的關系加入到_book_list中 pthread_mutex_lock(&_book_list_lock); _book_list[mod].insert(fd); pthread_mutex_unlock(&_book_list_lock); } //取消訂閱 void SubscribeList::unsubscribe(uint64_t mod, int fd) { //將mod->fd關系從_book_list中刪除 pthread_mutex_lock(&_book_list_lock); if (_book_list.find(mod) != _book_list.end()) { _book_list[mod].erase(fd); if (_book_list[mod].empty() == true) { _book_list.erase(mod); } } pthread_mutex_unlock(&_book_list_lock); } void push_change_task(event_loop *loop, void *args) { SubscribeList *subscribe = (SubscribeList*)args; //1 獲取全部的在線客戶端fd listen_fd_set online_fds; loop->get_listen_fds(online_fds); //2 從subscribe的_push_list中 找到與online_fds集合匹配,放在一個新的publish_map里 publish_map need_publish; subscribe->make_publish_map(online_fds, need_publish); //3 依次從need_publish取出數據 發送給對應客戶端鏈接 publish_map::iterator it; for (it = need_publish.begin(); it != need_publish.end(); it++) { int fd = it->first; //fd //遍歷 fd對應的 modid/cmdid集合 hash_set<uint64_t>::iterator st; for (st = it->second.begin(); st != it->second.end(); st++) { //一個modid/cmdid int modid = int((*st) >> 32); int cmdid = int(*st); //組裝pb消息,發送給客戶 lars::GetRouteResponse rsp; rsp.set_modid(modid); rsp.set_cmdid(cmdid); //通過route查詢對應的host ip/port信息 進行組裝 host_set hosts = Route::instance()->get_hosts(modid, cmdid) ; for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) { uint64_t ip_port_pair = *hit; lars::HostInfo host_info; host_info.set_ip((uint32_t)(ip_port_pair >> 32)); host_info.set_port((int)ip_port_pair); //添加到rsp中 rsp.add_host()->CopyFrom(host_info); } //給當前fd 發送一個更新消息 std::string responseString; rsp.SerializeToString(&responseString); //通過fd取出鏈接信息 net_connection *conn = tcp_server::conns[fd]; if (conn != NULL) { conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse); } } } } //根據在線用戶fd得到需要發布的列表 void SubscribeList::make_publish_map( listen_fd_set &online_fds, publish_map &need_publish) { publish_map::iterator it; pthread_mutex_lock(&_push_list_lock); //遍歷_push_list 找到 online_fds匹配的數據,放到need_publish中 for (it = _push_list.begin(); it != _push_list.end(); it++) { //it->first 是 fd //it->second 是 modid/cmdid if (online_fds.find(it->first) != online_fds.end()) { //匹配到 //當前的鍵值對移動到need_publish中 need_publish[it->first] = _push_list[it->first]; //當該組數據從_push_list中刪除掉 _push_list.erase(it); } } pthread_mutex_unlock(&_push_list_lock); } //發布 void SubscribeList::publish(std::vector<uint64_t> &change_mods) { //1 將change_mods已經修改的mod->fd // 放到 發布清單_push_list中 pthread_mutex_lock(&_book_list_lock); pthread_mutex_lock(&_push_list_lock); std::vector<uint64_t>::iterator it; for (it = change_mods.begin(); it != change_mods.end(); it++) { uint64_t mod = *it; if (_book_list.find(mod) != _book_list.end()) { //將mod下面的fd set集合拷遷移到 _push_list中 hash_set<int>::iterator fds_it; for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) { int fd = *fds_it; _push_list[fd].insert(mod); } } } pthread_mutex_unlock(&_push_list_lock); pthread_mutex_unlock(&_book_list_lock); //2 通知各個線程去執行推送任務 server->thread_poll()->send_task(push_change_task, this); } ``` ? 這里需要注意的是`publish()`里的server變量是全局變量,全局唯一的server句柄。 ### 6.2 開啟訂閱 ? 那么訂閱功能實現了,該如何是調用觸發訂閱功能能,我們可以在一個客戶端建立連接成功之后來調用. > lars_dns/src/dns_service.cpp ```c #include <ext/hash_set> #include "lars_reactor.h" #include "subscribe.h" #include "dns_route.h" #include "lars.pb.h" tcp_server *server; using __gnu_cxx::hash_set; typedef hash_set<uint64_t> client_sub_mod_list; // ... //訂閱route 的modid/cmdid void create_subscribe(net_connection * conn, void *args) { conn->param = new client_sub_mod_list; } //退訂route 的modid/cmdid void clear_subscribe(net_connection * conn, void *args) { client_sub_mod_list::iterator it; client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param; for (it = sub_list->begin(); it != sub_list->end(); it++) { uint64_t mod = *it; SubscribeList::instance()->unsubscribe(mod, conn->get_fd()); } delete sub_list; conn->param = NULL; } int main(int argc, char **argv) { event_loop loop; //加載配置文件 config_file::setPath("conf/lars_dns.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 7778); //創建tcp服務器 server = new tcp_server(&loop, ip.c_str(), port); //==========注冊鏈接創建/銷毀Hook函數============ server->set_conn_start(create_subscribe); server->set_conn_close(clear_subscribe); //============================================ //注冊路由業務 server->add_msg_router(lars::ID_GetRouteRequest, get_route); //開始事件監聽 printf("lars dns service ....\n"); loop.event_process(); return 0; } ``` ? 這里注冊了兩個鏈接Hook。`create_subscribe()`和`clear_subscribe()`。 `client_sub_mod_list`為當前客戶端鏈接所訂閱的route信息列表。主要存放當前客戶訂閱的modid/cmdid的集合。因為不同的客戶端訂閱的信息不同,所以要將該列表與每個conn進行綁定。 --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看