<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                [TOC] # **注冊中心** ## 什么是服務治理 服務治理可以說是微服務架構中最為核心和基礎的模塊,它主要用來實現各個微服務實例的自動化注冊與發現。 ## 為什么需要服務治理模塊 在最初構建微服務系統的時候可能服務并不多,我們可以通過做一些靜態配置來完成服務調用,此時看著一切都還正常。隨著項目逐漸接近尾聲,維護人員需要維護的服務越來越多,越來越復雜,最終形成大量的配置文件,維護將會變得越來越困難。此時,微服務應用實例自動化管理框架變得至關重要。 ## 服務治理框架需要完成什么任務 ● 服務注冊:在服務治理框架中,通常都會構建一個注冊中心,每個服務單元向注冊中心登記自己提供的服務,將主機與端口號、版本號、通信協議等一些附加信息告知注冊中心,注冊中心按服務名分類組織服務清單。 ● 服務發現:我們的所有服務都已經注冊到注冊中心,并且在注冊中心是按照服務名分類,并且由注冊中心維護者服務的具體位置。所以調用方需要調用某個服務時,需要先和注冊中心咨詢,注冊中心會返回被調用方服務的所有具體位置,調用方在根據某種輪詢策略選擇一個具體位置進行服務調用。 ## Netflix eureka >Eureka是Netflix開發的服務發現框架,本身是一個基于REST的服務,主要用于定位運行在AWS域中的中間層服務,以達到負載均衡和中間層服務故障轉移的目的。SpringCloud將它集成在其子項目spring-cloud-netflix中,以實現SpringCloud的服務發現功能。 ![](https://img.kancloud.cn/79/6b/796b941d0c65cd9e30b1c0ad5f95bfb8_1369x667.png) ## Eureka服務端 >Eureka服務端,我們也稱為服務注冊中心,他同其他服務注冊中心一樣,支持高可用配置。它依托于強一致性提供良好的服務實例可用性,可以應對多種不同的故障場景。 如果Eureka以集群方式部署,當集群中有分片出現故障時,那么Eureka就轉入自我保護模式。它允許在分片故障期間繼續提供服務的發現和注冊,當故障分片恢復運行時,集群中的其他分片會把它們的狀態再次同步回來。 ## Eureka客戶端 >Eureka客戶端,主要處理服務的注冊與發現。客戶端服務通過注解和參數配置的方式,嵌入在客戶端應用程序的代碼中,在應用程序運行時,Eureka客戶端向注冊中心注冊自身提供的服務并周期性地發送心跳來更新它的服務租約。同時,他也能從服務端查詢當前注冊的服務信息并把它們緩存到本地并周期性地刷新服務狀態。 ## 注冊中心原理 ![](https://img.kancloud.cn/d1/92/d1926bb396301a908ae572ca55caa3a4_783x438.png) # 注冊中心代碼詳解 ## 配置文件 ``` eureka: ? server: ??? shouldUseReadOnlyResponseCache: true #eureka是CAP理論種基于AP策略,為了保證強一致性關閉此切換CP 默認不關閉 false關閉 ??? enable-self-preservation: false??? #關閉服務器自我保護,客戶端心跳檢測15分鐘內錯誤達到80%服務會保護,導致別人還認為是好用的服務 ??? eviction-interval-timer-in-ms: 60000 #清理間隔(單位毫秒,默認是60\*1000)5秒將客戶端剔除的服務在服務注冊列表中剔除# ??? response-cache-update-interval-ms: 3000? ##eureka server刷新readCacheMap的時間,注意,client讀取的是readCacheMap,這個時間決定了多久會把readWriteCacheMap的緩存更新到readCacheMap上 #eureka server刷新readCacheMap的時間,注意,client讀取的是readCacheMap,這個時間決定了多久會把readWriteCacheMap的緩存更新到readCacheMap上默認30s ??? response-cache-auto-expiration-in-seconds: 180?? ##eureka server緩存readWriteCacheMap失效時間,這個只有在這個時間過去后緩存才會失效,失效前不會更新,過期后從registry重新讀取注冊服務信息,registry是一個ConcurrentHashMap。 ? client: ??? register-with-eureka: true? #false:不作為一個客戶端注冊到注冊中心 ??? fetch-registry: false????? #為true時,可以啟動,但報異常:Cannot execute request on any known server ??? instance-info-replication-interval-seconds: 10 ??? service-url: ????? defaultZone: [http://127.0.0.1:1111/eureka](http://127.0.0.1:1111/eureka) ? instance: ??? prefer-ip-address: true ??? instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance\_id:${server.port}} ??? lease-renewal-interval-in-seconds: 30??? ## 續約更新時間間隔(默認30秒) ??? lease-expiration-duration-in-seconds: 90 # 續約到期時間(默認90秒)? ribbon: ? ServerListRefreshInterval: 1000??? ``` ## 核心代碼@EnableEurekaServer注解 ``` /** * @author 作者 owen E-mail: 624191343@qq.com * @version 創建時間:2017年11月28日 下午22:50:29 * 類說明 * eureka高可用三臺機器 */ @EnableEurekaServer @SpringBootApplication //@EnableHystrixDashboard //@EnableTurbine public class EurekaServerApp { public static void main(String[] args) { // 1本地啟動采用此方法加載profiles文件 // ConfigurableApplicationContext context = new SpringApplicationBuilder(EurekaServerApp.class). // profiles("slave0").run(args); SpringApplication.run(EurekaServerApp.class, args); // 2服務器采用此方法 java -jar --spring.profiles.active=slave3; // SpringApplication.run(EurekaServerApp.class, args); } } ``` ## eureka 服務源碼解析 * LeaseManager ``` public interface LeaseManager<T> { ??? void register(T r, int leaseDuration, boolean isReplication); ??? boolean cancel(String appName, String id, boolean isReplication); ??? boolean renew(String appName, String id, boolean isReplication); ??? void evict(); } ``` **LeaseManager**做的事情就是 Eureka 注冊中心模型中的**服務注冊、服務續約、服務取消**和**服務剔除等**核心操作,關注于對服務注冊過程的管理。 * LookupService ``` public interface LookupService<T> { ??? Application getApplication(String appName); ??? Applications getApplications(); ??? List<InstanceInfo> getInstancesById(String id); ??? InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure); } ``` **LookupService**關注于對應用程序與服務實例的管理, InstanceRegistry 接口繼承了即LeaseManager 接口和 LookupService 接口,其子對象類圖關系: ![](https://img.kancloud.cn/85/53/8553b778afa1620de3fff5b786536a0d_1372x386.png) AbstractInstanceRegistry中有eureka 用于保存注冊信息的數據結構。 ``` private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); ``` 它是一個雙層的 HashMap,采用的是 JDK 中線程安全的 ConcurrentHashMap。其中第一層的 ConcurrentHashMap 的 Key 為 spring.application.name,也就是服務名,Value 為一個 ConcurrentHashMap;而第二層的 ConcurrentHashMap 的 Key 為 instanceId,也就是服務的唯一實例 ID,Value 為 Lease 對象。Eureka 采用 Lease(租約)這個詞來表示對服務注冊信息的抽象,Lease 對象保存了服務實例信息以及一些實例服務注冊相關的時間,如注冊時間 registrationTimestamp、最新的續約時間 lastUpdateTimestamp 等。 ![](https://img.kancloud.cn/fd/92/fd920914ccaaa7e54ff631e0a4a7f955_1150x405.png) AbstractInstanceRegistry主要有如下方法: ![](https://img.kancloud.cn/cd/b2/cdb23e50fd965f522c33d01cd389dc4b_764x380.png) ## 注冊代碼解析 ``` public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { ??? try { ??????? //從已存儲的 registry 獲取一個服務定義 ??????? Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); ??????? REGISTER.increment(isReplication); ??????? if (gMap == null) { ??????????? //初始化一個 Map<String, Lease<InstanceInfo>> ,并放入 registry 中 ??????? } ??? ??? //根據當前注冊的 ID 找到對應的 Lease ??????? Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); ??????? if (existingLease != null && (existingLease.getHolder() != null)) { ??????????? //如果 Lease 能找到,根據當前節點的最新更新時間和注冊節點的最新更新時間比較 ??????????? //如果前者的時間晚于后者的時間,那么注冊實例就以已存在的實例為準 ??????? } else { ??????????? ??//如果找不到,代表是一個新注冊,則更新其每分鐘期望的續約數量及其閾值 ??????? } ??????? //創建一個新 Lease 并放入 Map 中 ??????? Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); ??????? gMap.put(registrant.getId(), lease); ??????? //處理服務的 InstanceStatus ??????? registrant.setActionType(ActionType.ADDED); ??????? //更新服務最新更新時間 ??????? registrant.setLastUpdatedTimestamp(); ??????? //刷選緩存 ??????? invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); ??? } } ``` ## eureka客戶端基本原理 對于 Eureka 而言,微服務的提供者和消費者都是它的客戶端,其中服務提供者關注服務注冊、服務續約和服務下線等功能,而服務消費者關注于服務信息的獲取。同時,對于服務消費者而言,為了提高服務獲取的性能以及在注冊中心不可用的情況下繼續使用服務,一般都還會具有緩存機制。 在 Netflix Eureka 中,專門提供了一個客戶端包,并抽象了一個客戶端接口 EurekaClient。EurekaClient 接口繼承自 LookupService 接口,這個 LookupService 接口實際上也是我們上一課時中所介紹的 InstanceRegistry 接口的父接口。EurekaClient 在 LookupService 接口的基礎上提供了一系列擴展方法,這些擴展方法并不是重點,我們還是更應該關注于它的類層機構,如下所示: ![](https://img.kancloud.cn/5c/cf/5ccfc9cff2364765cde9a896c2c4c232_803x536.png) 可以看到 EurekaClient 接口有個實現類 DiscoveryClient(位于 com.netflix.discovery 包中),該類包含了服務提供者和服務消費者的核心處理邏輯,同時提供了我們在介紹 Eureka 服務器端基本原理時所介紹的 register、renew 等方法。DiscoveryClient 類的實現非常復雜,我們重點關注它構造方法中的這行代碼: ``` // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); ``` 通過分析該方法中的代碼,我們看到系統在這里初始化了一批調度任務,具體包含緩存刷新 cacheRefresh、心跳 heartbeat、服務實例復制 InstanceInfoReplicator 等,其中緩存刷新面向服務消費者,而心跳和服務實例復制面向服務提供者。接下來我們將分別從這兩個 Eureka 客戶端組件出發討論服務注冊和發現的客戶端操作。 ## 服務提供者操作源碼解析 服務提供者關注**服務注冊、服務續約和服務下線**等功能,它可以使用 Eureka 服務器提供的 RESTful API 完成上述操作。因為篇幅關系,這里同樣以服務注冊為例給出服務提供者的操作流程。 在 DiscoveryClient 類中,服務注冊操作由register 方法完成,如下所示。為了簡單起見,我們對代碼進行了裁剪,省略了日志相關等非核心代碼: ``` boolean register() throws Throwable { ??????? EurekaHttpResponse<Void> httpResponse; ??????? try { ??????????? httpResponse = eurekaTransport.registrationClient.register(instanceInfo); ??????? } catch (Exception e) { ??????????? throw e; ??????? } ??????? return httpResponse.getStatusCode() == 204; } ``` 上述 register 方法會在 InstanceInfoReplicator 類的 run 方法中進行執行。從操作流程上講,上述代碼的邏輯非常簡單,即服務提供者先將自己注冊到 Eureka 服務器中,然后根據返回的結果確定操作是否成功。顯然,這里的重點代碼是eurekaTransport.registrationClient.register(),DiscoveryClient 通過這行代碼發起了遠程請求。 首先我們來看 EurekaTransport 類,這是 DiscoveryClient 類中的一個內部類,定義了 registrationClient 變量用于實現服務注冊。registrationClient 的類型是 EurekaHttpClient 接口,該接口的定義如下: ``` public interface EurekaHttpClient { ??? EurekaHttpResponse<Void> register(InstanceInfo info); ??? EurekaHttpResponse<Void> cancel(String appName, String id); ??? EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus); ??? EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info); ??? EurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info); ??? EurekaHttpResponse<Applications> getApplications(String... regions); ??? EurekaHttpResponse<Applications> getDelta(String... regions); ??? EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions); ??? EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, String... regions); ??? EurekaHttpResponse<Application> getApplication(String appName); ??? EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id); ??? EurekaHttpResponse<InstanceInfo> getInstance(String id); ??? void shutdown(); } ``` 可以看到這個 EurekaHttpClient 接口定義了 Eureka 服務器的一些底層 REST API,包括 register、cancel、sendHeartBeat、statusUpdate、getApplications 等。在 Eureka 中,關于如何實現客戶端與服務器端的遠程通信,從工作原理上講只是一個 RESTful 風格的 HTTP 請求,但在具體設計和實現上可以說是非常考究,因此類層結構上也比較復雜。我們先來看 EurekaHttpClient 接口的一個實現類 EurekaHttpClientDecorator,從命名上看它是一個裝飾器(Decorator),如下所示: ``` public abstract class EurekaHttpClientDecorator implements EurekaHttpClient { ??? public enum RequestType { ??????? Register, ??????? Cancel, ??????? SendHeartBeat, ??????? StatusUpdate, ??????? DeleteStatusOverride, ??????? GetApplications ??????? … ??? } ??? public interface RequestExecutor<R> { ??????? EurekaHttpResponse<R> execute(EurekaHttpClient delegate); ??????? RequestType getRequestType(); ??? } ??? protected abstract <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor); ??? @Override ??? public EurekaHttpResponse<Void> register(final InstanceInfo info) { ??????? return execute(new RequestExecutor<Void>() { ??????????? @Override ??????????? public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) { ??????????????? return delegate.register(info); ??????????? } ??????????? @Override ??????????? public RequestType getRequestType() { ??????????????? return RequestType.Register; ??????????? } ??????? }); ??? } //省略其他方法實現 } ``` 可以看到 EurekaHttpClientDecorator 通過定義一個抽象方法 execute(RequestExecutor requestExecutor) 來包裝 EurekaHttpClient,這種包裝是代理機制的一種表現形式。 然后我們再來看如何構建一個 EurekaHttpClient,Eureka 也專門提供了 EurekaHttpClientFactory 類來負責構建具體的 EurekaHttpClient。顯然,這是工廠模式的一種典型應用。EurekaHttpClientFactory 接口定義如下: ``` public interface EurekaHttpClientFactory { ??? EurekaHttpClient newClient(); ??? void shutdown(); } ``` Eureka 中存在一批 EurekaHttpClientFactory 的實現類,包括 RetryableEurekaHttpClient 和 MetricsCollectingEurekaHttpClient 等,這些類都在com.netflix.discovery.shared.transport.decorator 包下。同時,在 com.netflix.discovery.shared.transport 包下,還存在一個 EurekaHttpClients 工具類,能夠創建通過 RedirectingEurekaHttpClient、RetryableEurekaHttpClient、SessionedEurekaHttpClient 包裝之后的 EurekaHttpClient。如下所示: ``` new EurekaHttpClientFactory() { ??????????? @Override ??????????? public EurekaHttpClient newClient() { ??????????????? return new SessionedEurekaHttpClient( ??????????????????????? name, ??????????????????????? RetryableEurekaHttpClient.createFactory( ??????????????????????????????? name, ??????????????????????????????? transportConfig, ??????????????????????????????? clusterResolver, ??????????????????????????????? RedirectingEurekaHttpClient.createFactory(transportClientFactory), ??????????????????????????????? ServerStatusEvaluators.legacyEvaluator()), ??????????????????????? transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000 ??????????????? ); ??????????? } }; ``` 這是 EurekaHttpClient 創建過程中的一條分支,即通過包裝器對請求過程進行層層封裝和代理。而在執行遠程請求時,Eureka 同樣提供了另一套體系來完成真正的遠程調用,原始的 EurekaHttpClient 通過 TransportClientFactory 進行創建。TransportClientFactory 接口定義如下: ``` public interface TransportClientFactory { ??? EurekaHttpClient newClient(EurekaEndpoint serviceUrl); ??? void shutdown(); } ``` TransportClientFactory 同樣存在一批實現類,其中有些是實名類,有些是匿名類。以實名的實現類 JerseyEurekaHttpClientFactory 為例,它位于 com.netflix.discovery.shared.transport.jersey 包下,通過 EurekaJerseyClient 獲取 Jersey 客戶端,而 EurekaJerseyClient 又會使用 ApacheHttpClient4 對象,從而完成 REST 調用。 作為總結,這里也給你分享一個 Eureka 在設計和實現上的技巧,也就是所謂的高階(High Level)API和低階(Low Level)API,如下圖所示: ![](https://img.kancloud.cn/1a/8a/1a8ad05a1be7c42d46b9ed2c2860ffb7_1115x503.png) 針對高階 API,主要是通過裝飾器模式進行一系列包裝,從而創建目標 EurekaHttpClient。而關于低階 API 的話,主要是 HTTP 遠程調用的實現,Netflix 提供的是基于 Jersey 的版本,而 Spring Cloud 則提供了基于 RestTemplate 的版本。 ## 服務消費者操作源碼解析 我們在介紹注冊中心模型時,服務消費者可以配備緩存機制以加速服務路由。對于 Eureka 而言,作為客戶端組件的 DiscoveryClient 同樣具備這種緩存功能。 Eureka 客戶端通過定時任務完成緩存刷新操作,我們已經在前面的內容中提到 DiscoveryClient 中的 initScheduledTasks 方法用于初始化各種調度任務,對于緩存刷選而言,調度器的初始化過程如下所示: ``` if (clientConfig.shouldFetchRegistry()) { ??????????? int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); ??????????? int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); ??????????? scheduler.schedule( ??????????????????? new TimedSupervisorTask( ??????????????????????????? "cacheRefresh", ??????????????????????????? scheduler, ??????????????????????????? cacheRefreshExecutor, ??????????????????????????? registryFetchIntervalSeconds, ??????????????????????????? TimeUnit.SECONDS, ??????????????????????????? expBackOffBound, ??????????????????????????? new CacheRefreshThread() ??????????????????? ), ??????????????????? registryFetchIntervalSeconds, TimeUnit.SECONDS); } ``` 這里啟動了一個調度任務并通過 CacheRefreshThread 線程完成具體操作。CacheRefreshThread 線程定義如下: ``` class CacheRefreshThread implements Runnable { ??????? public void run() { ??????????? refreshRegistry(); ??????? } } ``` 對于服務消費者而言,最重要的操作就是**獲取服務注冊信息**。在這里的 refreshRegistry 方法中,我們發現在進行一系列的校驗之后,最終調用了 fetchRegistry 方法以完成注冊信息的更新,該方法代碼如下。為了簡單起見,我們對代碼進行了部分裁剪,只保留主流程: ``` private boolean fetchRegistry(boolean forceFullRegistryFetch) { ??????? try { ??????????? // 獲取應用 ??????????? Applications applications = getApplications(); ??????????? if (…) //如果滿足全量拉取條件 ??????????? { ??????????? ??// 全量拉取服務實例數據 ??????????????? getAndStoreFullRegistry(); ??????????? } else { ??????????? ? // 增量拉取服務實例數據 ??????????????? getAndUpdateDelta(applications); ??????????? } ??????? ?? // 重新計算和設置一致性hashcode applications.setAppsHashCode(applications.getReconcileHashCode()); ??????? } ??????? // 刷新本地緩存 ??????? onCacheRefreshed(); ??????? // 更新遠程服務實例運行狀態 ??????? updateInstanceRemoteStatus(); ??????? return true; } ``` 這里的幾個帶注釋的方法都非常有用,因為 getAndStoreFullRegistry 的邏輯相對比較簡單,我們將重點介紹 getAndUpdateDelta 方法,以便學習在 Eureka 中如何實現增量數據更新的設計技巧。裁剪之后的 getAndUpdateDelta 方法代碼如下所示: ``` private void getAndUpdateDelta(Applications applications) throws Throwable { ??????? long currentUpdateGeneration = fetchRegistryGeneration.get(); ?????? ?Applications delta = null; ??????? //通過 eurekaTransport.queryClient 獲取增量信息 ??????? EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); ??????? if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { ??????????? delta = httpResponse.getEntity(); ??????? } ??????? if (delta == null) { ??????????? ? //如果增量信息為空,就直接發起一次全量更新 ??????????? getAndStoreFullRegistry(); ??????? } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {//通過CAS來確保請求的線程安全性 ??????????? String reconcileHashCode = ""; ??????????? if (fetchRegistryUpdateLock.tryLock()) { ??????????????? try { ??????????????? ?//比對從服務器端返回的增量數據和本地數據,合并兩者的差異數據 ??????????????????? updateDelta(delta); //用合并了增量數據之后的本地數據來生成一致性 hashcode ??????????????????? reconcileHashCode = getReconcileHashCode(applications); ??????????????? } finally { ??????????????????? fetchRegistryUpdateLock.unlock(); ??????????????? } ??????????? } else { ??????????? } //比較本地數據中的 hashcode 和來自服務器端的 hashcode ??????????? if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { ??????????????? ?//如果 hashcode 不一致,就觸發遠程調用進行全量更新 ??????????????? reconcileAndLogDifference(delta, reconcileHashCode); ??????????? } ??????? } else { ??????? } } ``` 回顧 Eureka 服務器端基本原理,我們知道 Eureka 服務器端會保存一個服務注冊列表的緩存。Eureka 官方文檔中提到這個數據保留時間是三分鐘,而 Eureka 客戶端的定時調度機制會每隔 30 秒刷選本地緩存。原則上,只要 Eureka 客戶端不停地獲取服務器端的更新數據,就能保證自己的數據和 Eureka 服務器端的保持一致。但如果客戶端在 3 分鐘之內沒有獲取更新數據,就會導致自身與服務器端的數據不一致,這是這種更新機制所必須要考慮的問題,也是我們自己在設計類似場景時的一個注意點。 針對上述問題,Eureka 采用了一致性 HashCode 方法來進行解決。Eureka 服務器端每次返回的增量數據中都會帶有一個一致性 HashCode,這個 HashCode 會與 Eureka 客戶端用本地服務列表數據算出的一致性 HashCode 進行比對,如果兩者不一致就證明增量更新出了問題,這時候就需要執行一次全量更新。 # 注冊中心集群 Eureka-Server 之間會將注冊信息復制到集群中的 Eureka Server 的所有節點中,可以在任意一個 Eureka-Server 的實例上進行注冊,也可以在任意一個實例上進行讀取。 ![](https://box.kancloud.cn/15791e832df3566278bccc9b05c63026_898x499.png) 可以為每個實例創建一個配置文件,通過 spring.profiles.active 的方式激活,這樣就不用創建多個 Eureka Server 的項目了,Eureka-Server 本質上沒有任何區別,只是配置內容不一樣而已。 ![](https://img.kancloud.cn/93/74/9374c13bcd0d0faa993657f6700f69d6_1423x725.png) ![](https://img.kancloud.cn/58/d7/58d7f0c722561573068611cbf90cf811_1844x676.png) ![](https://img.kancloud.cn/83/96/839658390de7b8965f11cc1dd4e7c67d_1419x703.png) 注意hostname需要配置hosts文件 ![](https://img.kancloud.cn/a1/e9/a1e98d2d50ca8c6cdcd510d645cee366_758x280.png) ### 啟動集群 ![](https://img.kancloud.cn/85/a6/85a63e8503c4064be8457dee5a29b5db_1149x689.png) ![](https://img.kancloud.cn/54/b7/54b789d8348720e4503ef5ac274e12fe_1148x696.png) ![](https://img.kancloud.cn/48/ee/48ee43e773fbfbf6da3605edd1c9981d_1148x694.png) ### 查看集群 ![](https://img.kancloud.cn/42/67/426779b337bfd3a73ac9d67bd012d065_1901x958.png) ### 高可用源碼解析 Eureka 的高可用部署方式被稱為**Peer Awareness 模式**。對應的,我們在**InstanceRegistry 的類層**結構中也已經看到了它的一個擴展接口**PeerAwareInstanceRegistry**以及該接口的實現類 PeerAwareInstanceRegistryImpl。 我們還是圍繞服務注冊這個場景展開討論,在**PeerAwareInstanceRegistryImpl**中同樣存在一個 register 方法,如下所示: ``` @Override public void register(final InstanceInfo info, final boolean isReplication) { ??????? int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; ??????? if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { ??????????? leaseDuration = info.getLeaseInfo().getDurationInSecs(); ??????? } ??????? super.register(info, leaseDuration, isReplication); ??????? replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } ``` 我們在這里看到了一個非常重要的**replicateToPeers 方法**,該方法作就是用來實現服務器節點之間的狀態同步。**replicateToPeers 方法的核心代碼**如下所示: ``` for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { ?? ?//如何該 URL 代表主機自身,則不用進行注冊 ??? if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { ???????? continue; ??? } ??? replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } ``` 為了理解這個操作,我們首先需要理解 Eureka 中的集群模式,這部分代碼位于 com.netflix.eureka.cluster 包中,其中包含了代表節點的 PeerEurekaNode 和 PeerEurekaNodes 類,以及用于節點之間數據傳遞的 HttpReplicationClient 接口。而 replicateInstanceActionsToPeers 方法中則根據不同的 Action 來調用 PeerEurekaNode 的不同方法。例如,如果是 StatusUpdate Action,則會調動 PeerEurekaNode的statusUpdate 方法,而該方法又會執行如下代碼; ``` replicationClient.statusUpdate(appName, id, newStatus, info); ``` 這句代碼完成了 PeerEurekaNode 之間的通信,而 replicationClient 是 HttpReplicationClient 接口的實例,該接口定義如下: ``` public interface HttpReplicationClient extends EurekaHttpClient { ??? EurekaHttpResponse<Void> statusUpdate(String asgName, ASGStatus newStatus); ??? EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList); } ``` HttpReplicationClient 接口繼承自 EurekaHttpClient 接口,而 EurekaHttpClient 接口屬于 Eureka 客戶端組件,我們會在下一課時介紹 Eureka 客戶端基本原理時進行詳細介紹。在這里,我們只需要明白 Eureka 提供了 JerseyReplicationClient(位于 com.netflix.eureka.transport 包下)這一基于 Jersey 框架實現的HttpReplicationClient。以 statusUpdate 方法為例,它的實現過程如下: ``` @Override public EurekaHttpResponse<Void> statusUpdate(String asgName, ASGStatus newStatus) { ??????? ClientResponse response = null; ??????? try { ??????????? String urlPath = "asg/" + asgName + "/status"; ??????????? response = jerseyApacheClient.resource(serviceUrl) ??????????????????? .path(urlPath) ??????????????????? .queryParam("value", newStatus.name()) ??????????????????? .header(PeerEurekaNode.HEADER_REPLICATION, "true") ??????????????????? .put(ClientResponse.class); ??????????? return EurekaHttpResponse.status(response.getStatus()); ??????? } finally { ??????????? if (response != null) { ??????????????? response.close(); ??????????? } ??????? } } ``` # OCP服務治理 ![](https://box.kancloud.cn/930781f0f54f3bbe3f883c971d35925d_1893x753.png) ## eureka重要源碼 com.netflix.eureka.resources.ServerInfoResource com.netflix.eureka.resources.ApplicationsResource com.netflix.eureka.resources.InstancesResource com.netflix.eureka.resources.InstanceResource com.netflix.eureka.resources.StatusResource ### ApplicationResource分析 ``` http://127.0.0.1:1111/eureka/apps/<APPID> ``` 該地址代表的就是一個普通的 HTTP GET 請求。Eureka 中所有對服務器端的訪問都是通過**RESTful 風格**的**資源(Resource)**進行獲取,ApplicationResource 類(位于com.netflix.eureka.resources 包中)提供了根據應用獲取注冊信息的入口。我們來看該類的 getApplication 方法,核心代碼如下所示: ``` Key cacheKey = new Key( Key.EntityType.Application,appName, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept) ); ? String payLoad = responseCache.get(cacheKey); if (payLoad != null) { ??????logger.debug("Found: {}", appName); ??????return Response.ok(payLoad).build(); } else { ??????logger.debug("Not Found: {}", appName); ??????return Response.status(Status.NOT_FOUND).build(); } ``` 可以看到這里是構建了一個**cacheKey**,并直接調用了 responseCache.get(cacheKey) 方法來返回一個字符串并構建響應。從命名上看,不難想象這里使用了緩存機制。我們來看 ResponseCache 的定義,如下所示,其中最核心的就是這里的 get 方法: ``` public interface ResponseCache { ??? void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress); ??? AtomicLong getVersionDelta(); ??? AtomicLong getVersionDeltaWithRegions(); ??? String get(Key key); ??? byte[] getGZIP(Key key); } ``` 從類層關系上看,ResponseCache 只有一個**實現類 ResponseCacheImpl**,我們來看它的 get 方法,發現該方法使用了如下處理策略: ``` Value getValue(final Key key, boolean useReadOnlyCache) { ??????? Value payload = null; ??????? try { ??????????? if (useReadOnlyCache) { ??????????????? final Value currentPayload = readOnlyCacheMap.get(key); ??????????????? if (currentPayload != null) { ??????????????????? payload = currentPayload; ??????????????? } else { ??????????????????? payload = readWriteCacheMap.get(key); ??????????????????? readOnlyCacheMap.put(key, payload); ??????????????? } ??????????? } else { ??????????????? payload = readWriteCacheMap.get(key); ??????????? } ??????? } catch (Throwable t) { ??????????? logger.error("Cannot get value for key : {}", key, t); ??????? } ??????? return payload; } ``` 可以看到上述代碼中有兩個緩存,一個是**readOnlyCacheMap**,一個是**readWriteCacheMap**。其中 readOnlyCacheMap 就是一個 JDK 中的 ConcurrentMap,而 readWriteCacheMap 使用的則是 Google Guava Cache 庫中的 LoadingCache 類型。在創建 LoadingCache過程中,緩存數據的來源是調用 generatePayload 方法來生成。而在這個 generatePayload 方法中,就會調用前面介紹的 AbstractInstanceRegistry 中的 getApplications 方法獲取應用信息并放到緩存中。這樣我們就實現了把注冊信息與緩存信息進行關聯。 這里有一個設計和實現上的技巧。把緩存設計為一個只讀的 readOnlyCacheMap 以及一個可讀寫的 readWriteCacheMap,可以更好地分離職責。但因為兩個緩存中保存的實際上是同一份數據,所以,我們在不斷更新 readWriteCacheMap 時,也需要確保 readOnlyCacheMap 中的數據得到同步。為此 ResponseCacheImpl 提供了一個定時任務 CacheUpdateTask,如下所示: ``` private TimerTask getCacheUpdateTask() { ??????? return new TimerTask() { ??????????? @Override ??????????? public void run() { ??????????????? for (Key key : readOnlyCacheMap.keySet()) { ??????????????????? try { ??????????????????????? CurrentRequestVersion.set(key.getVersion()); ??????????????????????? Value cacheValue = readWriteCacheMap.get(key); ??????????????????????? Value currentCacheValue = readOnlyCacheMap.get(key); ??????????????????????? if (cacheValue != currentCacheValue) { ??????????????????????????? readOnlyCacheMap.put(key, cacheValue); ??????????????????????? } ??????????????????? } catch (Throwable th) { ??????????????????? } ??????????????? } ??????????? } ??????? }; } ``` ## 服務治理API * 獲取服務列表 ![](https://box.kancloud.cn/3c0b7707b304caec02d27488b76239b6_1074x534.png) * 獲取某個服務的實例列表 ![](https://box.kancloud.cn/4c2234f164c15bb4a2a68022ba76b87c_1072x599.png) * 服務下線 ![](https://box.kancloud.cn/7dd8ffaf176877565a05046fbf811793_1098x620.png) ![](https://box.kancloud.cn/968ed25f9123efed077cc0ca3683a887_1090x415.png) * 服務上線 ![](https://box.kancloud.cn/9250a8601de41b115b4bf654eb8b6195_1081x742.png) > 通過以上,我們了解到了一個服務治理概念,如何服務上下線,如何服務查詢,以后的章節將講解如何構建一個微服務治理平臺呢?微服務治理平臺都有哪些功能 ![](https://img.kancloud.cn/4b/60/4b60bcd367ee5ed995fbf05971b96fe1_856x274.png) ## 小結 通過以上eureka的學習,我們可以總結如下: > * 注冊中心的實現主要涉及幾個問題:注冊中心需要提供哪些接口,該如何部署;如何存儲服務信息;如何監控服務提供者節點的存活;如果服務提供者節點有變化如何通知服務消費者,以及如何控制注冊中心的訪問權限。 > * 注冊中心必須提供以下最基本的API,例如: 服務注冊接口:服務提供者通過調用服務注冊接口來完成服務注冊。 心跳匯報接口:服務提供者通過調用心跳匯報接口完成節點存活狀態上報。 服務查詢接口:查詢注冊中心當前注冊了哪些服務信息。 服務變更查詢接口:服務消費者通過調用服務變更查詢接口,獲取最新的可用服務節點列表。 服務修改接口:修改注冊中心中某一服務的信息。 ## 服務發現負載均衡 ![](https://img.kancloud.cn/23/32/2332cdcba7764921d2d1e446b875c2c5_1292x605.png) ## 負載均衡主要組件 ![](https://img.kancloud.cn/3e/69/3e692edf8c833765456dd723a8a746f2_1133x559.png) | 組件 | 作用 | | --- | --- | | ILoadBalancer | 定義一系列的操作接口,比如選擇服務實例 | | IRule| 算法策略,內置算法策略來為服務實例的選擇提供服務| | ServerList| 負責服務實例信息的獲取,可以獲取配置文件中的,也可以從注冊中心獲取| | ServerListFilter| 過濾掉某些不想要的服務實例信息| | ServerListUpdater| 更新本地緩存的服務實例信息| | IPing| 對已有的服務實例進行可用性檢查,保證選擇的服務都是可用的| ## 指定負載均衡算法 ![](https://img.kancloud.cn/77/48/774801cb894f28399fb9e32450df799e_1464x224.png) ## 負載均衡器通過Eureka獲取動態后端服務列表 ![](https://img.kancloud.cn/f5/2c/f52c3b1b2912d000cf759c67c62e94a9_856x654.png) Netflix源碼解析之Ribbon:負載均衡器通過Eureka獲取動態后端服務列表 - 為程序員服務 Ribbon是一種客戶端的負載均衡,本質上是跑在服務消費者的進程里。服務消費者要訪問服務時,通過ribbon向一個服務注冊的列表查詢,然后以配置的負載均衡策略選擇一個后端服務發起請求。 LB的定義的兩個主要方法,分別是后端服務相關的調用: ~~~ public void addServers(List<Server> newServers); public List<Server> getServerList(boolean availableOnly); ~~~ 在netflix中這個服務注冊列表其實就是eureka服務端集中管理的注冊服務列表。獲取這個列表應該就是是通過eureka的client來完成的。 ![](https://img.kancloud.cn/ac/a0/aca0314692453a8eba6d6543c361c8a7_695x295.png) Netflix源碼解析之Ribbon:負載均衡器通過Eureka獲取動態后端服務列表 - 為程序員服務 也就是ribbon中應該在某個地方集成了eureka client來維護服務列表。這里嘗試追蹤細這個過程,確認下猜想。 [ribbon的實現](http://juke.outofmemory.cn/entry/253610) 的繼承圖上可以看到除了介紹的基本實現LoadBalancer外,還有DynamicServerListLoadBalancer的實現,可以動態的加載后端服務列表。正如名所示,可以動態的加載后端的服務列表。 DynamicServerListLoadBalancer中使用一個ServerListRefreshExecutorThread任務線程定期的更新后端服務列表。 ~~~ class ServerListRefreshExecutorThread implements Runnable { public void run() { updateListOfServers(); } } public void updateListOfServers() { servers = serverListImpl.getUpdatedListOfServers(); updateAllServerList(servers); } ~~~ 其實是通過com.netflix.loadbalancer.ServerList.getUpdatedListOfServers() 方法加載后端服務列表。ServerList這個接口正是用來獲取加載后端服務列表。 ![](https://img.kancloud.cn/55/22/55224b75c4cdea35b64921b4a3a5f847_701x170.png) Netflix源碼解析之Ribbon:負載均衡器通過Eureka獲取動態后端服務列表 - 為程序員服務 看到ConfigurationBasedServerList是從配置中(可以是通過Archaius這樣的集中配置)加載的。 而DiscoveryEnabledNIWSServerList這個實現中包含DiscoveryEnabled猜想應該就是服務發現框架里的服務吧。看進去果然是通過eureka client 從eureka server獲取服務列表進而在ribbon中可以動態的加載。 從聲明 ~~~ public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{ ~~~ 能看到管理的服務不是一般的服務,是DiscoveryEnabledServer的服務。觀察List com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList.obtainServersViaDiscovery() 的實現可以了解整個過程。 ~~~ private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); DiscoveryClient discoveryClient = DiscoveryManager.getInstance().getDiscoveryClient(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(“,”)) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfinstanceInfo = discoveryClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfinstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } return serverList; } ~~~ 可以看到就是通過一個com.netflix.discovery.EurekaClient作為一個句柄來獲取eureka中注冊的服務列表。獲取活的服務,并根據instanceInfo 構造成ribbon需要的DiscoveryEnabledServer并加到服務列表中。 ## 項目pom依賴關系 ![](https://box.kancloud.cn/ed7f173d31522be4dab8ebda1ab69fad_1755x561.png) ## springcloud f版本以后差異 左側2.0.x版本 右側1.5.9版本差異 ![](https://box.kancloud.cn/8c3ea07388bed5d137748773f1bcd1d9_1174x256.png) ![](https://box.kancloud.cn/9dffaf5f95441920d3154c00466083a3_1201x272.png) ![](https://box.kancloud.cn/7445f9b1d69aa634ef6c8e8512ee4867_1124x513.png) ## 常見問題 ![](https://box.kancloud.cn/f128077bbcf974d08fd2069931079195_877x452.png) ### 服務感知慢的原因 ![](https://img.kancloud.cn/64/ce/64ce8325e9c7eb82d0c7b38800f8e935_808x451.png) Eureka 服務感知慢的原因主要有兩個,一部分是因為服務緩存導致的,另一部分是因為客戶端緩存導致的。 * 服務端緩存 服務注冊到注冊中心后,服務實例信息是存儲在注冊表中的,也就是內存中。但 Eureka ?為了提高響應速度,在內部做了優化,加入了兩層的緩存結構,將 Client 需要的實例信息,直接緩存起來,獲取的時候直接從緩存中拿數據然后響應給 Client。 ?第一層緩存是 readOnlyCacheMap,readOnlyCacheMap 是采用 ConcurrentHashMap 來存儲數據的,主要負責定時與 readWriteCacheMap 進行數據同步,默認同步時間為 30 秒一次。 ?第二層緩存是 readWriteCacheMap,readWriteCacheMap 采用 Guava 來實現緩存。 緩存過期時間默認為 180 秒,當服務下線、過期、注冊、狀態變更等操作都會清除此緩存中的數據。 ?Client 獲取服務實例數據時,會先從一級緩存中獲取,如果一級緩存中不存在,再從二級緩存中獲取,如果二級緩存也不存在,會觸發緩存的加載,從存儲層拉取數據到緩存中,然后再返回給 Client。Eureka 之所以設計二級緩存機制,也是為了提高 Eureka Server 的響應速度,缺點是緩存會導致 Client 獲取不到最新的服務實例信息,然后導致無法快速發現新的服務和已下線的服務。了解了服務端的實現后,想要解決這個問題就變得很簡單了,我們可以縮短只讀緩存的更新時間(eureka.server.response-cache-update-interval-ms)讓服務發現變得更加及時,或者直接將只讀緩存關閉(eureka.server.use-read-only-response-cache=false),直接將只讀緩存關閉適合服務量小的場景。Eureka Server 中會有定時任務去檢測失效的服務,將服務實例信息從注冊表中移除,也可以將這個失效檢測的時間縮短,這樣服務下線后就能夠及時從注冊表中清除。 2. 客戶端緩存 客戶端緩存主要分為兩塊內容,一塊是 Eureka Client 緩存,一塊是 Ribbon 緩存。 * Eureka Client 緩存 Eureka Client 負責跟 Eureka Server 進行交互,在 Eureka Client 中的com.netflix.discovery.DiscoveryClient.initScheduledTasks() 方法中,初始化了一個 CacheRefreshThread 定時任務專門用來拉取 Eureka Server 的實例信息到本地。 所以我們需要縮短這個定時拉取服務信息的時間間隔(eureka.client.registryFetchIntervalSeconds)來快速發現新的服務。 * Ribbon 緩存 Ribbon 會從 Eureka Client 中獲取服務信息,ServerListUpdater 是 Ribbon 中負責服務實例更新的組件,默認的實現是 PollingServerListUpdater,通過線程定時去更新實例信息。定時刷新的時間間隔默認是 30 秒,當服務停止或者上線后,這邊最快也需要 30 秒才能將實例信息更新成最新的。我們可以將這個時間調短一點,比如 3 秒。刷新間隔的參數是通過 getRefreshIntervalMs 方法來獲取的,方法中的邏輯也是從 Ribbon 的配置中進行取值的。將這些服務端緩存和客戶端緩存的時間全部縮短后,跟默認的配置時間相比,快了很多。我們通過調整參數的方式來盡量加快服務發現的速度。 ? ## 總結 注冊中心可以說是實現服務化的關鍵,因為服務化之后,服務提供者和服務消費者不在同一個進程中運行,實現了解耦,這就需要一個紐帶去連接服務提供者和服務消費者,而注冊中心就正好承擔了這一角色。此外,服務提供者可以任意伸縮即增加節點或者減少節點,通過服務健康狀態檢測,注冊中心可以保持最新的服務節點信息,并將變化通知給訂閱服務的服務消費者。注冊中心一般采用分布式集群部署,來保證高可用性。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看