每隔一段時間需要調度任務執行,也許你想注冊一個任務在客戶端完成連接5分鐘后執行,一個常見的用例是發送一個消息“你還活著?”到遠端通,如果遠端沒有反應,則可以關閉通道(連接)和釋放資源。
本節介紹使用強大的 EventLoop 實現任務調度,還會簡單介紹 Java API的任務調度,以方便和 Netty 比較加深理解。
### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Scheduling%20tasks%20for%20later%20execution.md#使用普通的-java-api-調度任務)使用普通的 Java API 調度任務
在 Java 中使用 JDK 提供的 ScheduledExecutorService 實現任務調度。使用 Executors 提供的靜態方法創建 ScheduledExecutorService,有如下方法
Table 15.1 java.util.concurrent.Executors-Static methods to create a ScheduledExecutorService
| 方法 | 描述 |
| --- | --- |
| newScheduledThreadPool(int corePoolSize) newScheduledThreadPool(int corePoolSize,ThreadFactorythreadFactory) | |
ScheduledThreadExecutorService 用于調度命令來延遲或者周期性的執行。 corePoolSize 用于計算線程的數量 newSingleThreadScheduledExecutor() newSingleThreadScheduledExecutor(ThreadFact orythreadFactory) | 新建一個 ScheduledThreadExecutorService 可以用于調度命令來延遲或者周期性的執行。它將使用一個線程來執行調度的任務
下面的 ScheduledExecutorService 調度任務 60 執行一次
Listing 15.4 Schedule task with a ScheduledExecutorService
~~~
ScheduledExecutorService executor = Executors
.newScheduledThreadPool(10); //1
ScheduledFuture<?> future = executor.schedule(
new Runnable() { //2
@Override
public void run() {
System.out.println("Now it is 60 seconds later"); //3
}
}, 60, TimeUnit.SECONDS); //4
// do something
//
executor.shutdown(); //5
~~~
1. 新建 ScheduledExecutorService 使用10個線程
2. 新建 runnable 調度執行
3. 稍后運行
4. 調度任務60秒后執行
5. 關閉 ScheduledExecutorService 來釋放任務完成的資源
### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Scheduling%20tasks%20for%20later%20execution.md#使用-eventloop-調度任務)使用 EventLoop 調度任務
使用 ScheduledExecutorService 工作的很好,但是有局限性,比如在一個額外的線程中執行任務。如果需要執行很多任務,資源使用就會很嚴重;對于像 Netty 這樣的高性能的網絡框架來說,嚴重的資源使用是不能接受的。Netty 對這個問題提供了很好的方法。
Netty 允許使用 EventLoop 調度任務分配到通道,如下面代碼:
Listing 15.5 Schedule task with EventLoop
~~~
Channel ch = null; // Get reference to channel
ScheduledFuture<?> future = ch.eventLoop().schedule(
new Runnable() {
@Override
public void run() {
System.out.println("Now its 60 seconds later");
}
}, 60, TimeUnit.SECONDS);
~~~
1. 新建 runnable 用于執行調度
2. 稍后執行
3. 調度任務60秒后運行
如果想任務每隔多少秒執行一次,看下面代碼:
Listing 15.6 Schedule a fixed task with the EventLoop
~~~
Channel ch = null; // Get reference to channel
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
System.out.println("Run every 60 seconds");
}
}, 60, 60, TimeUnit.SECONDS);
~~~
1. 新建 runnable 用于執行調度
2. 將運行直到 ScheduledFuture 被取消
3. 調度任務60秒運行
取消操作,可以使用 ScheduledFuture 返回每個異步操作。 ScheduledFuture 提供一個方法用于取消一個調度了的任務或者檢查它的狀態。一個簡單的取消操作如下:
~~~
ScheduledFuture<?> future = ch.eventLoop()
.scheduleAtFixedRate(..); //1
// Some other code that runs...
future.cancel(false); //2
~~~
1. 調度任務并獲取返回的 ScheduledFuture
2. 取消任務,阻止它再次運行
### [](https://github.com/waylau/essential-netty-in-action/blob/master/ADVANCED%20TOPICS/Scheduling%20tasks%20for%20later%20execution.md#調度的內部實現)調度的內部實現
Netty 內部實現其實是基于George Varghese 提出的 “Hashed and hierarchical timing wheels: Data structures to efficiently implement timer facility(散列和分層定時輪:數據結構有效實現定時器)”。這種實現只保證一個近似執行,也就是說任務的執行可能不是100%準確;在實踐中,這已經被證明是一個可容忍的限制,不影響多數應用程序。所以,定時執行任務不可能100%準確的按時執行。
為了更好的理解它是如何工作,我們可以這樣認為:
* 在指定的延遲時間后調度任務;
* 任務被插入到 EventLoop 的 Schedule-Task-Queue(調度任務隊列);
* 如果任務需要馬上執行,EventLoop 檢查每個運行;
* 如果有一個任務要執行,EventLoop 將立刻執行它,并從隊列中刪除;
* EventLoop 等待下一次運行,從第4步開始一遍又一遍的重復。
因為這樣的實現計劃執行不可能100%正確,對于多數用例不可能100%準備的執行計劃任務;在 Netty 中,這樣的工作幾乎沒有資源開銷。
但是如果需要更準確的執行呢?很容易,你需要使用ScheduledExecutorService 的另一個實現,這不是 Netty 的內容。記住,如果不遵循 Netty 的線程模型協議,你將需要自己同步并發訪問。
- Introduction
- 開始
- Netty-異步和數據驅動
- Netty 介紹
- 構成部分
- 關于本書
- 第一個 Netty 應用
- 設置開發環境
- Netty 客戶端/服務端 總覽
- 寫一個 echo 服務器
- 寫一個 echo 客戶端
- 編譯和運行 Echo 服務器和客戶端
- 總結
- Netty 總覽
- Netty 快速入門
- Channel, Event 和 I/O
- 什么是 Bootstrapping 為什么要用
- ChannelHandler 和 ChannelPipeline
- 近距離觀察 ChannelHandler
- 總結
- 核心功能
- Transport(傳輸)
- 案例研究:Transport 的遷移
- Transport API
- 包含的 Transport
- Transport 使用情況
- 總結
- Buffer(緩沖)
- Buffer API
- ByteBuf - 字節數據的容器
- 字節級別的操作
- ByteBufHolder
- ByteBuf 分配
- 總結
- ChannelHandler 和 ChannelPipeline
- ChannelHandler 家族
- ChannelPipeline
- ChannelHandlerContext
- 總結
- Codec 框架
- 什么是 Codec
- Decoder(解碼器)
- Encoder(編碼器)
- 抽象 Codec(編解碼器)類
- 總結
- 提供了的 ChannelHandler 和 Codec
- 使用 SSL/TLS 加密 Netty 程序
- 構建 Netty HTTP/HTTPS 應用
- 空閑連接以及超時
- 解碼分隔符和基于長度的協議
- 編寫大型數據
- 序列化數據
- 總結
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- 引導
- Bootstrap 類型
- 引導客戶端和無連接協議
- 引導服務器
- 從 Channel 引導客戶端
- 在一個引導中添加多個 ChannelHandler
- 使用Netty 的 ChannelOption 和屬性
- 關閉之前已經引導的客戶端或服務器
- 總結
- NETTY BY EXAMPLE
- 單元測試
- 總覽
- 測試 ChannelHandler
- 測試異常處理
- 總結
- WebSocket
- WebSocket 程序示例
- 添加 WebSocket 支持
- 測試程序
- 總結
- SPDY
- SPDY 背景
- 示例程序
- 實現
- 啟動 SpdyServer 并測試
- 總結
- 通過 UDP 廣播事件
- UDP 基礎
- UDP 廣播
- UDP 示例
- EventLog 的 POJO
- 寫廣播器
- 寫監視器
- 運行 LogEventBroadcaster 和 LogEventMonitor
- 總結
- 高級主題
- 實現自定義的編解碼器
- 編解碼器的范圍
- 實現 Memcached 編解碼器
- 了解 Memcached 二進制協議
- Netty 編碼器和解碼器
- 測試編解碼器
- EventLoop 和線程模型
- 線程模型的總覽
- EventLoop
- EventLoop
- I/O EventLoop/Thread 分配細節
- 總結
- 用例1:Droplr Firebase 和 Urban Airship
- 用例2:Facebook 和 Twitter