# Java NIO之選擇器
## 1.簡介
前面的文章說了緩沖區,說了通道,本文就來說說 NIO 中另一個重要的實現,即選擇器 Selector。在更早的[文章](http://www.coolblog.xyz/2018/02/08/IO%E6%A8%A1%E5%9E%8B%E7%AE%80%E8%BF%B0/)中,我簡述了幾種 IO 模型。如果大家看過之前的文章,并動手寫過代碼的話。再看 Java 的選擇器大概就會知道它是什么了,以及怎么用了。選擇器是 Java 多路復用模型的一個實現,可以同時監控多個非阻塞套接字通道。示意圖大致如下:

如果大家了解過多路復用模型,那應該也會知道幾種復用模型的實現。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的選擇器并非憑空創造,而是在底層操作系統提供的接口的基礎上封裝而來。相關的細節,我隨后會進行分析。
關于 Java 選擇器的簡介這里先說到這,接下來進入正題。
## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#2基本操作及實現)2.基本操作及實現
本章我將對 Selector 的創建,通道的注冊,Selector 的選擇過程進行分析。內容篇幅較大,希望大家耐心看完。由于 Selector 相關類在不同操作系統下的實現是不同的,加之個人對 Linux epoll 更為熟悉,所以本文所分析的源碼也是和 epoll 相關的。好了,進入正題吧。
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#21-創建選擇器)2.1 創建選擇器
選擇器 Selector 是一個抽象類,所以不能直接創建。Selector 提供了一個 open 方法,通過 open 方法既可以創建選擇器實例。示例代碼如下:
1
Selector selector = Selector.open();
上面的代碼比較簡單,只有一行。不過不要被表象迷惑,這行代碼僅是完整實現的冰山一角,更復雜的邏輯則隱藏在水面之下。
在簡介一節,我已經說了 Java 選擇器是對底層多路復用接口的一個包裝,這里的 open 方法也不例外。假設我們的 Java 運行在 Linux 平臺下,那么 open 最終所做的事情應該是調用操作系統的`epoll_create`函數,用于創建 epoll 實例。真實情況是不是如此呢?答案就在冰山深處,接下來就讓我們一起去求索吧。下面我們將沿著 open 方法一路走下去,如下:
```
public abstract class Selector implements Closeable {
public static Selector open() throws IOException {
// 創建 SelectorProvider,再通過其 openSelector 方法創建 Selector
return SelectorProvider.provider().openSelector();
}
// 省略無關代碼
}
public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 創建默認的 SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
public class DefaultSelectorProvider {
private DefaultSelectorProvider() { }
/**
* 根據系統名稱創建相應的 SelectorProvider
*/
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
//
return new sun.nio.ch.PollSelectorProvider();
}
/**
* 加載 SelectorProvider 類,并創建實例
*/
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
Class<SelectorProvider> c;
try {
c = (Class<SelectorProvider>)Class.forName(cn);
} catch (ClassNotFoundException x) {
throw new AssertionError(x);
}
try {
return c.newInstance();
} catch (IllegalAccessException | InstantiationException x) {
throw new AssertionError(x);
}
}
}
/**
* 創建完 SelectorProvider,接下來要調用 openSelector 方法
* 創建 Selector 的繼承類了。
*/
public class EPollSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
}
class EPollSelectorImpl extends SelectorImpl {
EPollSelectorImpl(SelectorProvider sp) throws IOException {
// 調用父類構造方法
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 創建 EPollArrayWrapper,EPollArrayWrapper 是一個重要的實現
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
}
public abstract class SelectorImpl extends AbstractSelector {
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
/* 初始化 publicKeys 和 publicSelectedKeys,
* publicKeys 即 selector.keys() 方法所返回的集合,
* publicSelectedKeys 則是 selector.selectedKeys() 方法返回的集合
*/
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}
/**
* EPollArrayWrapper 一個重要的實現,這一層再往下就是 C 代碼了
*/
class EPollArrayWrapper {
EPollArrayWrapper() throws IOException {
// 調用 epollCreate 方法創建 epoll 文件描述符
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
// 初始化 pollArray,該對象用于存儲就緒文件描述符和事件
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
// epollCreate 方法是 native 類型的
private native int epollCreate();
}
```
以上代碼時 Java 層面的,Java 層調用棧最下面的類是 EPollArrayWrapper(源碼路徑可以在附錄中查找)。EPollArrayWrapper 是一個重要的實現,起著承上啟下的作用。上層是 Java 代碼,下層是 C 代碼。上層的代碼看完了,接下來看看冰山深處的 C 代碼:
```
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
// 調用 epoll_create 函數創建 epoll 實例,并返回文件描述符 epfd
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
```
上面的代碼很簡單,僅做了創建 epoll 實例這一件事。看到這里,答案就明了了。最后在附一張時序圖幫助大家理清代碼調用順序,如下:

### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#22-選擇鍵)2.2 選擇鍵
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#221-幾種事件)2.2.1 幾種事件
選擇鍵 SelectionKey 包含4種事件,分別是:
```
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
```
事件之間可以通過或運算進行組合,比如:
```
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
```
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#222-兩種事件集合interestops-和-readyops)2.2.2 兩種事件集合:interestOps 和 readyOps
interestOps 即感興趣的事件集合,通道調用 register 方法注冊時會設置此值,interestOps 可通過 SelectionKey interestOps() 方法獲取。readyOps 是就緒事件集合,可通過 SelectionKey readyOps() 獲取。
interestOps 和 readyOps 被聲明在 SelectionKey 子類 SelectionKeyImpl 中,代碼如下:
```
public class SelectionKeyImpl extends AbstractSelectionKey {
private volatile int interestOps;
private int readyOps;
}
```
接下來再來看看與 readyOps 事件集合相關的幾個方法,如下:
```
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
```
以上方法從字面意思上就可以知道有什么用,這里就不解釋了。接下來以 isReadable 方法為例,簡單看一下這個方法是如何實現。
```
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}
```
上面說到可以通過或運算組合事件,這里則是通過與運算來測試某個事件是否在事件集合中。比如
```
readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101,
readyOps & OP_READ = 0101 & 0001 = 0001,
readyOps & OP_CONNECT = 0101 & 1000 = 0
```
`readyOps & OP_READ != 0`,所以 OP\_READ 在事件集合中。`readyOps & OP_CONNECT == 0`,所以 OP\_CONNECT 不在事件集合中。
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#223-attach-方法)2.2.3 attach 方法
attach 是一個好用的方法,通過這個方法,可以將對象暫存在 SelectionKey 中,待需要的時候直接取出來即可。比如本文對應的練習代碼實現了一個簡單的 HTTP 服務器,在讀取用戶請求數據后(即 selectionKey.isReadable() 為 true),會去解析請求頭,然后將請求頭信息通過 attach 方法放入 selectionKey 中。待通道可寫后,再從 selectionKey 中取出請求頭,并根據請求頭回復客戶端不同的消息。當然,這只是一個應用場景,attach 可能還有其他的應用場景,比如標識通道。不過其他的場景我沒使用過,就不說了。attach 使用方式如下:
```
selectionKey.attach(obj);
Object attachedObj = selectionKey.attachment();
```
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#23-通道注冊)2.3 通道注冊
通道注冊即將感興趣的事件告知 Selector,待事件發生時,Selector 即可返回就緒事件,我們就可以去做后續的事情了。比如 ServerSocketChannel 通道通常對 OP\_ACCEPT 事件感興趣,那么我們就可以把這個事件注冊給 Selector。待事件發生,即服務端接受客戶端連接后,我們即可獲取這個就緒的事件并做相應的操作。通道注冊的示例代碼如下:
```
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
```
起初我以為通道注冊操作會調用操作系統的 epoll\_ctl 函數,但最終通過看源碼,發現自己的理解是錯的。既然通道注冊階段不調用 epoll\_ctl 函數。那么,epoll\_ctl 什么時候才會被調用呢?如果不調用 epoll\_ctl,那么注冊過程都干了什么事情呢?關于第一個問題,本節還無法解答,不過第二個問題則可以說說。接下來讓我們深入通道類 register 方法的調用棧中去探尋答案吧。
```
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
return register(sel, ops, null);
}
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
}
public abstract class AbstractSelectableChannel extends SelectableChannel {
private SelectionKey[] keys = null;
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
synchronized (regLock) {
// 省去一些校驗代碼
// 從 keys 數組中查找,查找條件為 k.selector() == sel
SelectionKey k = findKey(sel);
// 如果 k 不為空,則修改 k 所感興趣的事件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
// k 為空,則創建一個 SelectionKey,并存儲到 keys 數組中
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
}
public abstract class AbstractSelector extends Selector {
protected abstract SelectionKey register(AbstractSelectableChannel ch,
int ops, Object att);
}
public abstract class SelectorImpl extends AbstractSelector {
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
// 創建 SelectionKeyImpl 實例
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
}
class EPollSelectorImpl extends SelectorImpl {
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
// 存儲 fd 和 SelectionKeyImpl 的映射關系
fdToKey.put(fd, ski);
pollWrapper.add(fd);
// 將 SelectionKeyImpl 實例存儲到 keys 中(這里的 keys 聲明在 SelectorImpl 類中),keys 集合可由 selector.keys() 方法獲取
keys.add(ski);
}
}
public class SelectionKeyImpl extends AbstractSelectionKey {
public SelectionKey interestOps(int ops) {
ensureValid();
return nioInterestOps(ops);
}
public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
// 轉換并設置感興趣的事件
channel.translateAndSetInterestOps(ops, this);
// 設置 interestOps 變量
interestOps = ops;
return this;
}
}
class SocketChannelImpl extends SocketChannel implements SelChImpl {
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
// 轉換事件
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;
// 設置事件
sk.selector.putEventOps(sk, newOps);
}
}
class class EPollSelectorImpl extends SelectorImpl {
public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
// 設置感興趣的事件
pollWrapper.setInterest(ch.getFDVal(), ops);
}
}
class EPollArrayWrapper {
void setInterest(int fd, int mask) {
synchronized (updateLock) {
// 擴容 updateDescriptors 數組,并存儲文件描述符 fd
int oldCapacity = updateDescriptors.length;
if (updateCount == oldCapacity) {
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
int[] newDescriptors = new int[newCapacity];
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
updateDescriptors = newDescriptors;
}
updateDescriptors[updateCount++] = fd;
// events are stored as bytes for efficiency reasons
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
// 存儲事件
setUpdateEvents(fd, b, false);
}
}
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
}
```
到 setUpdateEvents 這個方法,整個調用棧就結束了。但是我們并未在調用棧中看到調用 epoll\_ctl 函數的地方,也就是說,通道注冊時,并不會立即調用 epoll\_ctl,而是先將事件集合 events 存放在 eventsLow。至于 epoll\_ctl 函數何時調用的,需要大家繼續往下看了。
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#24-選擇過程)2.4 選擇過程
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#241-選擇方法)2.4.1 選擇方法
Selector 包含3種不同功能的選擇方法,分別如下:
* int select()
* int select(long timeout)
* int selectNow()
select() 是一個阻塞方法,僅在至少一個通道處于就緒狀態時才返回。
select(long timeout) 同樣也是阻塞方法,不過可對該方法設置超時時間(timeout > 0),使得線程不會被一直阻塞。如果 timeout = 0,會一直阻塞線程。
selectNow() 為非阻塞方法,調用后立即返回。
以上3個方法均返回 int 類型值,表示每次調用 select 或 selectNow 方法后,新就緒通道的數量。如果某個通道在上一次調用 select 方法時就已經處于就緒狀態,但并未將該通道對應的 SelectionKey 對象從 selectedKeys 集合中移除。假設另一個的通道在本次調用 select 期間處于就緒狀態,此時,select 返回1,而不是2。
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#242-選擇過程)2.4.2 選擇過程
選擇方法用起來雖然簡單,但方法之下隱藏的邏輯還是比較復雜的。大致分為下面幾個步驟:
1. 檢查已取消鍵集合 cancelledKeys 是否為空,不為空則將 cancelledKeys 的鍵從 keys 和 selectedKeys 中移除,并將鍵和通道注銷。
2. 調用操作系統的 epoll\_ctl 函數將通道感興趣的事件注冊到 epoll 實例中
3. 調用操作系統的 epoll\_wait 函數監聽事件
4. 再次執行步驟1
5. 更新 selectedKeys 集合,并返回就緒通道數量
上面五個步驟對應于 EPollSelectorImpl 類中 doSelect 方法的邏輯,如下:
```
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
// 處理已取消鍵集合,對應步驟1
processDeregisterQueue();
try {
begin();
// select 方法的核心,對應步驟2和3
pollWrapper.poll(timeout);
} finally {
end();
}
// 處理已取消鍵集合,對應步驟4
processDeregisterQueue();
// 更新 selectedKeys 集合,并返回就緒通道數量,對應步驟5
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
```
接下來,我們按照上面的步驟順序去分析代碼實現。先來看看步驟1對應的代碼:
+----SelectorImpl.java
```
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
// 遍歷 cancelledKeys,執行注銷操作
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
// 執行注銷邏輯
implDereg(ski);
} catch (SocketException se) {
throw new IOException("Error deregistering key", se);
} finally {
i.remove();
}
}
}
}
}
```
+----EPollSelectorImpl.java
```
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
// 移除 fd 和選擇鍵鍵的映射關系
fdToKey.remove(Integer.valueOf(fd));
// 從 epoll 實例中刪除事件
pollWrapper.remove(fd);
ski.setIndex(-1);
// 從 keys 和 selectedKeys 中移除選擇鍵
keys.remove(ski);
selectedKeys.remove(ski);
// 注銷選擇鍵
deregister((AbstractSelectionKey)ski);
// 注銷通道
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
```
上面的代碼代碼邏輯不是很復雜,首先是獲取 cancelledKeys 集合,然后遍歷集合,并對每個選擇鍵及其對應的通道執行注銷操作。接下來再來看看步驟2和3對應的代碼,如下:
+----EPollArrayWrapper.java
```
int poll(long timeout) throws IOException {
// 調用 epoll_ctl 函數注冊事件,對應步驟3
updateRegistrations();
// 調用 epoll_wait 函數等待事件發生,對應步驟4
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
// 獲取 fd 和 events,這兩個值在調用 register 方法時被存儲到數組中
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
// 確定 opcode 的值
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 注冊事件
epollCtl(epfd, opcode, fd, events);
// 設置 fd 的注冊狀態
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
// 下面兩個均是 native 方法
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
}
```
看到 updateRegistrations 方法的實現,大家現在知道 epoll\_ctl 這個函數是在哪里調用的了。在 3.2 節通道注冊的結尾給大家埋了一個疑問,這里就是答案了。注冊通道實際上只是先將事件收集起來,等調用 select 方法時,在一起通過 epoll\_ctl 函數將事件注冊到 epoll 實例中。
上面 epollCtl 和 epollWait 方法是 native 類型的,接下來我們再來看看這兩個方法是如何實現的。如下:
+----EPollArrayWrapper.c
```
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) {
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// 調用 epoll_ctl 注冊事件
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
// 調用 epoll_wait 等待事件
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
```
上面的C代碼沒什么復雜的邏輯,這里就不多說了。如果大家對 epoll\_ctl 和 epoll\_wait 函數不了解,可以參考 Linux man-page。關于 epoll 的示例,也可以參考我的另一篇文章[“基于epoll實現簡單的web服務器”](http://www.coolblog.xyz/2018/03/02/%E5%9F%BA%E4%BA%8Eepoll%E5%AE%9E%E7%8E%B0%E7%AE%80%E5%8D%95%E7%9A%84web%E6%9C%8D%E5%8A%A1%E5%99%A8/)。
說完步驟2和3對應的代碼,接下來再來說說步驟4和5。由于步驟4和步驟1是一樣的,這里不再贅述。最后再來說說步驟5的邏輯。代碼如下:
+----EPollSelectorImpl.java
```
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
/* 從 pollWrapper 成員變量的 pollArray 中獲取文件描述符,
* pollArray 中的數據由 epoll_wait 設置
*/
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
// 從 pollArray 中獲取就緒事件集合
int rOps = pollWrapper.getEventOps(i);
/* 如果 selectedKeys 已包含選擇鍵,則選擇鍵必須由新的事件發生時,
* 才會將 numKeysUpdated + 1
*/
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
// 轉換并設置就緒事件集合
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
// 更新 selectedKeys 集合,并將 numKeysUpdated + 1
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
// 返回 numKeysUpdated
return numKeysUpdated;
}
```
+----SocketChannelImpl.java
```
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
int oldOps = sk.nioReadyOps();
int newOps = initialOps;
if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
return false;
}
if ((ops & (PollArrayWrapper.POLLERR
| PollArrayWrapper.POLLHUP)) != 0) {
newOps = intOps;
sk.nioReadyOps(newOps);
// No need to poll again in checkConnect,
// the error will be detected there
readyToConnect = true;
return (newOps & ~oldOps) != 0;
}
/*
* 轉換事件
*/
if (((ops & PollArrayWrapper.POLLIN) != 0) &&
((intOps & SelectionKey.OP_READ) != 0) &&
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_READ;
if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
((intOps & SelectionKey.OP_CONNECT) != 0) &&
((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
newOps |= SelectionKey.OP_CONNECT;
readyToConnect = true;
}
if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
((intOps & SelectionKey.OP_WRITE) != 0) &&
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_WRITE;
// 設置事件
sk.nioReadyOps(newOps);
// 如果新的就緒事件和老的就緒事件不相同,則返回true,否則返回 false
return (newOps & ~oldOps) != 0;
}
```
上面就是步驟5的邏輯了,簡單總結一下。首先是獲取就緒通道數量,然后再獲取這些就緒通道對應的文件描述符 fd,以及就緒事件集合 rOps。之后調用 translateAndSetReadyOps 轉換并設置就緒事件集合。最后,將選擇鍵添加到 selectedKeys 集合中,并累加 numKeysUpdated 值,之后返回該值。
以上就是選擇過程的代碼講解,貼了不少代碼,可能不太好理解。Java NIO 和操作系統接口關聯比較大,所以在學習 NIO 相關原理時,也應該去了解諸如 epoll 等系統調用的知識。沒有這些背景知識,很多東西看起來不太好懂。好了,本節到此結束。
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#25-模板代碼)2.5 模板代碼
使用 NIO 選擇器編程時,主干代碼的結構一般比較固定。所以把主干代碼寫好后,就可以往里填業務代碼了。下面貼一個服務端的模板代碼,如下:
```
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8080));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
int readyNum = selector.select();
if (readyNum == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while(it.hasNext()) {
SelectionKey key = it.next();
if(key.isAcceptable()) {
// 接受連接
} else if (key.isReadable()) {
// 通道可讀
} else if (key.isWritable()) {
// 通道可寫
}
it.remove();
}
}
```
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#26-實例演示)2.6 實例演示
原本打算將示例演示的代碼放在本節中展示,奈何文章篇幅已經很大了,所以決定把本節的內容獨立成文。在[下一篇文章](http://www.coolblog.xyz/2018/04/04/%E5%9F%BA%E4%BA%8E-Java-NIO-%E5%AE%9E%E7%8E%B0%E7%AE%80%E5%8D%95%E7%9A%84-HTTP-%E6%9C%8D%E5%8A%A1%E5%99%A8/)中,我將會演示使用 Java NIO 完成一個簡單的 HTTP 服務器。這里先貼張效果圖,如下:

## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#3總結)3.總結
到這里,本文差不多就要結束了。原本只是打算簡單說說 Selector 的用法,然后再寫一份實例代碼。但是后來發現這樣寫顯得比較空洞,沒什么深度。所以后來翻了一下 Selector 的源碼,大致理解了 Selector 的邏輯,然后就有了上面的分析。不過 Selector 的邏輯并不止我上面所說的那些,還有一些內容我現在還沒看,所以就沒有講。對于已寫出來的分析,由于我個人水平有限,難免會有錯誤。如果有錯誤,也歡迎大家指出來,共同進步!
好了,本文到此結束,感謝大家的閱讀。
## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#參考)參考
* [Java NIO Selector - jenkov.com](http://tutorials.jenkov.com/java-nio/selectors.html#pageToc)
* [Java NIO(6): Selector - 知乎](https://zhuanlan.zhihu.com/p/27434028)
## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#附錄)附錄
文中貼的一些代碼是沒有包含在 JDK src.zip 包里的,這里單獨列舉出來,方便大家查找。
| 文件名 | 路徑 |
| --- | --- |
| DefaultSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java |
| EPollSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java |
| SelectorImpl.java | jdk/src/share/classes/sun/nio/ch/SelectorImpl.java |
| EPollSelectorImpl.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java |
| EPollArrayWrapper.java | jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java |
| SelectionKeyImpl.java | jdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java |
| SocketChannelImpl.java | jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java |
| EPollArrayWrapper.c | jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c |
轉載自 :[http://www.tianxiaobo.com/2018/04/03/Java-NIO之選擇器/](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/)
- 一.JVM
- 1.1 java代碼是怎么運行的
- 1.2 JVM的內存區域
- 1.3 JVM運行時內存
- 1.4 JVM內存分配策略
- 1.5 JVM類加載機制與對象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面試相關文章
- 2.1 可能是把Java內存區域講得最清楚的一篇文章
- 2.0 GC調優參數
- 2.1GC排查系列
- 2.2 內存泄漏和內存溢出
- 2.2.3 深入理解JVM-hotspot虛擬機對象探秘
- 1.10 并發的可達性分析相關問題
- 二.Java集合架構
- 1.ArrayList深入源碼分析
- 2.Vector深入源碼分析
- 3.LinkedList深入源碼分析
- 4.HashMap深入源碼分析
- 5.ConcurrentHashMap深入源碼分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的設計模式
- 8.集合架構之面試指南
- 9.TreeSet和TreeMap
- 三.Java基礎
- 1.基礎概念
- 1.1 Java程序初始化的順序是怎么樣的
- 1.2 Java和C++的區別
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字節與字符的區別以及訪問修飾符
- 1.7 深拷貝與淺拷貝
- 1.8 字符串常量池
- 2.面向對象
- 3.關鍵字
- 4.基本數據類型與運算
- 5.字符串與數組
- 6.異常處理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 數據流(Stream)
- 8.3 Java 8 并發教程:線程和執行器
- 8.4 Java 8 并發教程:同步和鎖
- 8.5 Java 8 并發教程:原子變量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、數值、算術和文件
- 8.7 在 Java 8 中避免 Null 檢查
- 8.8 使用 Intellij IDEA 解決 Java 8 的數據流問題
- 四.Java 并發編程
- 1.線程的實現/創建
- 2.線程生命周期/狀態轉換
- 3.線程池
- 4.線程中的協作、中斷
- 5.Java鎖
- 5.1 樂觀鎖、悲觀鎖和自旋鎖
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平鎖和非公平鎖
- 5.3.1 說說ReentrantLock的實現原理,以及ReentrantLock的核心源碼是如何實現的?
- 5.5 鎖優化和升級
- 6.多線程的上下文切換
- 7.死鎖的產生和解決
- 8.J.U.C(java.util.concurrent)
- 0.簡化版(快速復習用)
- 9.鎖優化
- 10.Java 內存模型(JMM)
- 11.ThreadLocal詳解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的實現原理
- 1.DelayQueue的實現原理
- 14.Thread.join()實現原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的實際使用場景
- 五.Java I/O NIO
- 1.I/O模型簡述
- 2.Java NIO之緩沖區
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之選擇器
- 6.基于 Java NIO 實現簡單的 HTTP 服務器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面試題
- 六.Java設計模式
- 1.單例模式
- 2.策略模式
- 3.模板方法
- 4.適配器模式
- 5.簡單工廠
- 6.門面模式
- 7.代理模式
- 七.數據結構和算法
- 1.什么是紅黑樹
- 2.二叉樹
- 2.1 二叉樹的前序、中序、后序遍歷
- 3.排序算法匯總
- 4.java實現鏈表及鏈表的重用操作
- 4.1算法題-鏈表反轉
- 5.圖的概述
- 6.常見的幾道字符串算法題
- 7.幾道常見的鏈表算法題
- 8.leetcode常見算法題1
- 9.LRU緩存策略
- 10.二進制及位運算
- 10.1.二進制和十進制轉換
- 10.2.位運算
- 11.常見鏈表算法題
- 12.算法好文推薦
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事務管理
- 4.SpringMVC 運行流程和手動實現
- 0.Spring 核心技術
- 5.spring如何解決循環依賴問題
- 6.springboot自動裝配原理
- 7.Spring中的循環依賴解決機制中,為什么要三級緩存,用二級緩存不夠嗎
- 8.beanFactory和factoryBean有什么區別
- 九.數據庫
- 1.mybatis
- 1.1 MyBatis-# 與 $ 區別以及 sql 預編譯
- Mybatis系列1-Configuration
- Mybatis系列2-SQL執行過程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-參數設置揭秘(ParameterHandler)
- Mybatis系列8-緩存機制
- 2.淺談聚簇索引和非聚簇索引的區別
- 3.mysql 證明為什么用limit時,offset很大會影響性能
- 4.MySQL中的索引
- 5.數據庫索引2
- 6.面試題收集
- 7.MySQL行鎖、表鎖、間隙鎖詳解
- 8.數據庫MVCC詳解
- 9.一條SQL查詢語句是如何執行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能優化神器 Explain 使用分析
- 12.mysql中,一條update語句執行的過程是怎么樣的?期間用到了mysql的哪些log,分別有什么作用
- 十.Redis
- 0.快速復習回顧Redis
- 1.通俗易懂的Redis數據結構基礎教程
- 2.分布式鎖(一)
- 3.分布式鎖(二)
- 4.延時隊列
- 5.位圖Bitmaps
- 6.Bitmaps(位圖)的使用
- 7.Scan
- 8.redis緩存雪崩、緩存擊穿、緩存穿透
- 9.Redis為什么是單線程、及高并發快的3大原因詳解
- 10.布隆過濾器你值得擁有的開發利器
- 11.Redis哨兵、復制、集群的設計原理與區別
- 12.redis的IO多路復用
- 13.相關redis面試題
- 14.redis集群
- 十一.中間件
- 1.RabbitMQ
- 1.1 RabbitMQ實戰,hello world
- 1.2 RabbitMQ 實戰,工作隊列
- 1.3 RabbitMQ 實戰, 發布訂閱
- 1.4 RabbitMQ 實戰,路由
- 1.5 RabbitMQ 實戰,主題
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 實戰 – 整合 RabbitMQ 發送郵件
- 1.8 RabbitMQ 的消息持久化與 Spring AMQP 的實現剖析
- 1.9 RabbitMQ必備核心知識
- 2.RocketMQ 的幾個簡單問題與答案
- 2.Kafka
- 2.1 kafka 基礎概念和術語
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志機制
- 2.4 kafka是pull還是push的方式傳遞消息的?
- 2.5 Kafka的數據處理流程
- 2.6 Kafka的腦裂預防和處理機制
- 2.7 Kafka中partition副本的Leader選舉機制
- 2.8 如果Leader掛了的時候,follower沒來得及同步,是否會出現數據不一致
- 2.9 kafka的partition副本是否會出現腦裂情況
- 十二.Zookeeper
- 0.什么是Zookeeper(漫畫)
- 1.使用docker安裝Zookeeper偽集群
- 3.ZooKeeper-Plus
- 4.zk實現分布式鎖
- 5.ZooKeeper之Watcher機制
- 6.Zookeeper之選舉及數據一致性
- 十三.計算機網絡
- 1.進制轉換:二進制、八進制、十六進制、十進制之間的轉換
- 2.位運算
- 3.計算機網絡面試題匯總1
- 十四.Docker
- 100.面試題收集合集
- 1.美團面試常見問題總結
- 2.b站部分面試題
- 3.比心面試題
- 4.騰訊面試題
- 5.哈羅部分面試
- 6.筆記
- 十五.Storm
- 1.Storm和流處理簡介
- 2.Storm 核心概念詳解
- 3.Storm 單機版本環境搭建
- 4.Storm 集群環境搭建
- 5.Storm 編程模型詳解
- 6.Storm 項目三種打包方式對比分析
- 7.Storm 集成 Redis 詳解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初識ElasticSearch
- 2.文檔基本CRUD、集群健康檢查
- 3.shard&replica
- 4.document核心元數據解析及ES的并發控制
- 5.document的批量操作及數據路由原理
- 6.倒排索引
- 十七.分布式相關
- 1.分布式事務解決方案一網打盡
- 2.關于xxx怎么保證高可用的問題
- 3.一致性hash原理與實現
- 4.微服務注冊中心 Nacos 比 Eureka的優勢
- 5.Raft 協議算法
- 6.為什么微服務架構中需要網關
- 0.CAP與BASE理論
- 十八.Dubbo
- 1.快速掌握Dubbo常規應用
- 2.Dubbo應用進階
- 3.Dubbo調用模塊詳解
- 4.Dubbo調用模塊源碼分析
- 6.Dubbo協議模塊