[TOC]
# 簡介
**nio優勢不在于數據傳送的速度**
nio是new io的簡稱,在jdk1.4里面提供的新api
為所有的原始類型提供(Buffer)緩存支持
字符集編碼解碼解決方案
channel:一個新的原始i/o抽象
支持鎖和內存映射文件的文件訪問接口
提供多路(non-bloking)非阻塞式的高伸縮性網絡I/O
# socket /nio原理
1. 阻塞和非阻塞
阻塞和非阻塞是進程在訪問數據的時候,數據是否準備就緒的一種處理方式。
當數據沒有準備好的時候,
阻塞:往往需要等待緩沖區中的數據準備好之后才處理,否則一直等待。
非阻塞:當我們的進程訪問我們的數據緩沖區的時候,數據沒有準備好的時候,直接返回,不需要等待。有數據的時候,也直接返回
2. 同步和異步
同步和異步都是基于應用程序和操作系統處理IO事件所采用的方式:
同步:應用程序要直接參與IO事件的操作;
異步:所有的IO讀寫事件交給操作系統去處理;
同步的方式在處理IO事件的時候,必須阻塞在某個方法上面等待我們的IO事件完成(阻塞在IO事件或者通過輪詢IO事件的方式);對于異步來說,所有的IO讀寫都交給了操作系統,這個時候,我們可以去做其他的事情,并不需要去完成真正的IO操作。當操作系統完成IO之后,給我們的應用程序一個通知就可以了。
同步有兩種實現模式:
1. 阻塞到IO事件 阻塞到read 或者 write 方法上,這個時候我們就完全不能做自己的事情。(在這種情況下,我們只能把讀寫方法放置到線程中,然后阻塞線程的方式來實現并發服務,對線程的性能開銷比較大)
2. IO事件的輪詢 --在linux c語言編程中叫做多路復用技術(select模式)
讀寫事件交給一個專門的線程來處理,這個線程完成IO事件的注冊功能,還有就是不斷地去輪詢我們的讀寫緩沖區(操作系統),看是否有數據準備好,然后通知我們的相應的業務處理線程。這樣的話,我們的業務處理線程就可以做其他的事情。在這種模式下,阻塞的不是所有的IO線程,而是阻塞的只是select線程
比喻說明:
Client
Selector 管家
BOSS
當客人來的時候,就給管家說,我來了(注冊),管家得到這個注冊信息后,就給BOSS說,我這里有一個或者多個客人。BOSS就說你去給某人A這件東西(IO數據),給另外一個人B另一件東西。這個時候,客人是可以去做自己的事情(比如看看花園等等),當管家知道BOSS給他任務后,他就會去找對應的某人(根據客人的注冊信息),告訴他BOSS給他了某樣東西。
# JAVA IO模型
基于以上4中IO模型,JAVA對應的實現有:
BIO--同步阻塞: JDK1.4以前我們使用的都是BIO
阻塞到我們的讀寫方法,阻塞到線程來提高并發性能,但是效果不是很好
NIO--同步非阻塞:JDK1.4 linux多路復用技術(select模式) 實現IO事件的輪詢方式:同步非阻塞的模式,這種方式目前是主流的網絡通信模式
mina netty ——網絡通信框架,比自己寫NIO要容易些,并且代碼可讀性更好
AIO:JDK1.7(NIO2)真正的異步非阻塞IO(基于linux的epoll模式)
小結:
1)BIO阻塞的IO
2)NIO select多路復用+非阻塞 同步非阻塞
3)AIO異步非阻塞IO
# NIO詳解
New IO成功的解決了上述問題,它是怎樣解決的呢?
IO處理客戶端請求的最小單位是線程
而NIO使用了比線程還小一級的單位:通道(Channel)
可以說,NIO中只需要一個線程就能完成所有接收,讀,寫等操作
要學習NIO,首先要理解它的三大核心
Selector,選擇器
Buffer,緩沖區
Channel,通道
**Buffer**
首先要知道什么是Buffer
在NIO中數據交互不再像IO機制那樣使用流
而是使用Buffer(緩沖區)
可以看出Buffer在整個工作流程中的位置
buffer實際上是一個容器,一個連續數組,它通過幾個變量來保存這個數據的當前位置狀態:
1. capacity:容量,緩沖區能容納元素的數量
2. position:當前位置,是緩沖區中下一次發生讀取和寫入操作的索引,當前位置通過大多數讀寫操作向前推進
3. limit:界限,是緩沖區中最后一個有效位置之后下一個位置的索引
如圖:

幾個常用方法:
~~~
.flip() //將limit設置為position,然后position重置為0,返回對緩沖區的引用
.clear() //清空調用緩沖區并返回對緩沖區的引用
~~~
來點實際點的,上面圖中的具體代碼如下:
1. 首先給Buffer分配空間,以字節為單位
~~~
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
~~~
創建一個ByteBuffer對象并且指定內存大小
2. 向Buffer中寫入數據:
~~~
1).數據從Channel到Buffer:channel.read(byteBuffer);
2).數據從Client到Buffer:byteBuffer.put(...);
~~~
3. 從Buffer中讀取數據:
~~~
1).數據從Buffer到Channel:channel.write(byteBuffer);
2).數據從Buffer到Server:byteBuffer.get(...);
~~~
**Selector**
選擇器是NIO的核心,它是channel的管理者
通過執行select()阻塞方法,監聽是否有channel準備好
一旦有數據可讀,此方法的返回值是SelectionKey的數量
所以服務端通常會死循環執行select()方法,直到有channl準備就緒,然后開始工作
每個channel都會和Selector綁定一個事件,然后生成一個SelectionKey的對象
需要注意的是:
channel和Selector綁定時,channel必須是非阻塞模式
而FileChannel不能切換到非阻塞模式,因為它不是套接字通道,所以FileChannel不能和Selector綁定事件
在NIO中一共有四種事件:
1. SelectionKey.OP_CONNECT:連接事件
2. SelectionKey.OP_ACCEPT:接收事件
3. SelectionKey.OP_READ:讀事件
4. SelectionKey.OP_WRITE:寫事件
**Channel**
共有四種通道:
FileChannel:作用于IO文件流
DatagramChannel:作用于UDP協議
SocketChannel:作用于TCP協議
ServerSocketChannel:作用于TCP協議
本篇文章通過常用的TCP協議來講解NIO
我們以ServerSocketChannel為例:
打開一個ServerSocketChannel通道
~~~
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
~~~
關閉ServerSocketChannel通道:
~~~
serverSocketChannel.close();
~~~
循環監聽SocketChannel:
~~~
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
clientChannel.configureBlocking(false);
}
~~~
`clientChannel.configureBlocking(false);`語句是將此通道設置為非阻塞,也就是異步
自由控制阻塞或非阻塞便是NIO的特性之一
**SelectionKey**
SelectionKey是通道和選擇器交互的核心組件
比如在SocketChannel上綁定一個Selector,并注冊為連接事件:
~~~
SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
clientChannel.connect(new InetSocketAddress(port));
clientChannel.register(selector, SelectionKey.OP_CONNECT);
~~~
核心在register()方法,它返回一個SelectionKey對象
來檢測channel事件是那種事件可以使用以下方法:
~~~
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
~~~
服務端便是通過這些方法 在輪詢中執行相對應操作
當然通過Channel與Selector綁定的key也可以反過來拿到他們
~~~
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
~~~
在Channel上注冊事件時,我們也可以順帶綁定一個Buffer:
~~~
clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024));
~~~
或者綁定一個Object:
~~~
selectionKey.attach(Object);
Object anthorObj = selectionKey.attachment();
~~~
# NIO代碼
**編解碼類CharsetHelper**
~~~
package com.nio;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
public class CharsetHelper {
private static final String UTF_8 = "UTF-8";
private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException {
return encoder.encode(in);
}
public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{
return decoder.decode(in);
}
}
~~~
**服務端類NioServer**
~~~
package com.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class NioServer {
//和操作系統交互的緩存
private ByteBuffer readBuffer;
//輪詢器
private Selector selector;
public static void main(String[] args) {
NioServer server = new NioServer();
server.init();
System.out.println("server started --> 8383");
server.listen();
}
private void init() {
//分配個緩存
readBuffer = ByteBuffer.allocate(1024);
ServerSocketChannel serverSocketChannel;
try {
//創建一個socket channel; channel是nio中對通信通道的抽象,不分入站出站方向
serverSocketChannel = ServerSocketChannel.open();
//設置通道為非阻塞的方式
serverSocketChannel.configureBlocking(false);
//將通道綁定在服務器的ip地址和某個端口上
serverSocketChannel.socket().bind(new InetSocketAddress(8383));
//打開一個多路復用器
selector = Selector.open();
//將上面創建好的socket channel注冊到selector多路復用器上
//對于復用端來說,一定要先注冊一個OP_ACCEPT事件用來響應客戶端的連接請求
//將上述的通道管理器和通道綁定,并為該通道注冊OP_ACCEPT事件
//注冊事件后,當該事件到達時,selector.select()會返回(一個key),如果該事件沒到達selector.select()會一直阻塞
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}catch (Exception e) {
e.printStackTrace();
}
}
private void listen() {
while(true) {
try {
//去詢問一次selector選擇器
//這是一個阻塞方法,一直等待直到有數據可讀,返回值是key的數量(可以有多個)
selector.select();
//拿到事件的key
//如果channel有數據了,將生成的key訪入keys集合中,得到這個keys集合的迭代器
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
//使用迭代器遍歷集合
while (ite.hasNext()) {
//遍歷到一個事件key,得到集合中的一個key實例
SelectionKey key = ite.next();
//確保不重復處理
//拿到當前key實例之后記得在迭代器中將這個元素刪除,非常重要,否則會出錯
ite.remove();
//處理事件
handlekey(key);
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
//處理事件
private void handlekey(SelectionKey key) {
SocketChannel channel = null;
try {
//這個key是可接受連接的嗎
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
//接收連接請求
channel = serverChannel.accept();
channel.configureBlocking(false);
//對讀的事件感興趣
channel.register(selector,SelectionKey.OP_READ);
//這個事件是可讀的嗎
}else if(key.isReadable()) {
channel = (SocketChannel) key.channel();
//先清空一下buffer,因為要用,以防是老的數據
readBuffer.clear();
/**
* 當客戶端channel關閉后,會不斷收到read事件,但沒有消息,即read方法返回-1
* 所以這時服務器端也需要關閉channel,避免無限無效的處理
* 把消息read到readBuffer中
*/
int count = channel.read(readBuffer);
if (count > 0) {
// 一定需要調用flip函數,否則讀取錯誤數據
// 簡單來說,flip操作就是讓讀寫指針、limit指針復位到正確的位置
readBuffer.flip();
/*
* 使用CharBuffer配合取出正確的數據;
* String question = new String(readBuffer.array());可能會出錯,
* 因為前面readBuffer.clear();并未真正清理數據 只是重置緩沖區的position,
* limit, mark, 而readBuffer.array()會返回整個緩沖區的內容。
* decode方法只取readBuffer的position到limit數據。
* 例如,上一次讀取到緩沖區的是"where", clear后position為0,limit為 1024,
* 再次讀取“bye"到緩沖區后,position為3,limit不變,
* flip后position為0,limit為3,前三個字符被覆蓋了,但"re"還存在緩沖區中, 所以 new
* String(readBuffer.array()) 返回 "byere",
* 而decode(readBuffer)返回"bye"。
*/
CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
String question = charBuffer.toString();
// 根據客戶端的請求,調用相應的業務方法獲取業務結果
String answer = getAnswer(question);
channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
} else {
// 這里關閉channel,因為客戶端已經關閉channel或者異常了
channel.close();
}
}
}catch (Exception e) {
e.printStackTrace();
}
}
private String getAnswer(String question) {
String answer = null;
if ("who".equals(question)) {
answer = "我是鳳姐\n";
} else if ("what".equals(question)) {
answer = "我是來幫你解悶的\n";
} else if ("where".equals(question)) {
answer = "我來自外太空\n";
} else if ("hi".equals(question)) {
answer = "hello\n";
} else if ("bye".equals(question)) {
answer = "88\n";
} else {
answer = "請輸入 who, 或者what, 或者where";
}
return answer;
}
}
~~~
**客戶端NioClient**
~~~
package com.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class NioClient implements Runnable{
private BlockingQueue<String> words;
private Random random;
public static void main(String[] args) {
// 多個線程發起Socket客戶端連接請求
for(int i=0; i<10; i++){
NioClient c = new NioClient();
c.init();
new Thread(c).start();
}
}
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
// 主動請求連接
channel.connect(new InetSocketAddress("localhost", 8383));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
boolean isOver = false;
while(! isOver){
selector.select();
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = ite.next();
ite.remove();
if(key.isConnectable()){
if(channel.isConnectionPending()){
if(channel.finishConnect()){
//只有當連接成功后才能注冊OP_READ事件
key.interestOps(SelectionKey.OP_READ);
channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
sleep();
}
else{
key.cancel();
}
}
}
else if(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
channel.read(byteBuffer);
byteBuffer.flip();
CharBuffer charBuffer = CharsetHelper.decode(byteBuffer);
String answer = charBuffer.toString();
System.out.println(Thread.currentThread().getId() + "---" + answer);
String word = getWord();
if(word != null){
channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
}
else{
isOver = true;
}
sleep();
}
}
}
} catch (IOException e) {
// TODO
}
finally{
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void init() {
words = new ArrayBlockingQueue<String>(5);
try {
words.put("hi");
words.put("who");
words.put("what");
words.put("where");
words.put("bye");
} catch (InterruptedException e) {
e.printStackTrace();
}
random = new Random();
}
private String getWord(){
return words.poll();
}
private void sleep() {
try {
TimeUnit.SECONDS.sleep(random.nextInt(3));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sleep(long l) {
try {
TimeUnit.SECONDS.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介