# Netty入門
## 一、Netty概述
> Netty 是一個異步的、基于事件驅動的網絡應用框架,用于快速開發可維護、高性能的網絡服務器和客戶端。《官網》
使用Netty的框架有:
* RocketMQ - 阿里巴巴開源的消息隊列。
* gRPC - rpc 框架。
* Dubbo - rpc 框架。
* Zookeeper - 分布式協調框架。
* Spring 5.x - flux api 完全拋棄了 tomcat ,使用 netty 作為服務器端。
Netty框架與Java的nio相比:
* NIO工作量大,bug多。
* NIO需要自己構建協議。
* NIO需要自己解決TCP的傳輸問題。
* NIO的epoll空輪詢可能會導致CPU100%的問題。
* Netty對NIO的API進行了增強,例如將ThreadLocal增強成FastThreadLocal,將ByteBuffer增強成ByteBuf。
## 二、使用
1. 引入依賴
~~~
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
~~~
2. 服務端代碼
~~~
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8080);
~~~
- ServerBootstrap 服務端的啟動類。
- NioEventLoopGroup 類似于線程池+Selector。
- childHandler 用于處理SocketChannel的內容,可以理解成Server和Client是父子關系。
- pipeline 管道,可以理解成每個Handler都是一個工序,管道由這些工序組成。
3. 客戶端代碼
~~~
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.sync()
.channel()
.writeAndFlush(new Date() + ": hello world!");
~~~
- Bootstrap 客戶端啟動類。
- sync 表示同步連接建立后才能調用writeAndFlush往服務端輸出內容。
**注意:**
在客戶端和服務端中接受數據和發送數據都會去調用Handler處理器的方法進行處理。可以將channel理解為數據的通道,pipeline理解為處理數據的流水線,而Handler就是該流水線中的每道工序。
而eventLoop可以理解為處理數據的工人,由單線程的線程池組成。
## 三、組件
### 3.1 EventLoop
EventLoop表示事件循環對象,對應著一個線程,其本質是一個單線程的線程池,同時里面維護了一個Selector。通過里面的run方法源源不斷的處理Channel上的IO事件。
EventLoop繼承自OrderedEventExecutor和EventLoopGroup,里面有個parent方法用來判斷屬于哪個EventLoopGroup。
### 3.2 EventLoopGroup
EventLoopGroup 是一組 EventLoop,Channel 一般會調用 EventLoopGroup 的 register 方法來綁定其中一個 EventLoop,后續這個 Channel 上的 io 事件都由此 EventLoop 來處理(**這樣做可以保證該channel上 io 事件處理時的線程安全**)。
:-: 
EventLoopGroup有多個實現類,常用的有:
~~~
?MultithreadEventLoopGroup
?NioEventLoopGroup
?DefaultEventLoopGroup 只能執行普通任務和定時任務不能執行IO任務
~~~
實現類需要實現如下方法:
* 實現了 Iterable 接口提供遍歷 EventLoop。
* 實現 next 方法獲取集合中下一個 EventLoop。
NioEventLoopGroup的默認線程數由*DEFAULT\_EVENT\_LOOP\_THREADS*這個參數配置:
:-: 
默認為CPU核心數的兩倍。
NioEventLoopGroup可以設置一個Boss EventLoop用于處理accept事件,多個Worker EventLoop用于處理SocketChannel IO 讀寫事件。兩個Worker會輪流處理事件,同時是綁定channel進行處理。
channel交由特點的channel處理的關鍵代碼如下:
~~~
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一個 handler 的事件循環是否與當前的事件循環是同一個線程
EventExecutor executor = next.executor();
// 是,直接調用
if (executor.inEventLoop()) { // 判斷當前handler中的線程是否和executor是同一個線程
next.invokeChannelRead(m);
}
// 不是,將要執行的代碼作為任務提交給下一個事件循環處理(換人)
else {
executor.execute(new Runnable() { // 下一個handler的線程
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
~~~
如果兩個Handler綁定的是同個線程,則直接執行,否則的話交由下一個eventLoop判斷執行。
NioEventLoop也可以用于執行普通任務和定時任務:
~~~
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
log.debug("server start...");
// 讓線程啟動起來
Thread.sleep(2000);
nioWorkers.execute(()->{
log.debug("normal task...");
});
nioWorkers.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 0, 1, TimeUnit.SECONDS);
~~~
**關閉EventLoopGroup**
優雅關閉 `shutdownGracefully` 方法。該方法會首先切換 `EventLoopGroup` 到關閉狀態從而拒絕新的任務的加入,然后在任務隊列的任務都處理完成后,停止線程的運行。從而確保整體應用是在正常有序的狀態下退出的。
默認是2秒后關閉,15秒后超時關閉,由如下兩個參數限定:
:-: 
shutdownGracefully方法在關閉的時候會進行空循環關閉
:-: 
### 3.3 Channel
可以理解為數據流通道,其主要的API有:
1. close():關閉Channel。
2. closeFuture():處理Channel的關閉。
3. pipeline():往流水線中添加處理器。
4. write():將輸入寫入緩沖區中,不一定會立刻刷出。
5. writeAndFlush():寫入并立刻將數據刷出。
**ChannelFuture**
注意帶有**Future、promise**名稱的都是和異步方法一起使用的。
~~~
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
// 異步非阻塞,由main線程發起調用,真正執行 connect 的是nio線程
.connect("127.0.0.1", 8080); // 1
channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");
~~~
由于ChannelFuture本身是異步非阻塞的,所以才需要調用sync方法等待連接后獲取channel對象。也可以使用回調的方法獲取連接:
~~~
// addListener(回調對象)也可以異步處理結果,nio線程連接建立之后,就會調用operationComplete方法,處理channel的是nio線程
channelFuture.addListener((ChannelFutureListener) future -> {
System.out.println(future.channel()); // 2
});
~~~
**CloseFuture**
調用closeFuture方法可以獲取CloseFuture對象,可用于同步或者異步處理關閉。
~~~
// 獲取 CloseFuture 對象, 1) 同步處理關閉, 2) 異步處理關閉,回調函出,nio線程處理
ChannelFuture closeFuture = channel.closeFuture();
/*log.debug("waiting close...");
closeFuture.sync();
log.debug("處理關閉之后的操作");*/
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("處理關閉之后的操作");
// gracefully 優雅的
group.shutdownGracefully();
}
});
~~~
總的來說,Netty使用異步操作,用不同的線程來發起連接建立,另外一個線程來真正的建立連接,這樣可以提高效率。原因如下:
* 單線程沒法異步提高效率,必須配合多線程、多核 cpu 才能發揮異步的優勢,類似于cpu的**流水線指令**。
* 異步**并沒有縮短響應時間**,反而有所增加,提高的是吞吐量,即單位時間的處理量。
* 合理進行任務拆分,也是利用異步的關鍵。
### 3.4 Future & Promise
Netty的Future繼承自JDK的Future,而Promise又繼承自Netty的Future,三者有如下的區別:
* jdk Future **只能**同步等待任務結束(或成功、或失敗)才能得到結果,通過get方法來進行同步的阻塞。
* netty Future 可以同步等待任務結束得到結果(get方法同步阻塞),也可以異步方式(addListener)得到結果,但都是要等任務結束才能得到結果。
* netty Promise 不僅有 netty Future 的功能,而且脫離了任務獨立存在,只作為兩個線程間傳遞結果的容器。
三者的使用方式如下:
~~~
public class TestFuture {
// 測試JDK Future
public void testJKDFuture() {
ExecutorService service = Executors.newFixedThreadPool(2);
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("執行計算");
Thread.sleep(1000);
return 50;
}
});
// 主線程通過future來獲取結果
log.debug("等待結果");
// get方法會阻塞獲取
log.debug("結果是 {}", future.get());
}
// 測試 netty Future
public void testNettyFuture() {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("執行計算");
Thread.sleep(1000);
return 70;
}
});
// 同步等待
log.debug("等待結果...");
log.debug("結果是{}", future.get());
// 異步等待
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收結果{}", future.getNow());
}
})
}
// promise
public void testNettyPromise() {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
eventExecutors.execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("set success, {}",10);
// 手動裝入結果
promise.setSuccess(10);
});
log.debug("start...");
log.debug("{}",promise.getNow()); // 還沒有結果
// get方法是同步阻塞的
log.debug("{}",promise.get());
}
}
~~~
Future和Promise都可以看成一個容器,用于不同線程之間交互數據,例如在線程池中執行完后的結果在主線程中被獲取。而使用Promise則可以更加靈活的處理數據,不僅僅是通過返回值來獲取。
### 3.5 Pipeline & Handler
ChannelHandler 用來處理 Channel 上的各種事件,分為入站、出站兩種。所有 ChannelHandler 被連成一串,就是 Pipeline、
* 入站處理器通常是 ChannelInboundHandlerAdapter 的子類,主要**用來讀取客戶端數據,寫回結果**。
* 出站處理器通常是 ChannelOutboundHandlerAdapter 的子類,主要對**寫回結果進行加工**。
打個比喻,每個 Channel 是一個產品的加工車間,Pipeline 是車間中的流水線,ChannelHandler 就是流水線上的各道工序,而 ByteBuf 是原材料,經過很多工序的加工:先經過一道道入站工序,再經過一道道出站工序最終變成產品。
Netty會對每個pipeline添加上header handler和tail handler,形成一個雙向鏈表。出入站工序處理順序的舉例:
~~~java
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
ctx.fireChannelRead(msg); // 1
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(2);
ctx.fireChannelRead(msg); // 2
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(3);
ctx.channel().write(msg); // 3
}
});
// 出站處理器需要有寫入操作才會觸發,出站是按照加入pipeline的順序相反。
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(4);
ctx.write(msg, promise); // 4
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(5);
ctx.write(msg, promise); // 5
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(6);
ctx.write(msg, promise); // 6
}
});
}
})
.bind(8080);
~~~
總共添加了三道入站處理器和三道出站處理器工序,對于每個入站處理器,需要在處理完數據最后調用**ChannelHandlerContext::fireChannelRead(msg)**,來將msg交由給下一道入站處理器處理。對于出站處理器,需要由寫出數據的時候才會執行,同時是按照添加的順序逆序執行的,調用ChannelHandlerContext::channel()::write(msg)會將處理工序指向到pipeline的tail節點,之后向前處理出站處理器。調用ChannelHandlerContext::write()會將工序指向當前節點的上一個節點。完整的結構如圖所示:
:-: 
因此輸出結果為:
~~~
1
2
3
6
5
4
~~~
可以在pipeline工序中通過msg傳遞上一個工序處理的msg結果, 跟生活中的流水線一樣。為了讓數據能夠傳遞,在每個handler中必須調用super.channelRead(ctx, msg)或者是ctx.fireChannelRead(msg);
對于寫出數據的方法有如下兩個類似的方式
~~~
?ctx.writeAndFlush(ctx.alloc().bytebuff()); // 從當前handler往前找出站處理器
?ch.writeAndFlush(ctx.alloc().bytebuff()); // 從tail往前找出站處理器
~~~
**備注:使用了責任鏈模式。**
### 3.6 ByteBuf
與nio包的ByteBuffer類似,都是對字節數據的封裝,可以看成是ByteBuffer的增強。
#### 3.6.1. 創建方式
~~~
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
~~~
從直接內存中一個大小為10字節的ByteBuf,可以自動擴容(ByteBuffer是不能自動擴容的),默認為256字節。
展示ByteBuf的內容,使用自定義的log方法:
~~~
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
~~~
輸出如下:
~~~
read index:0 write index:0 capacity:10
~~~
這種創建方式在windows系統會默認開啟池化和分配直接內存。
**池化技術**:類似于線程池可以重用線程,Netty開啟池化技術也能重用ByteBuf。其優點有:
* 沒有池化,則每次都得創建新的 ByteBuf 實例,這個操作對直接內存代價昂貴,就算是堆內存,也會增加 GC 壓力。
* 有了池化,則可以重用池中 ByteBuf 實例,并且采用了與 jemalloc 類似的內存分配算法提升分配效率。
* 高并發時,池化功能更節約內存,減少內存溢出的可能。
池化可以通過如下參數開啟:
~~~
-Dio.netty.allocator.type={unpooled|pooled} # unpooled沒有開啟,pooled表示開啟
~~~
4.1 以后,非 Android 平臺**默認啟用池化實現**,Android 平臺啟用非池化實現。
**其他分配方式**
~~~
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10); // 分配JVM堆內存
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10); // 分配直接內存
~~~
* 直接內存創建和銷毀的代價昂貴,但讀寫性能高(少一次內存復制,系統內存和JDK的內存會進行映射),適合配合池化功能一起用。
* 直接內存對 GC 壓力小,因為這部分內存不受 JVM 垃圾回收的管理,但也要注意及時主動釋放。
:-: 
#### 3.6.2.組成與API
ByteBuf包含四個部分,由讀寫指針來維護讀寫操作。
:-: 
**寫入操作**
ByteBuf的寫入操作的API有write和set開頭的組成,其中write開頭的會修改寫指針,set開頭的不會改變寫指針的位置。
**擴容規則**
當容量不夠用的時候ByteBuf會自動擴容,擴容規則如下:
* 如何寫入后數據大小未超過 512字節,則選擇下一個 16 的整數倍,例如寫入后大小為 12 ,則擴容后 capacity 是 16。
* 如果寫入后數據大小超過 512字節,則選擇下一個 2^n,例如寫入后大小為 513,則擴容后 capacity 是 2^10=1024(2^9=512 已經不夠了)。
* 擴容不能超過 max capacity (整數的最大值)會報錯。
**讀取**
read開頭的方法,指針會往前移動,可以通過mark或者reset設置重復讀取。另外一系列get開頭的方法不會移動讀指針。
**釋放內存**
ByteBuf盡量自己手動釋放,ByteBuf采用的是“引用計數”(繼承ReferencedCounted接口)的方式,當計數為0的時候就會被清理掉。其中調用**release**方法可以讓引用計數+1,調用retain可以讓引用計數-1。不同的ByteBuf的釋放機制不盡相同,在使用的過程中需要讓最后處理的Handler進行手動釋放ByteBuf。
同時head handler和tail handler也會保證流到這里的ByteBuf會被釋放掉。
**復制**
調用copy會進行深度復制操作,對新ByteBuf的操作不會影響原來的ByteBuf。
ByteBuf和ByteBuffer相比的優點:
1. 使用池化技術,重用ByteBuf,提高內存的使用。
2. ByteBuf使用讀寫指針,ByteBuffer只使用一個指針,需要通過filp,clear,compact不斷調整指針的狀態。
3. 會自動擴容。
4. 支持鏈式調用,使用更流暢。
5. 很多地方體現零拷貝,例如 slice、duplicate、CompositeByteBuf,減少內存復制,提高性能。
- 第一章 Java基礎
- ThreadLocal
- Java異常體系
- Java集合框架
- List接口及其實現類
- Queue接口及其實現類
- Set接口及其實現類
- Map接口及其實現類
- JDK1.8新特性
- Lambda表達式
- 常用函數式接口
- stream流
- 面試
- 第二章 Java虛擬機
- 第一節、運行時數據區
- 第二節、垃圾回收
- 第三節、類加載機制
- 第四節、類文件與字節碼指令
- 第五節、語法糖
- 第六節、運行期優化
- 面試常見問題
- 第三章 并發編程
- 第一節、Java中的線程
- 第二節、Java中的鎖
- 第三節、線程池
- 第四節、并發工具類
- AQS
- 第四章 網絡編程
- WebSocket協議
- Netty
- Netty入門
- Netty-自定義協議
- 面試題
- IO
- 網絡IO模型
- 第五章 操作系統
- IO
- 文件系統的相關概念
- Java幾種文件讀寫方式性能對比
- Socket
- 內存管理
- 進程、線程、協程
- IO模型的演化過程
- 第六章 計算機網絡
- 第七章 消息隊列
- RabbitMQ
- 第八章 開發框架
- Spring
- Spring事務
- Spring MVC
- Spring Boot
- Mybatis
- Mybatis-Plus
- Shiro
- 第九章 數據庫
- Mysql
- Mysql中的索引
- Mysql中的鎖
- 面試常見問題
- Mysql中的日志
- InnoDB存儲引擎
- 事務
- Redis
- redis的數據類型
- redis數據結構
- Redis主從復制
- 哨兵模式
- 面試題
- Spring Boot整合Lettuce+Redisson實現布隆過濾器
- 集群
- Redis網絡IO模型
- 第十章 設計模式
- 設計模式-七大原則
- 設計模式-單例模式
- 設計模式-備忘錄模式
- 設計模式-原型模式
- 設計模式-責任鏈模式
- 設計模式-過濾模式
- 設計模式-觀察者模式
- 設計模式-工廠方法模式
- 設計模式-抽象工廠模式
- 設計模式-代理模式
- 第十一章 后端開發常用工具、庫
- Docker
- Docker安裝Mysql
- 第十二章 中間件
- ZooKeeper