一、場景描述 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ?很多做服務接口的人或多或少的遇到這樣的場景,由于業務應用系統的負載能力有限,為了防止非預期的請求對系統壓力過大而拖垮業務應用系統。
? ? 也就是面對大流量時,如何進行流量控制?
? ? 服務接口的流量控制策略:分流、降級、限流等。本文討論下限流策略,雖然降低了服務接口的訪問頻率和并發量,卻換取服務接口和業務應用系統的高可用。
? ? ?實際場景中常用的限流策略:
Nginx前端限流
? ? ? ? ?按照一定的規則如帳號、IP、系統調用邏輯等在Nginx層面做限流
業務應用系統限流
? ? ? ? 1、客戶端限流
? ? ? ? 2、服務端限流
數據庫限流
? ? ? ? 紅線區,力保數據庫
二、常用的限流算法 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
? ? ?常用的限流算法由:樓桶算法和令牌桶算法。本文不具體的詳細說明兩種算法的原理,原理會在接下來的文章中做說明。
? ? ?1、漏桶算法
? ? ? ? ?漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然后就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率.示意圖如下:

? ? ? ? ?可見這里有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。
? ? ? ? ?因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源沖突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.因此,漏桶算法對于存在突發特性的流量來說缺乏效率.
? ? ?2、令牌桶算法
? ? ? ? ?令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨著時間流逝,系統會按恒定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.

?
令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.
三、基于Redis功能的實現 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ? ?簡陋的設計思路:假設一個用戶(用IP判斷)每分鐘訪問某一個服務接口的次數不能超過10次,那么我們可以在Redis中創建一個鍵,并此時我們就設置鍵的過期時間為60秒,每一個用戶對此服務接口的訪問就把鍵值加1,在60秒內當鍵值增加到10的時候,就禁止訪問服務接口。在某種場景中添加訪問時間間隔還是很有必要的。
? ? ? 1)使用Redis的incr命令,將計數器作為Lua腳本? ? ? ? ?
```
local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
redis.call("expire",KEYS[1],1)
end
```
?Lua腳本在Redis中運行,保證了incr和expire兩個操作的原子性。
? ? ? ?2)使用Reids的列表結構代替incr命令
```
FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
ERROR "too many requests per second"
ELSE
IF EXISTS(ip) == FALSE
MULTI
RPUSH(ip,ip)
EXPIRE(ip,1)
EXEC
ELSE
RPUSHX(ip,ip)
END
PERFORM_API_CALL()
END
```
?Rate Limit使用Redis的列表作為容器,LLEN用于對訪問次數的檢查,一個事物中包含了RPUSH和EXPIRE兩個命令,用于在第一次執行計數是創建列表并設置過期時間,
? ? RPUSHX在后續的計數操作中進行增加操作。
四、基于令牌桶算法的實現 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
? ? ? ?令牌桶算法可以很好的支撐突然額流量的變化即滿令牌桶數的峰值。
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle;
20 public class TokenBucket implements LifeCycle {
// 默認桶大小個數 即最大瞬間流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一個桶的單位是1字節
private int everyTokenSize = 1;
// 瞬間最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 隊列來緩存桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = 'a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已經滿了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 獲取足夠的令牌個數
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
Preconditions.checkNotNull(dataSize);
Preconditions.checkArgument(isStart, "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
@Override
public void start() {
// 初始化桶隊列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生產者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
isStart = true;
}
@Override
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
@Override
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException, InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
String data = "xxxx";// 四個字節
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
- 技能知識點
- 對死鎖問題的理解
- 文件系統原理:如何用1分鐘遍歷一個100TB的文件?
- 數據庫原理:為什么PrepareStatement性能更好更安全?
- Java Web程序的運行時環境到底是怎樣的?
- 你真的知道自己要解決的問題是什么嗎?
- 如何解決問題
- 經驗分享
- GIT的HTTP方式免密pull、push
- 使用xhprof對php7程序進行性能分析
- 微信掃碼登錄和使用公眾號方式進行掃碼登錄
- 關于curl跳轉抓取
- Linux 下配置 Git 操作免登錄 ssh 公鑰
- Linux Memcached 安裝
- php7安裝3.4版本的phalcon擴展
- centos7下php7.0.x安裝phalcon框架
- 將字符串按照指定長度分割
- 搜索html源碼中標簽包的純文本
- 更換composer鏡像源為阿里云
- mac 隱藏文件顯示/隱藏
- 谷歌(google)世界各國網址大全
- 實戰文檔
- PHP7安裝intl擴展和linux安裝icu
- linux編譯安裝時常見錯誤解決辦法
- linux刪除文件后不釋放磁盤空間解決方法
- PHP開啟異步多線程執行腳本
- file_exists(): open_basedir restriction in effect. File完美解決方案
- PHP 7.1 安裝 ssh2 擴展,用于PHP進行ssh連接
- php命令行加載的php.ini
- linux文件實時同步
- linux下php的psr.so擴展源碼安裝
- php將字符串中的\n變成真正的換行符?
- PHP7 下安裝 memcache 和 memcached 擴展
- PHP 高級面試題 - 如果沒有 mb 系列函數,如何切割多字節字符串
- PHP設置腳本最大執行時間的三種方法
- 升級Php 7.4帶來的兩個大坑
- 不同域名的iframe下,fckeditor在chrome下的SecurityError,解決辦法~~
- Linux find+rm -rf 執行組合刪除
- 從零搭建Prometheus監控報警系統
- Bug之group_concat默認長度限制
- PHP生成的XML顯示無效的Char值27消息(PHP generated XML shows invalid Char value 27 message)
- XML 解析中,如何排除控制字符
- PHP各種時間獲取
- nginx配置移動自適應跳轉
- 已安裝nginx動態添加模塊
- auto_prepend_file與auto_append_file使用方法
- 利用nginx實現web頁面插入統計代碼
- Nginx中的rewrite指令(break,last,redirect,permanent)
- nginx 中 index try_files location 這三個配置項的作用
- linux安裝git服務器
- PHP 中運用 elasticsearch
- PHP解析Mysql Binlog
- 好用的PHP學習網(持續更新中)
- 一篇寫給準備升級PHP7的小伙伴的文章
- linux 安裝php7 -系統centos7
- Linux 下多php 版本共存安裝
- PHP編譯安裝時常見錯誤解決辦法,php編譯常見錯誤
- nginx upstream模塊--負載均衡
- 如何解決Tomcat服務器打開不了HOST Manager的問題
- PHP的內存泄露問題與垃圾回收
- Redis數據結構 - string字符串
- PHP開發api接口安全驗證
- 服務接口API限流 Rate Limit
- php內核分析---內存管理(一)
- PHP內存泄漏問題解析
- 【代碼片-1】 MongoDB與PHP -- 高級查詢
- 【代碼片-1】 php7 mongoDB 簡單封裝
- php與mysql系統中出現大量數據庫sleep的空連接問題分析
- 解決crond引發大量sendmail、postdrop進程問題
- PHP操作MongoDB GridFS 存儲文件,如圖片文件
- 淺談php安全
- linux上keepalived+nginx實現高可用web負載均衡
- 整理php防注入和XSS攻擊通用過濾