<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                # 服務容錯-Sentinel [TOC] ## 1.Sentinel概述 隨著微服務的流行,服務和服務之間的穩定性變得越來越重要。Sentinel 是面向分布式服務架構的**輕量級**流量控制產品,主要以流量為切入點,從流量控制、熔斷降級、系統負載保護等多個維度來幫助您保護服務的穩定性。 ????? ## 2.整合Sentinel * 第一步: 加依賴 ``` ??????? <!--alibaba sentinel--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency> ???? ?? ? <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> ``` * 第二步:寫配置 ```yaml spring: ??cloud: ????nacos: ??????discovery: ????????server-addr: localhost:8848 ????????namespace: 807451c6-1a54-4c3f-b652-ac1adacdd6f7 ????sentinel: ??????transport: ????????# 指定sentinel 控制臺的地址 ????????dashboard: localhost:8080???? # 全端點開放 management: ??endpoints: ????web: ??????exposure: ????????include: "*" ``` * 第三步:核查是否成功 如果整合成功,就可以看到?actuator/sentine?端點 然后 我們啟動項目進入端點,可以看到返回對應數據,說明整合成功 ![](https://img.kancloud.cn/cd/46/cd46899e0e5d06c94ed990f4be83705b_1091x710.png) ## 3.Sentinel-dashboard (控制臺)--- 重點 ### 啟動sentinel-dashboard ![](https://img.kancloud.cn/19/3c/193cf6fecdf75123346d9ecacb444a40_1610x876.png) ### 訪問sentinel-dashboard ### [http://127.0.0.1:8989/](http://127.0.0.1:8989/) ### sentinel/sentinel ### 登錄頁面 ![](https://img.kancloud.cn/93/e5/93e59cc065413ce08a30a26c17ec005c_1920x606.png) ### 登錄成功 ![](https://img.kancloud.cn/96/17/96174ff06e1225ace15f6b0245179426_1901x662.png) 注意點:一開始進去Sentinel 控制臺 默認是懶加載的模式 就算應用注冊上去,也是看不到該應用的;可以調用該應用的?/actuator/sentinel?端點,觸發懶加模式; ![](https://img.kancloud.cn/73/e8/73e882478005ad079bd35f733a8fd593_1895x746.png) 以上整合 sentinel 完成,接下來開始sentinel的監控配置; ### 執行流程 Sentinel 的執行流程分為三個階段: * Sentinel Core 與 Sentinel Dashboard 建立連接; > 第一步,建立連接。 Sentine Core 在初始化的時候,通過 application.yml 參數中指定的 Dashboard 的 IP地址,會主動向 dashboard 發起連接的請求。 ``` # Sentinel Dashboard通信地址 spring: cloud sentinel: transport: # 指定sentinel 控制臺的地址 dashboard: 127.0.0.1:8080 eager: true ``` 該請求是以心跳包的方式定時向 Dashboard 發送,包含 Sentinel Core 的 AppName、IP、端口信息。這里有個重要細節:Sentinel Core為了能夠持續接收到來自 Dashboard的數據,會在微服務實例設備上監聽 8719 端口,在心跳包上報時也是上報這個 8719 端口。在 Sentinel Dashboard 接收到心跳包后,來自 Sentinel Core的AppName、IP、端口信息會被封裝為 MachineInfo 對象放入 ConcurrentHashMap 保存在 JVM的內存中供后續使用。 ![](https://img.kancloud.cn/c5/f1/c5f1014c724d0251496bdc2260493683_886x399.png) * Sentinel Dashboard 向 Sentinel Core 下發新的保護規則; > 如果在 Dashboard 頁面中設置了新的保護規則,會先從當前的 MachineInfo 中提取符合要求的微服務實例信息,之后通過 Dashboard內置的 transport 模塊將新規則打包推送到微服務實例的 Sentinel Core,Sentinel Core收 到新規則在微服務應用中對本地規則進行更新,這些新規則會保存在微服務實例的 JVM 內存中。 ![](https://img.kancloud.cn/e5/a9/e5a9f692a8acb4ce762c7ba1f25556ec_867x371.png) * Sentinel Core 應用新的保護規則,實施限流、熔斷等動作。 > Sentinel Core 為服務限流、熔斷提供了核心攔截器 SentinelWebInterceptor,這個攔截器默認對所有請求 /** 進行攔截,然后開始請求的鏈式處理流程,在對于每一個處理請求的節點被稱為 Slot(槽),通過多個槽的連接形成處理鏈,在請求的流轉過程中,如果有任何一個 Slot 驗證未通過,都會產生 BlockException,請求處理鏈便會中斷,并返回“Blocked by sentinel" 異常信息。 ![](https://img.kancloud.cn/85/4b/854b733a2c2641a38445b31512df86d8_942x909.png) ## 4.專業名稱 Sentinel 的一些概念 * 資源:資源是 Sentinel 的關鍵概念。資源,可以是一個方法、一段代碼、由應用提供的接口,或者由應用調用其它應用的接口。 * 規則:圍繞資源的實時狀態設定的規則,包括流量控制規則、熔斷降級規則以及系統保護規則、自定義規則。 * 降級:在流量劇增的情況下,為保證系統能夠正常運行,根據資源的實時狀態、訪問流量以及系統負載有策略的拒絕掉一部分流量。 Sentinel 可以簡單的分為 Sentinel 核心庫和 Dashboard。核心庫不依賴 Dashboard,但是結合 Dashboard 可以取得最好的效果。 1.?**定義資源** 2.?**定義規則** 3.?**檢驗規則是否生效** 先把可能需要保護的資源定義好,之后再配置規則。也可以理解為,只要有了資源,我們就可以在任何時候靈活地定義各種流量控制規則。在編碼的時候,只需要考慮這個代碼是否需要保護,如果需要保護,就將之定義為一個資源。 對于主流的框架,我們提供適配,只需要按照適配中的說明配置,Sentinel 就會默認定義提供的服務,方法等為資源 。 ### 4.1.定義資源 我們啟動用戶中心模塊,訪問?/users-anon/login?端點 ![](https://img.kancloud.cn/62/ce/62ce4d2d4a811006592a573cf56fa406_1872x849.png) 在簇點鏈路中找到??**/users-anon/login**?定義的資源,然后點擊流控 ![](https://img.kancloud.cn/d2/2c/d22c91267c9c119e55708adf45bedd15_1860x718.png) #### Sentinel 資源指標數據統計相關的類 Sentinel 中指標數據統計以資源為維度。資源使用 ResourceWrapper 對象表示,我們把 ResourceWrapper 對象稱為資源 ID。如果一個資源描述的是一個接口,那么資源名稱通常就是接口的 url,例如“GET:**/users-anon/login**。 ##### **ResourceWrapper** ~~~ public abstract class ResourceWrapper { protected final String name; protected final EntryType entryType; protected final int resourceType; public ResourceWrapper(String name, EntryType entryType, int resourceType) { this.name = name; this.entryType = entryType; this.resourceType = resourceType; } } ~~~ ResourceWrapper 有三個字段: * name 為資源名稱,例如:“GET:**/users-anon/login**”。 * entryType 為流量類型,即流入流量還是流出流量,通俗點說就是發起請求還是接收請求。 * resourceType 表示資源的類型,例如 Dubbo RPC、Web MVC 或者 API Gateway 網關。 EntryType 是一個枚舉類型: ~~~ public enum EntryType { IN("IN"), OUT("OUT"); } ~~~ 可以把 IN 和 OUT 簡單理解為接收處理請求與發送請求。當接收到別的服務或者前端發來的請求,那么 entryType 為 IN;當向其他服務發起請求時,那么 entryType 就為 OUT。例如,在消費端向服務提供者發送請求,當請求失敗率達到多少時觸發熔斷降級,那么服務消費端為實現熔斷降級就需要統計資源的 OUT 類型流量。 Sentinel 目前支持的資源類型有以下幾種: ~~~ public final class ResourceTypeConstants { public static final int COMMON = 0; public static final int COMMON_WEB = 1; public static final int COMMON_RPC = 2; public static final int COMMON_API_GATEWAY = 3; public static final int COMMON_DB_SQL = 4; } ~~~ * COMMON:默認 * COMMON_WEB:Web 應用的接口 * COMMON_RPC:Dubbo 框架的 RPC 接口 * COMMON_API_GATEWAY:用于 API Gateway 網關 * COMMON_DB_SQL:數據庫 SQL 操作 ### 4.2.定義規則 #### 1.流量控制規則(FlowRule) ![](https://img.kancloud.cn/d1/58/d158e9fcb29ed8815912cb21f4172c80_1268x863.png) 資源名:資源名稱,必需項 針對來源:默認 default,該項定義資源需要自定義某些規則,后面會說到; 閾值類型:2種類型,QPS 和 線程數 單機閾值:定義數量,超過則返回失敗 是否集群:后續會說明; 流控模式:3種類型; * 直接拒絕:超過閾值直接拋異常??FlowException * 關聯: 是指關聯的資源到達閾值,當前資源無法訪問,既做到了限流的效果 * 鏈路: 只會記錄指定鏈路上的資源 流控效果:3種類型; * 1.快速失敗:直接失敗,拋異常 FlowException 源碼位置 canPass方法 ``` com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { ??????????//獲取系統當前qps數量 ????????int curCount = avgUsedTokens(node); ??????????//如果當前qps + 請求qps > 定義閾值??并且 prioritized 這個定義為 false 所以直接拋出異常 ????????if (curCount + acquireCount > count) { ????????????if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { ????????????????long currentTime; ????????????????long waitInMs; ????????????????currentTime = TimeUtil.currentTimeMillis(); ????????????????waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); ????????????????if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { ????????????????????node.addWaitingRequest(currentTime + waitInMs, acquireCount); ????????????????????node.addOccupiedPass(acquireCount); ????????????????????sleep(waitInMs); ????????????????????// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. ????????????????????throw new PriorityWaitException(waitInMs); ????????????????} ????????????} ????????????return false; ????????} ????????return true; ????} ``` ?? * 2.Warm up(慢啟動模式):Warm Up(`RuleConstant.CONTROL_BEHAVIOR_WARM_UP`)方式,即預熱/冷啟動方式。當系統長期處于低水位的情況下,當流量突然增加時,直接把系統拉升到高水位可能瞬間把系統壓垮。通過"冷啟動",讓通過的流量緩慢增加,在一定時間內逐漸增加到閾值上限,給冷系統一個預熱的時間,避免冷系統被壓垮。 源碼位置 canPass方法 ``` com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController ????@Override ????public boolean canPass(Node node, int acquireCount, boolean prioritized) { ????????long passQps = (long) node.passQps(); ????????long previousQps = (long) node.previousPassQps(); ????????syncToken(previousQps); ????????// 開始計算它的斜率 ????????// 如果進入了警戒線,開始調整他的qps ????????long restToken = storedTokens.get(); ????????if (restToken >= warningToken) { ????????????long aboveToken = restToken - warningToken; ????????????// 消耗的速度要比warning快,但是要比慢 ????????????// current interval = restToken*slope+1/count ????????????double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); ????????????if (passQps + acquireCount <= warningQps) { ????????????????return true; ????????????} ????????} else { ????????????if (passQps + acquireCount <= count) { ????????????????return true; ????????????} ????????} ????????return false; ????} ``` ????? * 3.排隊等待:勻速排隊(`RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER`)方式會嚴格控制請求通過的間隔時間,也即是讓請求以均勻的速度通過,對應的是漏桶算法;**閾值類型必須為QPS,否則無效** 源碼位置 canPass方法 ``` com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController ????@Override ????public boolean canPass(Node node, int acquireCount, boolean prioritized) { ????????// Pass when acquire count is less or equal than 0. ????????if (acquireCount <= 0) { ????????????return true; ????????} ????????// Reject when count is less or equal than 0. ????????// Otherwise,the costTime will be max of long and waitTime will overflow in some cases. ????????if (count <= 0) { ????????????return false; ????????} ????????long currentTime = TimeUtil.currentTimeMillis(); ????????// Calculate the interval between every two requests. ????????long costTime = Math.round(1.0 * (acquireCount) / count * 1000); ????????// Expected pass time of this request. ??????????// latestPassedTime 根據最后一次通過的時間,排隊等待,其實定義好了每段時間經過,如果超過定義的超時時間,就丟棄請求 ????????long expectedTime = costTime + latestPassedTime.get(); ????????if (expectedTime <= currentTime) { ????????????// Contention may exist here, but it's okay. ??????????????//記錄最后一次時間 (這里可能存在爭議,但沒關系。) ????????????latestPassedTime.set(currentTime); ????????????return true; ????????} else { ????????????// Calculate the time to wait. ????????????long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); ??????????????//如果等待的時間 大于 我最大的等待時間 直接拋棄 ????????????if (waitTime > maxQueueingTimeMs) { ????????????????return false; ????????????} else { ????????????????long oldTime = latestPassedTime.addAndGet(costTime); ????????????????try { ????????????????????waitTime = oldTime - TimeUtil.currentTimeMillis(); ????????????????????if (waitTime > maxQueueingTimeMs) { ????????????????????????latestPassedTime.addAndGet(-costTime); ????????????????????????return false; ????????????????????} ????????????????????// in race condition waitTime may <= 0 ????????????????????if (waitTime > 0) { ????????????????????????Thread.sleep(waitTime); ????????????????????} ????????????????????return true; ????????????????} catch (InterruptedException e) { ????????????????} ????????????} ????????} ????????return false; ????} ``` ##### 流量控制測試 1.打開 Sentinel 控制臺,在簇點鏈路中找到??**/users-anon/login**?定義的資源,然后點擊流控 ![](https://img.kancloud.cn/7e/b2/7eb23d7585b786fb57fb75dc55f807c2_1833x768.png) 我們嘗試QPS 單機閾值 1,然后調用 http://localhost:7000/users-anon/login?username=admin 資源 發現返回 Blocked by Sentinel (flow limiting) 說明我們流控 快速失敗成功了 ![](https://img.kancloud.cn/0b/b5/0bb5cd90262a11c5de6513c10d03eeb0_1346x198.png) #### 2.熔斷降級規則(DegradeRule) ##### 1.概述 除了流量控制以外,對調用鏈路中不穩定的資源進行熔斷降級也是保障高可用的重要措施之一。由于調用關系的復雜性,如果調用鏈路中的某個資源不穩定,最終會導致請求發生堆積。Sentinel?**熔斷降級**會在調用鏈路中某個資源出現不穩定狀態時(例如調用超時或異常比例升高),對這個資源的調用進行限制,讓請求快速失敗,避免影響到其它的資源而導致級聯錯誤。當資源被降級后,在接下來的降級時間窗口之內,對該資源的調用都自動熔斷(默認行為是拋出?`DegradeException`)。 ![](https://img.kancloud.cn/7d/6d/7d6dc00cf6c1a6621483a8ab7cee9c3b_1840x515.png) ##### 2.降級策略 -?平均響應時間 (`DEGRADE_GRADE_RT`):當 1s 內持續進入 5 個請求,對應時刻的平均響應時間(秒級)均超過閾值(`count`,以 ms 為單位),那么在接下的時間窗口(`DegradeRule`?中的?`timeWindow`,以 s 為單位)之內,對這個方法的調用都會自動地熔斷(拋出?`DegradeException`)。注意 Sentinel 默認統計的 RT 上限是 4900 ms,**超出此閾值的都會算作 4900 ms**,若需要變更此上限可以通過啟動配置項?`-Dcsp.sentinel.statistic.max.rt=xxx`?來配置。 -?異常比例 (`DEGRADE_GRADE_EXCEPTION_RATIO`):當資源的每秒請求量 >= 5,并且每秒異常總數占通過量的比值超過閾值(`DegradeRule`?中的?`count`)之后,資源進入降級狀態,即在接下的時間窗口(`DegradeRule`?中的?`timeWindow`,以 s 為單位)之內,對這個方法的調用都會自動地返回。異常比率的閾值范圍是?`[0.0, 1.0]`,代表 0% - 100%。 -?異常數 (`DEGRADE_GRADE_EXCEPTION_COUNT`):當資源近 1 分鐘的異常數目超過閾值之后會進行熔斷。注意由于統計時間窗口是分鐘級別的,若?`timeWindow`?小于 60s,則結束熔斷狀態后仍可能再進入熔斷狀態。 核心代碼: ```java com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule ``` ```java public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { ????????if (cut.get()) { ????????????return false; ????????} ????????ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource()); ????????if (clusterNode == null) { ????????????return true; ????????} ??????????// RT方式 ????????if (grade == RuleConstant.DEGRADE_GRADE_RT) { ??????????????//rt的平均數 ????????????double rt = clusterNode.avgRt(); ????????????if (rt < this.count) { ????????????????passCount.set(0); ????????????????return true; ????????????} ????????????// Sentinel will degrade the service only if count exceeds. ??????????????// 當前通過數 小于 RT最大是 返回可以通過 ????????????if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) { ????????????????return true; ????????????} ????????} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { // 異常比例 ??????????????//異常QPS ????????????double exception = clusterNode.exceptionQps(); ????????????//成功QPS ????????????double success = clusterNode.successQps(); ??????????????//總數 ????????????double total = clusterNode.totalQps(); ????????????// if total qps less than RT_MAX_EXCEED_N, pass. ??????????????//總數小于最大的RT 直接通過 ????????????if (total < RT_MAX_EXCEED_N) { ????????????????return true; ????????????} ??????????????//可以訪問的數量 ????????????double realSuccess = success - exception; ????????????if (realSuccess <= 0 && exception < RT_MAX_EXCEED_N) { ????????????????return true; ????????????} ??????????????//異常數量 除以 成功數量 小于定義的數量 返回成功 ????????????if (exception / success < count) { ????????????????return true; ????????????} ????????} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { ??????????????// 總異常數量 ????????????double exception = clusterNode.totalException(); ??????????????//總異常數量 小于 數量 ????????????if (exception < count) { ????????????????return true; ????????????} ????????} ????????if (cut.compareAndSet(false, true)) { ????????????ResetTask resetTask = new ResetTask(this); ????????????pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS); ????????} ????????return false; ????} ``` ##### 熔斷降級測試 我們嘗試RT???1ms ,時間窗口 5 s,然后調用 http://localhost:7000/users-anon/login?username=admin 資源 發現返回 Blocked by Sentinel (flow limiting) ![](https://img.kancloud.cn/44/ca/44ca50b83f64a5639f4d9d9da6199f45_1826x612.png) ![](https://img.kancloud.cn/e6/b3/e6b33ec72595e05a1b7d790e60a0ad1e_1140x267.png) **注意點:測試單個的時候應該把之前相關的規則刪掉,因為Sentinel支持多個規則共存,到時候測試不出哪個是對應的規則;** **測試的debug圖**,請同學們仔細看清楚上面的源碼 ![](https://img.kancloud.cn/4a/5a/4a5ac546170faa6508a66e618bd527e2_1485x894.png) #### 3.熱點規則 (ParamFlowRule) ##### 1.概述 何為熱點?熱點即經常訪問的數據。很多時候我們希望統計某個熱點數據中訪問頻次最高的 Top K 數據,并對其訪問進行限制。比如: -?商品 ID 為參數,統計一段時間內最常購買的商品 ID 并進行限制 -?用戶 ID 為參數,針對一段時間內頻繁訪問的用戶 ID 進行限制 熱點參數限流會統計傳入參數中的熱點參數,并根據配置的限流閾值與模式,對包含熱點參數的資源調用進行限流。熱點參數限流可以看做是一種特殊的流量控制,僅對包含熱點參數的資源調用生效 ![](https://img.kancloud.cn/e5/d6/e5d6b3aa7f1441d603d877a7052a639f_1207x471.png) ##### 熱點規格測試 ![](https://img.kancloud.cn/bb/a1/bba1b6cea12ecde22f5dfbc347cb25f6_862x727.png) 這次我們針對 username 參數進行限流,如果我們只有一個參數,這樣限流的意思也等于限流整個url,所以我們點擊高級選項 ```java ????//定義一個 Sentinel 資源 否則熱點無效,資源名稱隨便取,不能重復即可??????? ????@SentinelResource("findByUsername") ????@GetMapping(value = "/users-anon/login", params = "username") ????@ApiOperation(value = "根據用戶名查詢用戶") ????@LogAnnotation(module="user-center",recordRequestParam=false) ????public LoginAppUser findByUsername(String username) throws ControllerException { ????????try { ????????????return sysUserService.findByUsername(username); ????????} catch (ServiceException e) { ????????????throw new ControllerException(e); ????????} ????} ``` ![](https://img.kancloud.cn/44/e6/44e6aebcde5a9c5c63112e3c6de34148_1815x476.png) ![](https://img.kancloud.cn/2b/15/2b1563fc6ccdeca1f75e9ebde1f0e852_1840x640.png) ![](https://img.kancloud.cn/91/3f/913f3747f65e805b22befb9b8e3b1f71_1701x831.png) 點擊進行保存 這個時候,因為我們方法只有一個參數,不管怎么設置也是對整個方法進行限流,這種方式完全可以依靠上面的幾個規則解決,所以,接下來我們點擊編輯,高級選項,我們針對**admin**?限流10次 ,其他用戶我們限流1次的規則 進行測試 ![](https://img.kancloud.cn/4d/69/4d690ff98072bbaa0ce64d888a29c8c0_1218x794.png) 這個就是訪問一次之后,在進行訪問,直接返回 500 錯誤,刷新2次,就出現錯誤 然后 我們訪問admin,多刷新幾次,也沒有出現500錯誤,可能本人測試機比較慢,部分請求慢,這個同學們可以自己測試 **注意點:也就是熱點參數,我們已經可以細粒度為某個參數進行限流;或者某個參數值進行限流;** **參數必須為基本類型,不可以為復雜類型** ```java com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker ``` ```java public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count, ?????????????????????????????Object... args) { ????????if (args == null) { ????????????return true; ????????} ????????int paramIdx = rule.getParamIdx(); ????????if (args.length <= paramIdx) { ????????????return true; ????????} ????????// Get parameter value. If value is null, then pass. ????????Object value = args[paramIdx]; ????????if (value == null) { ????????????return true; ????????} ????????if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) { ????????????return passClusterCheck(resourceWrapper, rule, count, value); ????????} ????????return passLocalCheck(resourceWrapper, rule, count, value); ????} ``` ```java com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot ``` ```java void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException { ????????if (args == null) { ????????????return; ????????} ????????if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) { ????????????return; ????????} ????????List rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName()); ????????for (ParamFlowRule rule : rules) { ????????????applyRealParamIdx(rule, args.length); ????????????// Initialize the parameter metrics. ????????????ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule); ????????????if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) { ????????????????String triggeredParam = ""; ????????????????if (args.length > rule.getParamIdx()) { ????????????????????Object value = args[rule.getParamIdx()]; ????????????????????triggeredParam = String.valueOf(value); ????????????????} ????????????????throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule); ????????????} ????????} ????} ``` 拋出該異常,我們可以通過全局異常捕抓,然后自定義對應異常就行 ``` com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException: owen ``` ![](https://img.kancloud.cn/80/7d/807d166144bf66b671d6bff9008ae173_1189x432.png) #### 4.系統保護規則 (SystemRule) Sentinel 系統自適應限流從整體維度對應用入口流量進行控制,結合應用的 Load、總體平均 RT、入口 QPS 和線程數等幾個維度的監控指標,讓系統的入口流量和系統的負載達到一個平衡,讓系統盡可能跑在最大吞吐量的同時保證系統整體的穩定性。 ![](https://img.kancloud.cn/fb/56/fb56e4c24f0602583b861d5b1f737c60_1833x508.png) 系統保護規則是從應用級別的入口流量進行控制,從單臺機器的總體 Load、RT、入口 QPS 和線程數四個維度監控應用數據,讓系統盡可能跑在最大吞吐量的同時保證系統整體的穩定性。 系統保護規則是應用整體維度的,而不是資源維度的,并且**僅對入口流量生效**。入口流量指的是進入應用的流量(`EntryType.IN`),比如 Web 服務或 Dubbo 服務端接收的請求,都屬于入口流量。 系統規則支持以下的閾值類型: \-?**Load**(僅對 Linux/Unix-like 機器生效):當系統 load1 超過閾值,且系統當前的并發線程數超過系統容量時才會觸發系統保護。系統容量由系統的?`maxQps * minRt`?計算得出。設定參考值一般是?`CPU cores * 2.5`。 \-?**CPU usage**(1.5.0+ 版本):當系統 CPU 使用率超過閾值即觸發系統保護(取值范圍 0.0-1.0)。 \-?**RT**:當單臺機器上所有入口流量的平均 RT 達到閾值即觸發系統保護,單位是毫秒。 \-?**線程數**:當單臺機器上所有入口流量的并發線程數達到閾值即觸發系統保護。 \-?**入口 QPS**:當單臺機器上所有入口流量的 QPS 達到閾值即觸發系統保護。 核心代碼 ```java com.alibaba.csp.sentinel.slots.system.SystemRuleManager ``` ```java ????private static boolean checkBbr(int currentThread) { ????????if (currentThread > 1 && ????????????currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) { ????????????return false; ????????} ????????return true; ????} ``` ```java ?public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException { ????????// Ensure the checking switch is on. ????????if (!checkSystemStatus.get()) { ????????????return; ????????} ????????// for inbound traffic only ????????if (resourceWrapper.getEntryType() != EntryType.IN) { ????????????return; ????????} ????????// total qps ????????double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps(); ????????if (currentQps > qps) { ????????????throw new SystemBlockException(resourceWrapper.getName(), "qps"); ????????} ????????// total thread ????????int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum(); ????????if (currentThread > maxThread) { ????????????throw new SystemBlockException(resourceWrapper.getName(), "thread"); ????????} ????????double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt(); ????????if (rt > maxRt) { ????????????throw new SystemBlockException(resourceWrapper.getName(), "rt"); ????????} ????????// load. BBR algorithm. ????????if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) { ????????????if (!checkBbr(currentThread)) { ????????????????throw new SystemBlockException(resourceWrapper.getName(), "load"); ????????????} ????????} ????????// cpu usage ????????if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) { ????????????if (!checkBbr(currentThread)) { ????????????????throw new SystemBlockException(resourceWrapper.getName(), "cpu"); ????????????} ????????} ????} ``` 因為系統設置,一般比較少用,這里測試也不方便,沒有進行測試,貼出2段核心源碼,如果有興趣的同學,可以繼續研究; #### 5.訪問控制規則 (AuthorityRule) 很多時候,我們需要根據調用方來限制資源是否通過,這時候可以使用 Sentinel 的黑白名單控制的功能。黑白名單根據資源的請求來源(`origin`)限制資源是否通過,若配置白名單則只有請求來源位于白名單內時才可通過;若配置黑名單則請求來源位于黑名單時不通過,其余的請求通過。 ![](https://img.kancloud.cn/e0/f2/e0f2d359225caa0e2f0a68a29f3990e6_1837x636.png) ## 5.代碼配置規則 ?????Sentinel支持頁面的方式新增規則,也支持代碼的方式新增規則; 下面定義流控規則 ```java private void initFlowQpsRule() { ????List rules = new ArrayList<>(); ????FlowRule rule = new FlowRule(resourceName); ????// set limit qps to 20 ????rule.setCount(20); ????rule.setGrade(RuleConstant.FLOW_GRADE_QPS); ????rule.setLimitApp("default"); ????rules.add(rule); ????FlowRuleManager.loadRules(rules); } ``` 代碼中 “resourceName” 指定隨機一個資源名就行了 。 Sentinel 實現限流降級、熔斷降級、黑白名單限流降級、系統自適應限流降級以及熱點參數限流降級都是由 ProcessorSlot、Checker、Rule、RuleManager 組合完成。ProcessorSlot 作為調用鏈路的切入點,負責調用 Checker 檢查當前請求是否可以放行;Checker 則根據資源名稱從 RuleManager 中拿到為該資源配置的 Rule(規則),取 ClusterNode 統計的實時指標數據與規則對比,如果達到規則的閾值則拋出 Block 異常,拋出 Block 異常意味著請求被拒絕,也就實現了限流或熔斷。 可以總結為以下三個步驟: 1. 在 ProcessorSlot#entry 方法中調用 Checker#check 方法,并將 DefaultNode 傳遞給 Checker。 2. Checker 從 DefaultNode 拿到 ClusterNode,并根據資源名稱從 RuleManager 獲取為該資源配置的規則。 3. Checker 從 ClusterNode 中獲取當前時間窗口的某項指標數據(QPS、avgRt 等)與規則的閾值對比,如果達到規則的閾值則拋出 Block 異常(也有可能將 check 交給 Rule 去實現)。 ## 6.控制臺相關配置項 ```properties #spring settings spring.http.encoding.force=true spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true #logging settings logging.level.org.springframework.web=INFO logging.file=${user.home}/logs/csp/sentinel-dashboard.log logging.pattern.file= %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n #logging.pattern.console= %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n #auth settings auth.filter.exclude-urls=/,/auth/login,/auth/logout,/registry/machine,/version auth.filter.exclude-url-suffixes=htm,html,js,css,map,ico,ttf,woff,png # If auth.enabled=false, Sentinel console disable login auth.username=sentinel auth.password=sentinel # Inject the dashboard version. It's required to enable # filtering in pom.xml for this resource file. sentinel.dashboard.version=${project.version} ``` 前面我們也下載了一個 dashboard 查看?**application.properties**,可以指定修改sentinel賬號與密碼; ## 7.注解支持 Sentinel 提供了?`@SentinelResource`?注解用于定義資源,并提供了 AspectJ 的擴展用于自動定義資源、處理?`BlockException`?等。 ### @SentinelResource 注解 `@SentinelResource`?用于定義資源,并提供可選的異常處理和 fallback 配置項。?`@SentinelResource`?注解包含以下屬性: \-?`value`:資源名稱,必需項(不能為空) \-?`entryType`:entry 類型,可選項(默認為?`EntryType.OUT`) \-?`blockHandler`?/?`blockHandlerClass`:?`blockHandler `對應處理?`BlockException`?的函數名稱,可選項。blockHandler 函數訪問范圍需要是?`public`,返回類型需要與原方法相匹配,參數類型需要和原方法相匹配并且最后加一個額外的參數,類型為?`BlockException`。blockHandler 函數默認需要和原方法在同一個類中。若希望使用其他類的函數,則可以指定?`blockHandlerClass`?為對應的類的?`Class`?對象,注意對應的函數必需為 static 函數,否則無法解析。 \-?`fallback`:fallback 函數名稱,可選項,用于在拋出異常的時候提供 fallback 處理邏輯。fallback 函數可以針對所有類型的異常(除了`exceptionsToIgnore`里面排除掉的異常類型)進行處理。fallback 函數簽名和位置要求: ??-?返回值類型必須與原函數返回值類型一致; ??-?方法參數列表需要和原函數一致,或者可以額外多一個?`Throwable`?類型的參數用于接收對應的異常。 ??-?fallback 函數默認需要和原方法在同一個類中。若希望使用其他類的函數,則可以指定?`fallbackClass`?為對應的類的?`Class`?對象,注意對應的函數必需為 static 函數,否則無法解析。 \-?`defaultFallback`(since 1.6.0):默認的 fallback 函數名稱,可選項,通常用于通用的 fallback 邏輯(即可以用于很多服務或方法)。默認 fallback 函數可以針對所有類型的異常(除了`exceptionsToIgnore`里面排除掉的異常類型)進行處理。若同時配置了 fallback 和 defaultFallback,則只有 fallback 會生效。defaultFallback 函數簽名要求: ??-?返回值類型必須與原函數返回值類型一致; ??-?方法參數列表需要為空,或者可以額外多一個?`Throwable`?類型的參數用于接收對應的異常。 ??-?defaultFallback 函數默認需要和原方法在同一個類中。若希望使用其他類的函數,則可以指定?`fallbackClass`?為對應的類的?`Class`?對象,注意對應的函數必需為 static 函數,否則無法解析。 \-?`exceptionsToIgnore`(since 1.6.0):用于指定哪些異常被排除掉,不會計入異常統計中,也不會進入 fallback 邏輯中,而是會原樣拋出。 \> 注:1.6.0 之前的版本 fallback 函數只針對降級異常(`DegradeException`)進行處理,**不能針對業務異常進行處理**。 特別地,若 blockHandler 和 fallback 都進行了配置,則被限流降級而拋出?`BlockException`?時只會進入?`blockHandler`?處理邏輯。若未配置?`blockHandler`、`fallback`?和?`defaultFallback`,則被限流降級時會將?`BlockException`?**直接拋出**。 核心代碼: ```java com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect ``` ```java ????@Around("sentinelResourceAnnotationPointcut()") ????public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { ????????Method originMethod = resolveMethod(pjp); ????????SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); ????????if (annotation == null) { ????????????// Should not go through here. ????????????throw new IllegalStateException("Wrong state for SentinelResource annotation"); ????????} ????????String resourceName = getResourceName(annotation.value(), originMethod); ????????EntryType entryType = annotation.entryType(); ????????int resourceType = annotation.resourceType(); ????????Entry entry = null; ????????try { ????????????entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); ????????????Object result = pjp.proceed(); ????????????return result; ????????} catch (BlockException ex) { ????????????return handleBlockException(pjp, annotation, ex); ????????} catch (Throwable ex) { ????????????Class[] exceptionsToIgnore = annotation.exceptionsToIgnore(); ????????????// The ignore list will be checked first. ????????????if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { ????????????????throw ex; ????????????} ????????????if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { ????????????????traceException(ex, annotation); ????????????????return handleFallback(pjp, annotation, ex); ????????????} ????????????// No fallback function can handle the exception, so throw it out. ????????????throw ex; ????????} finally { ????????????if (entry != null) { ????????????????entry.exit(1, pjp.getArgs()); ????????????} ????????} ????} ``` 上面源碼核心邏輯,我們可以用簡單的代碼解釋下 ```java Entry entry = null; // 務必保證finally會被執行 try { ??// 資源名可使用任意有業務語義的字符串 ??entry = SphU.entry("自定義資源名"); ??// 被保護的業務邏輯 ??// do something... } catch (BlockException e1) { ??// 資源訪問阻止,被限流或被降級 ??// 進行相應的處理操作 } finally { ??if (entry != null) { ????entry.exit(); ??} } ``` ## 8.Sentinel 源碼分析 ### 基于滑動窗口的實時指標數據統計 > Sentinel 是基于滑動窗口實現的實時指標數據統計,要深入理解 Sentinel 的限流實現原理,首先我們得要了解其指標數據統計的實現,例如如何統計 QPS。 * Bucket Sentinel 使用 Bucket 統計一個窗口時間內的各項指標數據,這些指標數據包括請求總數、成功總數、異常總數、總耗時、最小耗時、最大耗時等,而一個 Bucket 可以是記錄一秒內的數據,也可以是 10 毫秒內的數據,這個時間長度稱為窗口時間。 ![](https://img.kancloud.cn/87/1c/871cbf152fe94536361b4b40a5e2b570_958x355.png) 如上面代碼所示,Bucket 記錄一段時間內的各項指標數據用的是一個 LongAdder 數組,LongAdder 保證了數據修改的原子性,并且性能比 AtomicInteger 表現更好。數組的每個元素分別記錄一個時間窗口內的請求總數、異常數、總耗時,如下圖所示。 ![](https://img.kancloud.cn/d7/fc/d7fc6eabbe49f4ae31a17779e35dc864_1027x424.png) Sentinel 用枚舉類型 MetricEvent 的 ordinal 屬性作為下標,ordinal 的值從 0 開始,按枚舉元素的順序遞增,正好可以用作數組的下標。 ![](https://img.kancloud.cn/a4/e9/a4e9a8dab87b4fd3ddc7addfd6866b20_1371x501.png) 當需要獲取 Bucket 記錄總的成功請求數或者異常總數、總的請求處理耗時,可根據事件類型(MetricEvent)從 Bucket 的 LongAdder 數組中獲取對應的 LongAdder,并調用 sum 方法獲取總數,如下代碼所示。 ![](https://img.kancloud.cn/dc/7d/dc7df398793c4dbd131a1dfebd44e8d7_1264x692.png) 當需要 Bucket 記錄一個成功請求或者一個異常請求、處理請求的耗時,可根據事件類型(MetricEvent)從 LongAdder 數組中獲取對應的 LongAdder,并調用其 add 方法,如下代碼所示。 ![](https://img.kancloud.cn/ce/57/ce5726e985652bc271682f47c31b2e08_1302x492.png) * 滑動窗口 Sentinel 定義一個 Bucket 數組,根據時間戳來定位到數組的下標。 ![](https://img.kancloud.cn/2e/43/2e438ac0348a37299de112b10b2fb559_1144x341.png) 假設我們需要統計每 1 秒處理的請求數等數據,且只需要保存最近一分鐘的數據。那么 Bucket 數組的大小就可以設置為 60,每個 Bucket 的 windowLengthInMs(窗口時間)大小就是 1000 毫秒(1 秒),如下圖所示構建時間輪數據結構。 ![](https://img.kancloud.cn/3a/1f/3a1fb1601cda345d262595c6eb2300a1_720x255.png) * WindowWrap Bucket 自身并不保存時間窗口信息, Sentinel 給 Bucket 加了一個包裝類 WindowWrap,用于記錄 Bucket 的時間窗口信息。 當接收到一個請求時,根據接收到請求的時間戳計算出一個數組索引,從滑動窗口(WindowWrap 數組)中獲取一個 WindowWrap,從而獲取 WindowWrap 包裝的 Bucket,調用 Bucket 的 add 方法記錄相應的事件。 根據當前時間戳定位 Bucket 的算法實現如下。 ![](https://img.kancloud.cn/6b/4a/6b4a7ededfae2be5f312fbaf1a7474c9_1625x672.png) 上面代碼實現的是,通過當前時間戳計算出當前時間窗口的 Bucket(New Buket)在數組中的索引(cidx),以及 Bucket 時間窗口的開始時間,通過索引從數組中取得 Bucket(Old Bucket)。 * 當索引(cidx)處不存在 Bucket 時,創建一個新的 Bucket,并且確保線程安全寫入到數組 cidx 處,將此 Bucket 返回; * 當 Old Bucket 不為空時,且 Old Bucket 時間窗口的開始時間與當前計算得到的 New Buket 的時間窗口開始時間相等時,該 Bucket 就是當前要找的 Bucket,直接返回; * 當計算出 New Bucket 時間窗口的開始時間大于當前數組 cidx 位置存儲的 Old Bucket 時間窗口的開始時間時,可以復用這個 Old Bucket,確保線程安全重置 Bucket,并返回; * 當計算出 New Bucket 時間窗口的開始時間小于當前數組 cidx 位置存儲的 Old Bucket 時間窗口的開始時間時,直接返回一個空的 Bucket,因為時間不會倒退。 * WindowWrap 用于包裝 Bucket,隨著 Bucket 一起創建。 * WindowWrap 數組實現滑動窗口,Bucket 只負責統計各項指標數據,WindowWrap 用于記錄 Bucket 的時間窗口信息。 * 定位 Bucket 實際上是定位 WindowWrap,拿到 WindowWrap 就能拿到 Bucket。 * Node Node 用于持有實時統計的指標數據,Node 接口定義了一個 Node 類所需要提供的各項指標數據統計的相關功能,為外部屏蔽滑動窗口的存在。提供記錄請求被拒絕、請求被放行、請求處理異常、請求處理成功的方法,以及獲取當前時間窗口統計的請求總數、平均耗時等方法。Node 幾個實現類DefaultNode、ClusterNode、EntranceNode、StatisticNode 的關系如下圖所示。。 ![](https://img.kancloud.cn/9a/5c/9a5c8792e636cafcaa807d972403842a_998x460.png) * StatisticNode Statistic 即統計的意思,StatisticNode 是 Node 接口的實現類,是實現實時指標數據統計 Node。 ~~~ public class StatisticNode implements Node { // 秒級滑動窗口,2 個時間窗口大小為 500 毫秒的 Bucket private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2,1000); // 分鐘級滑動窗口,60 個 Bucket 數組,每個 Bucket 統計的時間窗口大小為 1 秒 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); // 統計并發使用的線程數 private LongAdder curThreadNum = new LongAdder(); } ~~~ 如代碼所示,一個 StatisticNode 包含一個秒級和一個分鐘級的滑動窗口,以及并行線程數計數器。秒級滑動窗口用于統計實時的 QPS,分鐘級的滑動窗口用于保存最近一分鐘內的歷史指標數據,并行線程計數器用于統計當前并行占用的線程數。 StatisticNode 的分鐘級和秒級滑動窗口統計的指標數據分別有不同的用處。例如,StatisticNode 記錄請求成功和請求執行耗時的方法中調用了兩個滑動窗口的對應指標項的記錄方法,代碼如下: ~~~ @Override public void addRtAndSuccess(long rt, int successCount) { // 秒級滑動窗口 rollingCounterInSecond.addSuccess(successCount); rollingCounterInSecond.addRT(rt); // 分鐘級滑動窗口 rollingCounterInMinute.addSuccess(successCount); rollingCounterInMinute.addRT(rt); } ~~~ 獲取前一秒被 Sentinel 拒絕的請求總數從分鐘級滑動窗口獲取,代碼如下: ~~~ @Override public double previousBlockQps() { return this.rollingCounterInMinute.previousWindowBlock(); } ~~~ 而獲取當前一秒內已經被 Sentinel 拒絕的請求總數則從秒級滑動窗口獲取,代碼如下: ~~~ @Override public double blockQps() { return rollingCounterInSecond.block() / rollingCounterInSecond.getWindowIntervalInSec(); } ~~~ 獲取最小耗時也是從秒級的滑動窗口取的,代碼如下: ~~~ @Override public double minRt() { // 秒級滑動窗口 return rollingCounterInSecond.minRt(); } ~~~ 由于方法比較多,這里就不詳細介紹每個方法的實現了。 StatisticNode 還負責統計并行占用的線程數,用于實現信號量隔離,按資源所能并發占用的最大線程數實現限流。當接收到一個請求就將 curThreadNum 自增 1,當處理完請求時就將 curThreadNum 自減一,如果同時處理 10 個請求,那么 curThreadNum 的值就為 10。 假設我們配置 tomcat 處理請求的線程池大小為 200,通過控制并發線程數實現信號量隔離的好處就是不讓一個接口同時使用完這 200 個線程,避免因為一個接口響應慢將 200 個線程都阻塞導致應用無法處理其他請求的問題,這也是實現信號量隔離的目的。 ### Sentinel 中的 ProcessorSlot ProcessorSlot 直譯就是處理器插槽,是 Sentinel 實現限流降級、熔斷降級、系統自適應降級等功能的切入點。Sentinel 提供的 ProcessorSlot 可以分為兩類,一類是輔助完成資源指標數據統計的切入點,一類是實現降級功能的切入點。 輔助資源指標數據統計的 ProcessorSlot: * NodeSelectorSlot:為當前資源創建 DefaultNode,并且將 DefaultNode 賦值給 Context.curEntry.curNode(見倒數第二張圖);如果當前調用鏈路上只出現過一次 SphU#entry 的情況,將該 DefaultNode 添加到的 Context.entranceNode 的子節點(如倒數第一張圖所示,名為 sentinel\_spring\_web\_context 的 EntranceNode),否則添加到 Context.curEntry.parent 的子節點(childList)。有點抽象,我們在分析 NodeSelectorSlot 源碼時再詳細介紹。 * ClusterBuilderSlot:如果當前資源未創建 ClusterNode,則為資源創建 ClusterNode;將 ClusterNode 賦值給當前資源的 DefaultNode.clusterNode;如果調用來源(origin)不為空,則為調用來源創建 StatisticNode,用于實現按調用來源統計資源的指標數據,ClusterNode 持有每個調用來源的 StatisticNode。 * StatisticSlot:這是 Sentinel 最為重要的類之一,用于實現指標數據統計。先是調用后續的 ProcessorSlot#entry 判斷是否放行請求,再根據判斷結果進行相應的指標數據統計操作。 實現降級功能的 ProcessorSlot: * AuthoritySlot:實現黑白名單降級 * SystemSlot:實現系統自適應降級 * FlowSlot:實現限流降級 * DegradeSlot:實現熔斷降級 Sentinel 的整體工具流程就是使用責任鏈模式將所有的 ProcessorSlot 按照一定的順序串成一個單向鏈表。輔助完成資源指標數據統計的 ProcessorSlot 必須在實現降級功能的 ProcessorSlot 的前面,原因很簡單,降級功能需要依據資源的指標數據做判斷,當然,如果某個 ProcessorSlot 不依賴指標數據實現降級功能,那這個 ProcessorSlot 的位置就沒有約束。 除了按分類排序外,同一個分類下的每個 ProcessorSlot 可能也需要有嚴格的排序。比如輔助完成資源指標數據統計的 ProcessorSlot 的排序順序為: > NodeSelectorSlot->ClusterBuilderSlot->StatisticSlot 如果順序亂了就會拋出異常,而實現降級功能的 ProcessorSlot 就沒有嚴格的順序要求,AuthoritySlot、SystemSlot、FlowSlot、DegradeSlot 這幾個的順序可以按需調整。 實現將 ProcessorSlot 串成一個單向鏈表的是 ProcessorSlotChain,這個 ProcessorSlotChain 是由 SlotChainBuilder 構造的,默認 SlotChainBuilder 構造的 ProcessorSlotChain 注冊的 ProcessorSlot 以及順序如下代碼所示。 ~~~ public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new SystemSlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } } ~~~ ![](https://img.kancloud.cn/ab/98/ab985ac29dc9052007a418dac2220054_1062x368.png) #### Slot 作用 我們需要了解一下,默認 Slot 有7 個,前 3 個 Slot為前置處理,用于收集、統計、分析必要的數據;后 4 個為規則校驗 Slot,從Dashboard 推送的新規則保存在“規則池”中,然后對應 Slot 進行讀取并校驗當前請求是否允許放行,允許放行則送入下一個 Slot 直到最終被 RestController 進行業務處理,不允許放行則直接拋出 BlockException 返回響應。 以下是每一個 Slot 的具體職責: NodeSelectorSlot 負責收集資源的路徑,并將這些資源的調用路徑,以樹狀結構存儲起來,用于根據調用路徑來限流降級; ClusterBuilderSlot 則用于存儲資源的統計信息以及調用者信息,例如該資源的 RT(運行時間), QPS, thread count(線程總數)等,這些信息將用作為多維度限流,降級的依據; StatistcSlot 則用于記錄,統計不同維度的runtime 信息; SystemSlot 則通過系統的狀態,例如CPU、內存的情況,來控制總的入口流量; AuthoritySlot 則根據黑白名單,來做黑白名單控制; FlowSlot 則用于根據預設的限流規則,以及前面 slot 統計的狀態,來進行限流; DegradeSlot 則通過統計信息,以及預設的規則,來做熔斷降級。 到這里我們理解了 Sentinel 通信與降級背后的執行過程,下面咱們學習如何有效配置 Sentinel 的限流策略。 ## 控制臺是如何獲取到實時數據的 Sentinel 能夠被大家所認可,除了他自身的輕量級,高性能,可擴展之外,跟控制臺的好用和易用也有著莫大的關系,因為通過控制臺極大的方便了我們日常的運維工作。 我們可以在控制臺上操作各種限流、降級、系統保護的規則,也可以查看每個資源的實時數據,還能管理集群環境下的服務端與客戶端機器。 但是控制臺只是一個獨立的 spring boot 應用,他本身是沒有任何數據的,他的數據都是從其他的 sentinel 實例中獲取的,那他是如何獲取到這些數據的呢?帶著這個疑問我們從源碼中尋找答案。 最簡單的方法莫過于啟動一個控制臺的實例,然后從頁面上查看每個接口請求的url,然后再到 dashboard 的代碼中去深挖下去。 ### dashboard源碼分析 dashboard 是通過一個叫 SentinelApiClient 的類去指定的 ip 和 port 處獲取數據的。這個 ip 和 port 是前端頁面直接提交給后端的,而前端頁面又是通過 /app/{app}/machines.json 接口獲取機器列表的 ![](https://img.kancloud.cn/88/23/8823bffc9e797af5524101874250c291_1118x863.png) ### 連接 dashboard 機器列表中展示的就是所有連接到 dashboard 上的 sentinel 的實例,包括普通限流的 sentinel-core 和集群模式下的 token-server 和 token-client。一個 sentinel-core 的實例要接入 dashboard 的幾個步驟: 1. 引入 dashboard 的依賴 2. 配置 dashboard 的 ip 和 port 3. 初始化 sentinel-core,連接 dashboard sentinel-core 在初始化的時候,通過 JVM 參數中指定的 dashboard 的 ip 和 port,會主動向 dashboard 發起連接的請求,該請求是通過 HeartbeatSender 接口以心跳的方式發送的,并將自己的 ip 和 port 告知 dashboard。這里 sentinel-core 上報給 dashboard 的端口是 sentinel 對外暴露的自己的 CommandCenter 的端口。 HeartbeatSender 有兩個實現類,一個是通過 http,另一個是通過 netty,我們看 http 的實現類: > SimpleHttpHeartbeatSender.java ~~~java private final HeartbeatMessage heartBeat = new HeartbeatMessage(); private final SimpleHttpClient httpClient = new SimpleHttpClient(); @Override public boolean sendHeartbeat() throws Exception { if (TransportConfig.getRuntimePort() <= 0) { RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat"); return false; } InetSocketAddress addr = getAvailableAddress(); if (addr == null) { return false; } SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH); request.setParams(heartBeat.generateCurrentMessage()); try { SimpleHttpResponse response = httpClient.post(request); if (response.getStatusCode() == OK_STATUS) { return true; } } catch (Exception e) { RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e); } return false; } ~~~ 通過一個 HttpClient 向 dashboard 發送了自己的信息,包括 ip port 和版本號等信息。 其中 consoleHost 和 consolePort 的值就是從 JVM 參數 csp.sentinel.dashboard.server 中獲取的。 dashboard 在接收到 sentinel-core 的連接之后,就會與 sentinel-core 建立連接,并將 sentinel-core 上報的 ip 和 port 的信息包裝成一個 MachineInfo 對象,然后通過 SimpleMachineDiscovery 將該對象保存在一個 map 中,如下圖所示: ![](https://img.kancloud.cn/4b/cb/4bcb52bd960216466d362bb1ce35c40a_1183x637.png) ### 定時發送心跳 sentinel-core 連接上 dashboard 之后,并不是就結束了,事實上 sentinel-core 是通過一個 ScheduledExecutorService 的定時任務,每隔 10 秒鐘向 dashboard 發送一次心跳信息。發送心跳的目的主要是告訴 dashboard 我這臺 sentinel 的實例還活著,你可以繼續向我請求數據。 這也就是為什么 dashboard 中每個 app 對應的機器列表要用 Set 來保存的原因,如果用 List 來保存的話就可能存在同一臺機器保存了多次的情況。 心跳可以維持雙方之間的連接是正常的,但是也有可能因為各種原因,某一方或者雙方都離線了,那他們之間的連接就丟失了。 1.sentinel-core 宕機 如果是 sentinel-core 宕機了,那么這時 dashboard 中保存在內存里面的機器列表還是存在的。目前 dashboard 只是在接收到 sentinel-core 發送過來的心跳包的時候更新一次機器列表,當 sentinel-core 宕機了,不再發送心跳數據的時候,dashboard 是沒有將 “失聯” 的 sentinel-core 實例給去除的。而是頁面上每次查詢的時候,會去用當前時間減去機器上次心跳包的時間,如果時間差大于 5 分鐘了,才會將該機器標記為 “失聯”。 所以我們在頁面上的機器列表中,需要至少等到 5 分鐘之后,才會將具體失聯的 sentinel-core 的機器標記為 “失聯”。如下圖所示: ![](https://img.kancloud.cn/63/b7/63b763e5d2d22a80b08ddcdaccefe98c_1512x482.png) 2.dashboard 宕機 如果 dashboard 宕機了,sentinel-core 的定時任務實際上是會一直請求下去的,只要 dashboard 恢復后就會自動重新連接上 dashboard,雙方之間的連接又會恢復正常了,如果 dashboard 一直不恢復,那么 sentinel-core 就會一直報錯,在 sentinel-record.log 中我們會看到如下的報錯信息: ![](https://img.kancloud.cn/b8/b2/b8b20e224c6262bb1578d7ac1ce0d281_1167x433.png) 不過實際生產中,不可能出現 dashboard 宕機了一直沒人去恢復的情況的,如果真出現這種情況的話,那就要吃故障了。 ### 請求數據 當 dashboard 有了具體的 sentinel-core 實例的 ip 和 port 之后,就可以去請求所需要的數據了。 讓我們再回到最開始的地方,我在頁面上查詢某一臺機器的限流的規則時,是將該機器的 ip 和 port 以及 appName 都傳給了服務端,服務端通過這些信息去具體的遠程實例中請求所需的數據,拿到數據后再封裝成 dashboard 所需的格式返回給前端頁面進行展示。 具體請求限流規則列表的代碼在 SentinelApiClient 中,如下所示: > SentinelApiClient.java ~~~java public List<FlowRuleEntity> fetchFlowRuleOfMachine(String app, String ip, int port) { String url = "http://" + ip + ":" + port + "/" + GET_RULES_PATH + "?type=" + FLOW_RULE_TYPE; String body = httpGetContent(url); logger.info("FlowRule Body:{}", body); List<FlowRule> rules = RuleUtils.parseFlowRule(body); if (rules != null) { return rules.stream().map(rule -> FlowRuleEntity.fromFlowRule(app, ip, port, rule)) .collect(Collectors.toList()); } else { return null; } } ~~~ 可以看到也是通過一個 httpClient 請求的數據,然后再對結果進行轉換,具體請求的過程是在 httpGetContent 方法中進行的,我們看下該方法,如下所示: ~~~java private String httpGetContent(String url) { final HttpGet httpGet = new HttpGet(url); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<String> reference = new AtomicReference<>(); httpClient.execute(httpGet, new FutureCallback<HttpResponse>() { @Override public void completed(final HttpResponse response) { try { reference.set(getBody(response)); } catch (Exception e) { logger.info("httpGetContent " + url + " error:", e); } finally { latch.countDown(); } } @Override public void failed(final Exception ex) { latch.countDown(); logger.info("httpGetContent " + url + " failed:", ex); } @Override public void cancelled() { latch.countDown(); } }); try { latch.await(5, TimeUnit.SECONDS); } catch (Exception e) { logger.info("wait http client error:", e); } return reference.get(); } ~~~ 從代碼中可以看到,是通過一個異步的 httpClient 再結合 CountDownLatch 等待 5 秒的超時時間去獲取結果的。 獲取數據的請求從 dashboard 中發出去了,那 sentinel-core 中是怎么進行相應處理的呢?看過我其他文章的同學肯定還記得, sentinel-core 在啟動的時候,執行了一個 InitExecutor.init 的方法,該方法會觸發所有 InitFunc 實現類的 init 方法,其中就包括兩個最重要的實現類: * HeartbeatSenderInitFunc * CommandCenterInitFunc HeartbeatSenderInitFunc 會啟動一個 HeartbeatSender 來定時的向 dashboard 發送自己的心跳包,而 CommandCenterInitFunc 則會啟動一個 CommandCenter 對外提供 sentinel-core 的數據服務,而這些數據服務是通過一個一個的 CommandHandler 來提供的,如下圖所示: ![](https://img.kancloud.cn/dd/e6/dde65d0129aeb2e1c1504a9c8b5b7313_1021x766.png) ### 總結 現在我們已經知道了 dashboard 是如何獲取到實時數據的了,具體的流程如下所示: 1.首先 sentinel-core 向 dashboard 發送心跳包 2.dashboard 將 sentinel-core 的機器信息保存在內存中 3.dashboard 根據 sentinel-core 的機器信息通過 httpClient 獲取實時的數據 4.sentinel-core 接收到請求之后,會找到具體的 CommandHandler 來處理 5.sentinel-core 將處理好的結果返回給 dashboard ### 思考 1.數據安全性 >sentinel-dashboard 和 sentinel-core 之間的通訊是基于 http 的,沒有進行加密或鑒權,可能會存在數據安全性的問題,不過這些數據并非是很機密的數據,對安全性要求并不是很高,另外增加了鑒權或加密之后,對于性能和實效性有一定的影響。 2.SentinelApiClient >目前所有的數據請求都是通過 SentinelApiClient 類去完成的,該類中充斥著大量的方法,都是發送 http 請求的。代碼的可讀性和可維護性不高,所以需要對該類進行重構,目前我能夠想到的有兩種方法: 1)通過將 sentinel-core 注冊為 rpc 服務,dashboard 就像調用本地方法一樣去調用 sentinel-core 中的方法,不過這樣的話需要引入服務注冊和發現的依賴了。 2)通過 netty 實現私有的協議,sentinel-core 通過 netty 啟動一個 CommandCenter 來對外提供服務。dashboard 通過發送 Packet 來進行數據請求,sentinel-core 來處理 Packet。不過這種方法跟目前的做法沒有太大的區別,唯一比較好的可能就是不需要為每種請求都寫一個方法,只需要定義好具體的 Packet 就好了。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看