<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                ## 7) 負載均衡上報Host主機信息API(V0.4) ### 7.1 proto通信協議定義 ```protobuf syntax = "proto3"; package lars; /* Lars系統的消息ID */ enum MessageId { ID_UNKNOW = 0; //proto3 enum第一個屬性必須是0,用來占位 ID_GetRouteRequest = 1; //向DNS請求Route對應的關系的消息ID ID_GetRouteResponse = 2; //DNS回復的Route信息的消息ID ID_ReportStatusRequest = 3; //上報host調用狀態信息請求消息ID ID_GetHostRequest = 4; //API 發送請求host信息給 Lb Agent模塊 消息ID ID_GetHostResponse = 5; //agent 回執給 API host信息的 消息ID ID_ReportRequest = 6; //API report get_host的調用結果給agent的 消息ID } //... //... // API 上報調用結果給 Agent(UDP) message ReportRequest { int32 modid = 1; int32 cmdid = 2; HostInfo host = 3; int32 retcode = 4; } ``` `ID_ReportRequest`和 `message ReportRequest `是針對API層與agent的請求互通協議。 ### 7.2 Lars-API:Reporter()方法客戶端實現 > Lars/api/cpp/lars_api/lars_api.h ```c #pragma once #include "lars_reactor.h" #include <string> class lars_client { public: lars_client(); ~lars_client(); //lars 系統獲取host信息 得到可用host的ip和port int get_host(int modid, int cmdid, std::string& ip, int &port); //lars 系統上報host調用信息 void report(int modid, int cmdid, const std::string &ip, int port, int retcode); private: int _sockfd[3]; //3個udp socket fd 對應agent 3個udp server uint32_t _seqid; //消息的序列號 }; ``` ? 新增`report()`方法。 > Lars/api/cpp/lars_api/lars_api.cpp ```c //lars 系統上報host調用信息 void lars_client::report(int modid, int cmdid, const std::string &ip, int port, int retcode) { //1 封裝上報消息 lars::ReportRequest req; req.set_modid(modid); req.set_cmdid(cmdid); req.set_retcode(retcode); //1.1 host信息 lars::HostInfo *hp = req.mutable_host(); //ip struct in_addr inaddr; inet_aton(ip.c_str(), &inaddr); int ip_num = inaddr.s_addr; hp->set_ip(ip_num); //port hp->set_port(port); //2. send char write_buf[4096]; //消息頭 msg_head head; head.msglen = req.ByteSizeLong(); head.msgid = lars::ID_ReportRequest; memcpy(write_buf, &head, MESSAGE_HEAD_LEN); req.SerializeToArray(write_buf + MESSAGE_HEAD_LEN, head.msglen); int index = (modid+cmdid)%3; int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0); if (ret == -1) { perror("sendto"); } } ``` > Lars/api/cpp/example/example.cpp ```c int ret = api.get_host(modid, cmdid, ip, port); if (ret == 0) { std::cout << "host is " << ip << ":" << port << std::endl; //上報調用結果 api.report(modid, cmdid, ip, port, 0); } ``` ? 在example的業務應用中,加上調用上報api。在每次調用完`get_host`。 ### 7.3 report業務添加的配置參數信息 > Lars/lars_loadbalance_agent/conf/lars_lb_agent.conf ```ini [reporter] ip = 127.0.0.1 port = 7779 [dnsserver] ip = 127.0.0.1 port = 7778 [loadbalance] ;經過若干次獲取請求host節點后,試探選擇一次overload過載節點 probe_num=10 ;初始化host_info主機信息訪問成功的個數,防止剛啟動時少量失敗就認為過載 init_succ_cnt=180 ;當idle節點失敗率高于此值,節點變overload狀態 err_rate=0.1 ;當overload節點成功率高于此值,節點變成idle狀態 succ_rate=0.95 ;當idle節點連續失敗次數超過此值,節點變成overload狀態 contin_err_limit=15 ;當overload節點連續成功次數超過此值, 節點變成idle狀態 contin_succ_limit=15 ``` 配置文件里在`[loadbalance]`中新增了一些字段。 那么我們需要在啟動lb_agent的時候,加載這些配置文件參數. > lars_loadbalance_agent/include/main_server.h ```c #pragma once #include "lars_reactor.h" #include "lars.pb.h" #include "route_lb.h" struct load_balance_config { //經過若干次獲取請求host節點后,試探選擇一次overload過載節點 int probe_num; //初始化host_info主機信息訪問成功的個數,防止剛啟動時少量失敗就認為過載 int init_succ_cnt; //************************************************** //當idle節點失敗率高于此值,節點變overload狀態 float err_rate; //當overload節點成功率高于此值,節點變成idle狀態 float succ_rate; //當idle節點連續失敗次數超過此值,節點變成overload狀態 int contin_err_limit; //當overload節點連續成功次數超過此值, 節點變成idle狀態 int contin_succ_limit; //當前agent本地ip地址(用于上報 填充caller字段) uint32_t local_ip; //************************************************** }; ``` > lars_loadbalance_agent/src/main_server.cpp ```c #include "main_server.h" #include "lars.pb.h" #include <netdb.h> // ... //--------- 全局資源 ---------- static void init_lb_agent() { //1. 加載配置文件 config_file::setPath("./conf/lars_lb_agent.conf"); lb_config.probe_num = config_file::instance()->GetNumber("loadbalance", "probe_num", 10); lb_config.init_succ_cnt = config_file::instance()->GetNumber("loadbalance", "init_succ_cnt", 180); lb_config.err_rate = config_file::instance()->GetFloat("loadbalance", "err_rate", 0.1); lb_config.succ_rate = config_file::instance()->GetFloat("loadbalance", "succ_rate", 0.92); lb_config.contin_succ_limit = config_file::instance()->GetNumber("loadbalance", "contin_succ_limit", 10); lb_config.contin_err_limit = config_file::instance()->GetNumber("loadbalance", "contin_err_limit", 10); //2. 初始化3個route_lb模塊 create_route_lb(); //3. 加載本地ip char my_host_name[1024]; if (gethostname(my_host_name, 1024) == 0) { struct hostent *hd = gethostbyname(my_host_name); if (hd) { struct sockaddr_in myaddr; myaddr.sin_addr = *(struct in_addr*)hd->h_addr; lb_config.local_ip = ntohl(myaddr.sin_addr.s_addr); } } if (!lb_config.local_ip) { struct in_addr inaddr; inet_aton("127.0.0.1", &inaddr); lb_config.local_ip = ntohl(inaddr.s_addr); } } // ... ``` 這里的本地ip,是之后在上報的時候,發送消息需要一個caller參數,這個caller參數我們就暫時默認是當前agent的ip為caller。 ### 7.4 Agent UDP Server處理API-Report請求 ? 接下來我們針對API發送的report的`ID_ReportRequest`進行處理. > lars_loadbalance_agent/src/agent_udp_server.cpp ```c #include "lars_reactor.h" #include "main_server.h" // ... static void report_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data) { lars::ReportRequest req; req.ParseFromArray(data, len); route_lb *ptr_route_lb = (route_lb*)user_data; ptr_route_lb->report_host(req); } void * agent_server_main(void * args) { long index = (long)args; short port = index + 8888; event_loop loop; udp_server server(&loop, "0.0.0.0", port); //給server注冊消息分發路由業務, 針對ID_GetHostRequest處理 每個udp擁有一個對應的route_lb server.add_msg_router(lars::ID_GetHostRequest, get_host_cb, r_lb[port-8888]); //====================================================== //給server注冊消息分發路由業務,針對ID_ReportRequest處理 server.add_msg_router(lars::ID_ReportRequest, report_cb, r_lb[port-8888]); //====================================================== printf("agent UDP server :port %d is started...\n", port); loop.event_process(); return NULL; } void start_UDP_servers(void) { for (long i = 0; i < 3; i ++) { pthread_t tid; int ret = pthread_create(&tid, NULL, agent_server_main, (void*)i); if (ret == -1) { perror("pthread_create"); exit(1); } pthread_detach(tid); } } ``` 這里主要是通過一個udp server中的route_lb對象來調用的`report_host(req)`方法。我們來實現這個方法。 > lars_loadbalance_agent/src/route_lb.cpp ```c //agent 上報某主機的獲取結果 void route_lb::report_host(lars::ReportRequest req) { int modid = req.modid(); int cmdid = req.cmdid(); int retcode = req.retcode(); int ip = req.host().ip(); int port = req.host().port(); uint64_t key = ((uint64_t)modid << 32) + cmdid; pthread_mutex_lock(&_mutex); if (_route_lb_map.find(key) != _route_lb_map.end()) { load_balance *lb = _route_lb_map[key]; lb->report(ip, port, retcode); //上報信息給遠程reporter服務器 lb->commit(); } pthread_mutex_unlock(&_mutex); } ``` 當然,route_lb最終還是分管每個modid/cmdid對應的load_balance模塊,那么選擇一個可用的load_balance對象,調用`load_balance`的`report()`方法.而通過`commit()`方法,將report的上報結果提交到遠程的`report service`中去。 接下來我們看一下`load_balance`的report方法實現. > lars_loadbalance_agent/src/load_balance.cpp ```c //上報當前host主機調用情況給遠端repoter service void load_balance::report(int ip, int port, int retcode) { uint64_t key = ((uint64_t)ip << 32) + port; if (_host_map.find(key) == _host_map.end()) { return; } //1 計數統計 host_info *hi = _host_map[key]; if (retcode == lars::RET_SUCC) { // retcode == 0 //更新虛擬成功、真實成功次數 hi->vsucc ++; hi->rsucc ++; //連續成功增加 hi->contin_succ ++; //連續失敗次數歸零 hi->contin_err = 0; } else { //更新虛擬失敗、真實失敗次數 hi->verr ++; hi->rerr ++; //連續失敗個數增加 hi->contin_err++; //連續成功次數歸零 hi->contin_succ = 0; } //2.檢查節點狀態 //檢查idle節點是否滿足overload條件 //或者overload節點是否滿足idle條件 //--> 如果是dile節點,則只有調用失敗才有必要判斷是否達到overload條件 if (hi->overload == false && retcode != lars::RET_SUCC) { bool overload = false; //idle節點,檢查是否達到判定為overload的狀態條件 //(1).計算失敗率,如果大于預設值失敗率,則為overload double err_rate = hi->verr * 1.0 / (hi->vsucc + hi->verr); if (err_rate > lb_config.err_rate) { overload = true; } //(2).連續失敗次數達到閾值,判定為overload if( overload == false && hi->contin_err >= (uint32_t)lb_config.contin_err_limit) { overload = true; } //判定overload需要做的更改流程 if (overload) { struct in_addr saddr; saddr.s_addr = htonl(hi->ip); printf("[%d, %d] host %s:%d change overload, succ %u err %u\n", _modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr); //設置hi為overload狀態 hi->set_overload(); //移出_idle_list,放入_overload_list _idle_list.remove(hi); _overload_list.push_back(hi); return; } } //--> 如果是overload節點,則只有調用成功才有必要判斷是否達到idle條件 else if (hi->overload == true && retcode == lars::RET_SUCC) { bool idle = false; //overload節點,檢查是否達到回到idle狀態的條件 //(1).計算成功率,如果大于預設值的成功率,則為idle double succ_rate = hi->vsucc * 1.0 / (hi->vsucc + hi->verr); if (succ_rate > lb_config.succ_rate) { idle = true; } //(2).連續成功次數達到閾值,判定為idle if (idle == false && hi->contin_succ >= (uint32_t)lb_config.contin_succ_limit) { idle = true; } //判定為idle需要做的更改流程 if (idle) { struct in_addr saddr; saddr.s_addr = htonl(hi->ip); printf("[%d, %d] host %s:%d change idle, succ %u err %u\n", _modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr); //設置為idle狀態 hi->set_idle(); //移出overload_list, 放入_idle_list _overload_list.remove(hi); _idle_list.push_back(hi); return; } } //TODO 窗口檢查和超時機制 } ``` 其中`set_idle()`與`set_overload()`方法實現如下: > lars_loadbalance_agent/src/host_info.cpp ```c #include "host_info.h" #include "main_server.h" void host_info::set_idle() { vsucc = lb_config.init_succ_cnt; verr = 0; rsucc = 0; rerr = 0; contin_succ = 0; contin_err = 0; overload = false; } void host_info::set_overload() { vsucc = 0; verr = lb_config.init_err_cnt;//overload的初試虛擬err錯誤次數 rsucc = 0; rerr = 0; contin_err = 0; contin_succ = 0; overload = true; } ``` `load_balance`的`report()`方法實現主要是針對兩個鏈表做節點的交替處理。和成功率失敗率的判斷。 > 節點失敗率 = 節點`verr` / (`vsucc` + `verr`) > 節點成功率 = 節點`vsucc` / (`vsucc` + `verr`) 當idle節點的失敗率>預設值(默認10%),將節點判定為overload; 當overload節點的成功率>預設值(默認95%),將節點判定為idle; 而不可以idle/overload節點都只關注成功率or都只關注失敗率,這樣可能造成節點在idle/overload狀態間頻繁切換 為idle節點、overload節點設置不同的閾值可以區別對待。 ? 接下來我們來實現`load_balance`的`commit()`方法。 > lars_loadbalance_agent/src/load_balance.cpp ```c //提交host的調用結果給遠程reporter service上報結果 void load_balance::commit() { if (this->empty() == true) { return; } //1. 封裝請求消息 lars::ReportStatusRequest req; req.set_modid(_modid); req.set_cmdid(_cmdid); req.set_ts(time(NULL)); req.set_caller(lb_config.local_ip); //2. 從idle_list取值 for (host_list_it it = _idle_list.begin(); it != _idle_list.end(); it++) { host_info *hi = *it; lars::HostCallResult call_res; call_res.set_ip(hi->ip); call_res.set_port(hi->port); call_res.set_succ(hi->rsucc); call_res.set_err(hi->rerr); call_res.set_overload(false); req.add_results()->CopyFrom(call_res); } //3. 從over_list取值 for (host_list_it it = _overload_list.begin(); it != _overload_list.end(); it++) { host_info *hi = *it; lars::HostCallResult call_res; call_res.set_ip(hi->ip); call_res.set_port(hi->port); call_res.set_succ(hi->rsucc); call_res.set_err(hi->rerr); call_res.set_overload(true); req.add_results()->CopyFrom(call_res); } //4 發送給report_client 的消息隊列 report_queue->send(req); } ``` --- ### 關于作者: 作者:`Aceld(劉丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原創書籍: [http://www.hmoore.net/@aceld](http://www.hmoore.net/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原創聲明:未經作者允許請勿轉載, 如果轉載請注明出處**
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看