在事件循環讀取到數據之后,會進入unsafe的read方法。unsafe內部使用了兩個類處理內存的分配,ByteBufAllocator和RecvByteBufAllocator。ByteBufAllocator用來處理內存的分配,RecvByteBufAllocator用來計算此次讀循環應該分配多少內存。
主事件循環組收到Accept事件后,會創建與客戶端連接的NioSocketChannel,并將READ注冊在子事件循環組中的selector上面,由事件循環不斷select()查詢就緒讀I/O后交給NioSocketChannel處理。NioSocketChannel在初始化時創建了NioSocketChannelConfig,config內部會創建AdaptiveRecvByteBufAllocator實例用來計算內存大小,ByteBufAllocator.DEFAULT作為事件分配內存的工具類。
## 8.1.1 RecvByteBufAllocator
RecvByteBufAllocator是用于計算下次讀循環應該分配多少內存的接口,只有一個方法。讀循環是因為分配的初始ByteBuf不一定能夠容納所有讀取到的數據,因此可能會多次讀取,直到讀完客戶端發送的數據。(具體邏輯可見AbstractNioByteChannel的read())
```
Handle newHandle();
```
newHandle用來返回RecvByteBufAllocator內部的計算器Handle,Handle提供了實際的計算操作,內部保存了記錄每次分配多少內存的信息,提供預測緩沖大小等功能,下面是Handle接口:
```
ByteBuf allocate(ByteBufAllocator alloc); // 創建一個空間合理的緩沖,在不浪費空間的情況下能夠容納需要讀取的所有inbound的數據,內部由alloc來進行實際的分配
int guess(); // 猜測所需的緩沖區大小,不進行實際的分配
void reset(ChannelConfig config); // 每次開始讀循環之前,重置相關屬性
void incMessagesRead(int numMessages); // 增加本地讀循環的次數
void lastBytesRead(int bytes); // 設置最后一次讀到的字節數
int lastBytesRead(); // 最后一次讀到的字節數
void attemptedBytesRead(int bytes); // 設置讀操作嘗試讀取的字節數
void attemptedBytesRead(); // 獲取嘗試讀取的字節數
boolean continueReading(); // 判斷需要繼續讀
void readComplete(); // 讀結束后調用
```
AdaptiveRecvByteBufAllocator是我們實際使用的緩沖管理區,這個類可以動態計算下次需要分配的內存大小,其根據讀取到的數據預測所需字節大小,從而自動增加或減少;如果上一次讀循環將緩沖填充滿,那么預測的字節數會變大。如果連續兩次讀循環不能填滿已分配的緩沖區,則會減少所需的緩沖大小。需要注意的是,這個類只是計算大小,真正的分配動作由ByteBufAllocator完成。
AdaptiveRecvByteBufAllocator內部維護了一個SIZE_TABLE數組,使用slab的思想記錄了不同的內存塊大小,按照分配需要的大小尋找最合適的內存塊。SIZE_TABLE數組中的值都是2的n次方,這樣便于軟硬件進行處理。位置0從16開始,之后每次增加16,直到512;而從512之后起,每次增加一倍,直到int的最大值;這是因為當我需要的內存很小時,增長的幅度也不大,而較大時增長幅度也很大。例如,當我們需要分配一塊40的緩沖時,根據SIZE_TABLE會定位到64,index為2。這是SIZE_TABLE的主要作用。
```
16 32 48 64 80 96 112 128 144 160 176 192 208 224 240 256
```
AdaptiveRecvByteBufAllocator在初始化時,會設置三個大小屬性:緩沖最小值,初始值和最大值,并根據SIZE_TABLE定位到相應的index,保存在minIndex,initial,maxIndex中。
HandleImpl在創建時內部保存了AdaptiveRecvByteBufAllocator的緩沖最小/最大和初始的index,并記錄了下次需要分配的緩沖大小nextReceiveBufferSize,guess()時返回的即是該值。每次讀循環完成后,會根據實際讀取到的字節數和當前緩沖大小重新設置下次需要分配的緩沖大小。程序如下:
```
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) { // 因為連續兩次小于緩沖大小才會減小
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {//讀到的值大于緩沖大小
index = Math.min(index + INDEX_INCREMENT, maxIndex); // INDEX_INCREMENT=4 index前進4
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
@Override
public void readComplete() { //讀取完成后調用
record(totalBytesRead());
}
```
了解了AdaptiveRecvByteBufAllocator之后,以一個實例進行演示。每次讀循環開始時,先reset重置此次循環讀取到的字節數,讀取完成后readComplete會計算并調整下次循環需要分配的緩沖大小。
```
// 默認最小64 初始1024 最大65536
AdaptiveRecvByteBufAllocator adAlloctor = new AdaptiveRecvByteBufAllocator();
Handle handle = adAlloctor.newHandle();
System.out.println("------------讀循環1----------------------------");
handle.reset(null);// 讀取循環開始前先重置,將讀取的次數和字節數設置為0
System.out.println(String.format("讀循環1-1:需要分配的大小:%d", handle.guess()));
handle.lastBytesRead(1024);
System.out.println(String.format("讀循環1-2:需要分配的大小:%d", handle.guess()));// 讀循環中緩沖大小不變
handle.lastBytesRead(1024);
handle.readComplete();
System.out.println("------------讀循環2----------------------------");
handle.reset(null);// 讀取循環開始前先重置,將讀取的次數和字節數設置為0
System.out.println(String.format("讀循環2-1:需要分配的大小:%d", handle.guess()));
handle.lastBytesRead(1024);
handle.readComplete();
System.out.println("------------讀循環3----------------------------");
handle.reset(null);// 讀取循環開始前先重置,將讀取的次數和字節數設置為0
System.out.println(String.format("讀循環3-1:需要分配的大小:%d", handle.guess()));
handle.lastBytesRead(1024);
handle.readComplete();
System.out.println("------------讀循環4----------------------------");
handle.reset(null);// 讀取循環開始前先重置,將讀取的次數和字節數設置為0
System.out.println(String.format("讀循環4-1:需要分配的大小:%d", handle.guess()));
handle.readComplete();
//###############################
//------------讀循環1----------------------------
//讀循環1-1:需要分配的大小:1024
//讀循環1-2:需要分配的大小:1024
//------------讀循環2----------------------------
//讀循環2-1:需要分配的大小:16384 (1024 × 2^INDEX_INCREMENT)
//------------讀循環3----------------------------
//讀循環3-1:需要分配的大小:16384
//------------讀循環4----------------------------
//讀循環4-1:需要分配的大小:8192 (16384 / 2^INDEX_DECREMENT)
```
## 8.1.2 內存分配算法
Netty采用了jemalloc的思想,這是FreeBSD實現的一種并發malloc的算法。jemalloc依賴多個Arena來分配內存,運行中的應用都有固定數量的多個Arena,默認的數量與處理器的個數有關。系統中有多個Arena的原因是由于各個線程進行內存分配時競爭不可避免,這可能會極大的影響內存分配的效率,為了緩解高并發時的線程競爭,Netty允許使用者創建多個分配器(Arena)來分離鎖,提高內存分配效率,當然是以內存來作為代價的。
線程首次分配/回收內存時,首先會為其分配一個固定的Arena。線程選擇Arena時使用round-robin的方式,也就是順序輪流選取,這是因為jemalloc任務依靠線程地址進行hash選取是不可靠的。
jemalloc的另一個思路是使用Thread-local storage,每個線程各種保存Arena和緩存池信息,這樣可以減少競爭并提高訪問效率。Arena將內存分為很多Chunk進行管理,Chunk內部保存Page,以頁為單位申請。
申請內存分配時,會講分配的規格分為幾類:TINY,SAMLL,NORMAL和HUGE,分別對應不同的范圍,處理過程也不相同。

## 8.1.3 ByteBufAllocator
這個類用來進行實際的內存分配,默認使用的是ByteBufAllocator.DEFAULT,初始化時會根據配置和平臺進行賦值。`io.netty.allocator.type`可以設置為`unpooled`和`pooled`指定是否需要緩沖池,如果不設置則會根據平臺判斷。一般情況下,我們會在linux運行,使用的是有緩沖池的內存分配器。
```
//
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
```
## 8.1.4 PooledByteBufAllocator
Netty實際使用內存分配器會根據配置采用PooledByteBufAllocator.DEFAULT或PooledByteBufAllocator.DEFAULT,所有事件循環線程使用的是一個分配器實例。
PooledByteBufAllocator將內存分為PoolArena,PoolChunk和PoolPage,Chunk中包含多個內存頁,Arena包含3個Chunk。在PooledByteBufAllocator類加載時,會對這些配置進行初始化設置。
* 最大chunk大小:(Integer.max_value+1)/2 約為1GB
* 最大page大小:默認為8192,要求大于4096且為2的n次方
* 最大順序:默認為11,在0-14之間
* 默認chunk大小:頁大小* 2^order,即chunk由2的order個page組成
* Arena個數: Arena分為堆內存和直接內存,默認有3個chunk。由于pool的大小不能超過最大內存的一半,并且我們在事件循環組中使用了2×cores個線程,為了避免通過jvm進行同步,盡量選取大于2×cores的值。在netty中,使用2×cores和堆/直接內存/2/3的最小值作為Arena的數量
* 在內存分配的使用上,使用tiny:512,small:256;,normal:64作為閥值
* 默認緩存大小為32KB,這是jemalloc的推薦
* DEFAULT_CACHE_TRIM_INTERVAL:默認為8192,超過這個閥值會被free
PooledByteBufAllocator內部有兩個重要數組`HeapArena`和`DirectArena`,用來記錄堆內存和直接內存當前的使用狀態。PoolArena都實現了PoolArenaMetric接口,用于測量內存使用狀況。PooledByteBufAllocator初始化時,會根據之前的配置,初始化Arena信息,保存在heapArenas和directArenas,并分布使用兩個list記錄Metric。除此之外,還有一個重要的對象PoolThreadLocalCache,其繼承了FastThreadLocal,用于線程的本地緩存,在內存管理中,線程本地內存緩區的信息會保存在PoolThreadCache對象中。
PooledByteBufAllocator覆蓋的newHeapBuffer和newDirectBuffer用來分配內存,我們以newHeapBuffer為例學習。
## 8.1.5 PoolArena
PoolArena內部有三個重要的鏈表,tinySubpagePools/smallSubpagePools和PoolChunkList。前兩個用于保存page的使用狀態,最后一個用來保存chunk的使用狀態。
**tinySubpagePools**
用來保存為tiny規格分配的內存頁的鏈表,共有32個這樣的鏈表,保存著從16開始到512字節的內存頁,32的大小是固定的,因為正好匹配tiny規格的范圍(0,512),間隔為16。

例如,當分配64字節的內存時,會從tinySubpagePools查找合適的內存頁面,如果找到,會調用該頁的allocation方法,嘗試在該頁繼續分配bytebuf,如果未找到則會創建新的頁,然后加入到這個鏈表。
**smallSubpagePools**
用來保存為small規格分配的內存頁的鏈表,共有4個這樣的鏈表,保存著從1024開始到8192字節的內存頁,鏈表數組的大小不是固定的,根據PageSize有所變化,計算公式是1024 * 2^(4-1) = PageSIze,也就是說從1024開始直到PageSize,每次乘以2,共需要幾次。默認的PageSize為8192,2的13次方,1024*2的3次方=8192,因此共有4個。

Arena在分配samll范圍內的內存時,會從這個鏈表進行查找。
**PoolChunkList**
Arena內部有6個Chunk鏈表,保存在ChunkList對象中;而ChunkList本身也是鏈表,共有6個:
* qInit:存儲剩余內存0-25%的chunk
* q000:存儲剩余內存1-50%的chunk
* q025:存儲剩余內存25-75%的chunk
* q050:存儲剩余內存50-100%個chunk
* q075:存儲剩余內存75-100%個chunk
* q100:存儲剩余內存100%chunk

當分配內存時,Arena會在chunklist查找可用的chunk,如果沒有才會創建新的chunk,chunk內部也保存了頁的當前使用狀態。
至此,我們只是簡單了解了一下Arena相關的幾個數據結構,需要記住的是所有線程共享使用一個Allocator,Allocator內部保存了內存分配的相關配置信息,包含多個Arena;每個線程會固定使用一個Arena,Arena中記錄了Chunk鏈表和Page的使用信息。這些信息對于之后的內存分配是很重要的。