<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # Netty入門 ## 一、Netty概述 > Netty 是一個異步的、基于事件驅動的網絡應用框架,用于快速開發可維護、高性能的網絡服務器和客戶端。《官網》 使用Netty的框架有: * RocketMQ - 阿里巴巴開源的消息隊列。 * gRPC - rpc 框架。 * Dubbo - rpc 框架。 * Zookeeper - 分布式協調框架。 * Spring 5.x - flux api 完全拋棄了 tomcat ,使用 netty 作為服務器端。 Netty框架與Java的nio相比: * NIO工作量大,bug多。 * NIO需要自己構建協議。 * NIO需要自己解決TCP的傳輸問題。 * NIO的epoll空輪詢可能會導致CPU100%的問題。 * Netty對NIO的API進行了增強,例如將ThreadLocal增強成FastThreadLocal,將ByteBuffer增強成ByteBuf。 &nbsp; ## 二、使用 1. 引入依賴 ~~~ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency> ~~~ 2. 服務端代碼 ~~~ new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }) .bind(8080); ~~~ - ServerBootstrap 服務端的啟動類。 - NioEventLoopGroup 類似于線程池+Selector。 - childHandler 用于處理SocketChannel的內容,可以理解成Server和Client是父子關系。 - pipeline 管道,可以理解成每個Handler都是一個工序,管道由這些工序組成。 3. 客戶端代碼 ~~~ new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080) .sync() .channel() .writeAndFlush(new Date() + ": hello world!"); ~~~ - Bootstrap 客戶端啟動類。 - sync 表示同步連接建立后才能調用writeAndFlush往服務端輸出內容。 **注意:** 在客戶端和服務端中接受數據和發送數據都會去調用Handler處理器的方法進行處理。可以將channel理解為數據的通道,pipeline理解為處理數據的流水線,而Handler就是該流水線中的每道工序。 而eventLoop可以理解為處理數據的工人,由單線程的線程池組成。 &nbsp; ## 三、組件 ### 3.1 EventLoop EventLoop表示事件循環對象,對應著一個線程,其本質是一個單線程的線程池,同時里面維護了一個Selector。通過里面的run方法源源不斷的處理Channel上的IO事件。 EventLoop繼承自OrderedEventExecutor和EventLoopGroup,里面有個parent方法用來判斷屬于哪個EventLoopGroup。 &nbsp; ### 3.2 EventLoopGroup EventLoopGroup 是一組 EventLoop,Channel 一般會調用 EventLoopGroup 的 register 方法來綁定其中一個 EventLoop,后續這個 Channel 上的 io 事件都由此 EventLoop 來處理(**這樣做可以保證該channel上 io 事件處理時的線程安全**)。 :-: ![](https://img.kancloud.cn/da/4f/da4fc6b42f35878ba0ac24187594180b_945x159.png) EventLoopGroup有多個實現類,常用的有: ~~~ ?MultithreadEventLoopGroup ?NioEventLoopGroup ?DefaultEventLoopGroup 只能執行普通任務和定時任務不能執行IO任務 ~~~ 實現類需要實現如下方法: * 實現了 Iterable 接口提供遍歷 EventLoop。 * 實現 next 方法獲取集合中下一個 EventLoop。 NioEventLoopGroup的默認線程數由*DEFAULT\_EVENT\_LOOP\_THREADS*這個參數配置: :-: ![](https://img.kancloud.cn/f9/f4/f9f4e9fa9f22228dbdc00a2112cbe4aa_1014x200.png) 默認為CPU核心數的兩倍。 NioEventLoopGroup可以設置一個Boss EventLoop用于處理accept事件,多個Worker EventLoop用于處理SocketChannel IO 讀寫事件。兩個Worker會輪流處理事件,同時是綁定channel進行處理。 channel交由特點的channel處理的關鍵代碼如下: ~~~ static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 下一個 handler 的事件循環是否與當前的事件循環是同一個線程 EventExecutor executor = next.executor(); // 是,直接調用 if (executor.inEventLoop()) { // 判斷當前handler中的線程是否和executor是同一個線程 next.invokeChannelRead(m); } // 不是,將要執行的代碼作為任務提交給下一個事件循環處理(換人) else { executor.execute(new Runnable() { // 下一個handler的線程 @Override public void run() { next.invokeChannelRead(m); } }); } } ~~~ 如果兩個Handler綁定的是同個線程,則直接執行,否則的話交由下一個eventLoop判斷執行。 NioEventLoop也可以用于執行普通任務和定時任務: ~~~ NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2); log.debug("server start..."); // 讓線程啟動起來 Thread.sleep(2000); nioWorkers.execute(()->{ log.debug("normal task..."); }); nioWorkers.scheduleAtFixedRate(() -> { log.debug("running..."); }, 0, 1, TimeUnit.SECONDS); ~~~ &nbsp; **關閉EventLoopGroup** 優雅關閉 `shutdownGracefully` 方法。該方法會首先切換 `EventLoopGroup` 到關閉狀態從而拒絕新的任務的加入,然后在任務隊列的任務都處理完成后,停止線程的運行。從而確保整體應用是在正常有序的狀態下退出的。 默認是2秒后關閉,15秒后超時關閉,由如下兩個參數限定: :-: ![](https://img.kancloud.cn/a1/48/a148dac8151e56e7b496b5e7ba1ff13b_535x68.png) shutdownGracefully方法在關閉的時候會進行空循環關閉 :-: ![](https://img.kancloud.cn/81/a9/81a913e467d842c94b4e40b9b53349c6_482x360.png) &nbsp; ### 3.3 Channel 可以理解為數據流通道,其主要的API有: 1. close():關閉Channel。 2. closeFuture():處理Channel的關閉。 3. pipeline():往流水線中添加處理器。 4. write():將輸入寫入緩沖區中,不一定會立刻刷出。 5. writeAndFlush():寫入并立刻將數據刷出。 &nbsp; **ChannelFuture** 注意帶有**Future、promise**名稱的都是和異步方法一起使用的。 ~~~ ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) // 異步非阻塞,由main線程發起調用,真正執行 connect 的是nio線程 .connect("127.0.0.1", 8080); // 1 channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!"); ~~~ 由于ChannelFuture本身是異步非阻塞的,所以才需要調用sync方法等待連接后獲取channel對象。也可以使用回調的方法獲取連接: ~~~ // addListener(回調對象)也可以異步處理結果,nio線程連接建立之后,就會調用operationComplete方法,處理channel的是nio線程 channelFuture.addListener((ChannelFutureListener) future -> { System.out.println(future.channel()); // 2 }); ~~~ &nbsp; **CloseFuture** 調用closeFuture方法可以獲取CloseFuture對象,可用于同步或者異步處理關閉。 ~~~ // 獲取 CloseFuture 對象, 1) 同步處理關閉, 2) 異步處理關閉,回調函出,nio線程處理 ChannelFuture closeFuture = channel.closeFuture(); /*log.debug("waiting close..."); closeFuture.sync(); log.debug("處理關閉之后的操作");*/ closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { log.debug("處理關閉之后的操作"); // gracefully 優雅的 group.shutdownGracefully(); } }); ~~~ &nbsp; 總的來說,Netty使用異步操作,用不同的線程來發起連接建立,另外一個線程來真正的建立連接,這樣可以提高效率。原因如下: * 單線程沒法異步提高效率,必須配合多線程、多核 cpu 才能發揮異步的優勢,類似于cpu的**流水線指令**。 * 異步**并沒有縮短響應時間**,反而有所增加,提高的是吞吐量,即單位時間的處理量。 * 合理進行任務拆分,也是利用異步的關鍵。 &nbsp; ### 3.4 Future & Promise Netty的Future繼承自JDK的Future,而Promise又繼承自Netty的Future,三者有如下的區別: * jdk Future **只能**同步等待任務結束(或成功、或失敗)才能得到結果,通過get方法來進行同步的阻塞。 * netty Future 可以同步等待任務結束得到結果(get方法同步阻塞),也可以異步方式(addListener)得到結果,但都是要等任務結束才能得到結果。 * netty Promise 不僅有 netty Future 的功能,而且脫離了任務獨立存在,只作為兩個線程間傳遞結果的容器。 三者的使用方式如下: ~~~ public class TestFuture { // 測試JDK Future public void testJKDFuture() { ExecutorService service = Executors.newFixedThreadPool(2); Future<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("執行計算"); Thread.sleep(1000); return 50; } }); // 主線程通過future來獲取結果 log.debug("等待結果"); // get方法會阻塞獲取 log.debug("結果是 {}", future.get()); } // 測試 netty Future public void testNettyFuture() { NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("執行計算"); Thread.sleep(1000); return 70; } }); // 同步等待 log.debug("等待結果..."); log.debug("結果是{}", future.get()); // 異步等待 future.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.debug("接收結果{}", future.getNow()); } }) } // promise public void testNettyPromise() { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("set success, {}",10); // 手動裝入結果 promise.setSuccess(10); }); log.debug("start..."); log.debug("{}",promise.getNow()); // 還沒有結果 // get方法是同步阻塞的 log.debug("{}",promise.get()); } } ~~~ Future和Promise都可以看成一個容器,用于不同線程之間交互數據,例如在線程池中執行完后的結果在主線程中被獲取。而使用Promise則可以更加靈活的處理數據,不僅僅是通過返回值來獲取。 &nbsp; ### 3.5 Pipeline & Handler ChannelHandler 用來處理 Channel 上的各種事件,分為入站、出站兩種。所有 ChannelHandler 被連成一串,就是 Pipeline、 * 入站處理器通常是 ChannelInboundHandlerAdapter 的子類,主要**用來讀取客戶端數據,寫回結果**。 * 出站處理器通常是 ChannelOutboundHandlerAdapter 的子類,主要對**寫回結果進行加工**。 打個比喻,每個 Channel 是一個產品的加工車間,Pipeline 是車間中的流水線,ChannelHandler 就是流水線上的各道工序,而 ByteBuf 是原材料,經過很多工序的加工:先經過一道道入站工序,再經過一道道出站工序最終變成產品。 Netty會對每個pipeline添加上header handler和tail handler,形成一個雙向鏈表。出入站工序處理順序的舉例: ~~~java new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(1); ctx.fireChannelRead(msg); // 1 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(2); ctx.fireChannelRead(msg); // 2 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(3); ctx.channel().write(msg); // 3 } }); // 出站處理器需要有寫入操作才會觸發,出站是按照加入pipeline的順序相反。 ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(4); ctx.write(msg, promise); // 4 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(5); ctx.write(msg, promise); // 5 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(6); ctx.write(msg, promise); // 6 } }); } }) .bind(8080); ~~~ 總共添加了三道入站處理器和三道出站處理器工序,對于每個入站處理器,需要在處理完數據最后調用**ChannelHandlerContext::fireChannelRead(msg)**,來將msg交由給下一道入站處理器處理。對于出站處理器,需要由寫出數據的時候才會執行,同時是按照添加的順序逆序執行的,調用ChannelHandlerContext::channel()::write(msg)會將處理工序指向到pipeline的tail節點,之后向前處理出站處理器。調用ChannelHandlerContext::write()會將工序指向當前節點的上一個節點。完整的結構如圖所示: :-: ![](https://img.kancloud.cn/8b/a7/8ba73726c2999f0a0e4bae576ff8de3d_1005x255.png) 因此輸出結果為: ~~~ 1 2 3 6 5 4 ~~~ 可以在pipeline工序中通過msg傳遞上一個工序處理的msg結果, 跟生活中的流水線一樣。為了讓數據能夠傳遞,在每個handler中必須調用super.channelRead(ctx, msg)或者是ctx.fireChannelRead(msg); 對于寫出數據的方法有如下兩個類似的方式 ~~~ ?ctx.writeAndFlush(ctx.alloc().bytebuff()); // 從當前handler往前找出站處理器 ?ch.writeAndFlush(ctx.alloc().bytebuff()); // 從tail往前找出站處理器 ~~~ **備注:使用了責任鏈模式。** &nbsp; ### 3.6 ByteBuf 與nio包的ByteBuffer類似,都是對字節數據的封裝,可以看成是ByteBuffer的增強。 #### 3.6.1. 創建方式 ~~~ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); ~~~ 從直接內存中一個大小為10字節的ByteBuf,可以自動擴容(ByteBuffer是不能自動擴容的),默認為256字節。 展示ByteBuf的內容,使用自定義的log方法: ~~~ private static void log(ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder buf = new StringBuilder(rows * 80 * 2) .append("read index:").append(buffer.readerIndex()) .append(" write index:").append(buffer.writerIndex()) .append(" capacity:").append(buffer.capacity()) .append(NEWLINE); appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); } ~~~ 輸出如下: ~~~ read index:0 write index:0 capacity:10 ~~~ 這種創建方式在windows系統會默認開啟池化和分配直接內存。 **池化技術**:類似于線程池可以重用線程,Netty開啟池化技術也能重用ByteBuf。其優點有: * 沒有池化,則每次都得創建新的 ByteBuf 實例,這個操作對直接內存代價昂貴,就算是堆內存,也會增加 GC 壓力。 * 有了池化,則可以重用池中 ByteBuf 實例,并且采用了與 jemalloc 類似的內存分配算法提升分配效率。 * 高并發時,池化功能更節約內存,減少內存溢出的可能。 池化可以通過如下參數開啟: ~~~ -Dio.netty.allocator.type={unpooled|pooled} # unpooled沒有開啟,pooled表示開啟 ~~~ 4.1 以后,非 Android 平臺**默認啟用池化實現**,Android 平臺啟用非池化實現。 &nbsp; **其他分配方式** ~~~ ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10); // 分配JVM堆內存 ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10); // 分配直接內存 ~~~ * 直接內存創建和銷毀的代價昂貴,但讀寫性能高(少一次內存復制,系統內存和JDK的內存會進行映射),適合配合池化功能一起用。 * 直接內存對 GC 壓力小,因為這部分內存不受 JVM 垃圾回收的管理,但也要注意及時主動釋放。 :-: ![](https://img.kancloud.cn/ec/7b/ec7ba84c484a9860816e84410a509c48_1192x620.png) #### 3.6.2.組成與API ByteBuf包含四個部分,由讀寫指針來維護讀寫操作。 :-: ![](https://img.kancloud.cn/68/e9/68e9afa3f7014e3419b5c60fc98fc001_626x195.png) &nbsp; **寫入操作** ByteBuf的寫入操作的API有write和set開頭的組成,其中write開頭的會修改寫指針,set開頭的不會改變寫指針的位置。 &nbsp; **擴容規則** 當容量不夠用的時候ByteBuf會自動擴容,擴容規則如下: * 如何寫入后數據大小未超過 512字節,則選擇下一個 16 的整數倍,例如寫入后大小為 12 ,則擴容后 capacity 是 16。 * 如果寫入后數據大小超過 512字節,則選擇下一個 2^n,例如寫入后大小為 513,則擴容后 capacity 是 2^10=1024(2^9=512 已經不夠了)。 * 擴容不能超過 max capacity (整數的最大值)會報錯。 &nbsp; **讀取** read開頭的方法,指針會往前移動,可以通過mark或者reset設置重復讀取。另外一系列get開頭的方法不會移動讀指針。 &nbsp; **釋放內存** ByteBuf盡量自己手動釋放,ByteBuf采用的是“引用計數”(繼承ReferencedCounted接口)的方式,當計數為0的時候就會被清理掉。其中調用**release**方法可以讓引用計數+1,調用retain可以讓引用計數-1。不同的ByteBuf的釋放機制不盡相同,在使用的過程中需要讓最后處理的Handler進行手動釋放ByteBuf。 同時head handler和tail handler也會保證流到這里的ByteBuf會被釋放掉。 **復制** 調用copy會進行深度復制操作,對新ByteBuf的操作不會影響原來的ByteBuf。 &nbsp; ByteBuf和ByteBuffer相比的優點: 1. 使用池化技術,重用ByteBuf,提高內存的使用。 2. ByteBuf使用讀寫指針,ByteBuffer只使用一個指針,需要通過filp,clear,compact不斷調整指針的狀態。 3. 會自動擴容。 4. 支持鏈式調用,使用更流暢。 5. 很多地方體現零拷貝,例如 slice、duplicate、CompositeByteBuf,減少內存復制,提高性能。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看