[TOC]
# 簡介
Netty中,通訊的雙方建立連接后,會把數據按照ByteBuf的方式進行傳輸,例如http協議中,就是通過HttpRequestDecoder對ByteBuf數據流進行處理,轉換成http的對象。基于這個思路,可自定義一種通訊協議:Server和客戶端直接傳輸java對象。
實現的原理是通過Encoder把java對象轉換成ByteBuf流進行傳輸,通過Decoder把ByteBuf轉換成java對象進行處理,處理邏輯如下圖所示:

# bean包
**Person類**
~~~
package com.bean;
import java.io.Serializable;
public class Person implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private String sex;
private int age;
public String toString() {
return "name:" + name + " sex:" + sex + " age:" + age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
~~~
# coder包
**PersonEncoder**
~~~
package com.coder;
import com.bean.Person;
import com.utils.ByteObjConverter;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* 序列化
* 將object轉換成Byte[]
*/
public class PersonEncoder extends MessageToByteEncoder<Person> {
@Override
protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
//工具類:將object轉換為byte[]
byte[] datas = ByteObjConverter.objectToByte(msg);
out.writeBytes(datas);
ctx.flush();
}
}
~~~
**PersonDecoder**
~~~
package com.coder;
import com.utils.ByteBufToBytes;
import com.utils.ByteObjConverter;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class PersonDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//工具類:將ByteBuf轉換為byte[]
ByteBufToBytes read = new ByteBufToBytes();
byte[] bytes = read.read(in);
//工具類:將byte[]轉換為object
Object obj = ByteObjConverter.byteToObject(bytes);
//out中的obj將被一個個fireChannelRead到下一個InBoundHandler中
out.add(obj);
}
}
~~~
# nettyServer包
**EchoServer**
~~~
package com.nettyServer;
import com.coder.PersonDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup eventLoopGroup = null;
try {
//創建ServerBootstrap實例來引導綁定和啟動服務器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//創建NioEventLoopGroup對象來處理事件,如接受新連接、接收數據、寫數據等等
eventLoopGroup = new NioEventLoopGroup();
//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel
//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。
serverBootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress("localhost",port)
.childHandler(new ChannelInitializer<Channel>() {
//設置childHandler執行所有的連接請求
@Override
protected void initChannel(Channel ch) throws Exception {
// 注冊解碼的handler
ch.pipeline().addLast(new PersonDecoder()); //IN1 反序列化
// 添加一個入站的handler到ChannelPipeline
ch.pipeline().addLast(new EchoServerHandler()); //IN2
}
});
// 最后綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然后服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。
ChannelFuture channelFuture = serverBootstrap.bind().sync();
System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress());
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new EchoServer(20000).start();
}
}
~~~
**EchoServerHandler**
~~~
package com.nettyServer;
import com.bean.Person;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
Person person = (Person) msg;
System.out.println(person.getName());
System.out.println(person.getAge());
System.out.println(person.getSex());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server 讀取數據完畢..");
ctx.flush(); //flush后才將數據發出到SocketChannel
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
~~~
# nettyClient包
**EchoClient**
~~~
package com.nettyClient;
import com.coder.PersonEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup nioEventLoopGroup = null;
try {
// 創建Bootstrap對象用來引導啟動客戶端
Bootstrap bootstrap = new Bootstrap();
// 創建EventLoopGroup對象并設置到Bootstrap中,EventLoopGroup可以理解為是一個線程池,這個線程池用來處理連接、接受數據、發送數據
nioEventLoopGroup = new NioEventLoopGroup();
// 創建InetSocketAddress并設置到Bootstrap中,InetSocketAddress是指定連接的服務器地址
bootstrap.group(nioEventLoopGroup)//
.channel(NioSocketChannel.class)//
.remoteAddress(new InetSocketAddress(host, port))//
.handler(new ChannelInitializer<SocketChannel>() {//
// 添加一個ChannelHandler,客戶端成功連接服務器后就會被執行
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
// 注冊編碼的handler
ch.pipeline().addLast(new PersonEncoder()); //out
// 注冊處理消息的handler
ch.pipeline().addLast(new EchoClientHandler()); //in
}
});
// ? 調用Bootstrap.connect()來連接服務器
ChannelFuture f = bootstrap.connect().sync();
// ? 最后關閉EventLoopGroup來釋放資源
f.channel().closeFuture().sync();
} finally {
nioEventLoopGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new EchoClient("localhost", 20000).start();
}
}
~~~
**EchoClientHandler**
~~~
package com.nettyClient;
import com.bean.Person;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 客戶端連接服務器后被調用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Person person = new Person();
person.setName("angelababy");
person.setSex("girl");
person.setAge(18);
ctx.write(person);
ctx.flush();
ctx.close();
}
// ? 從服務器接收到數據后調用
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
System.out.println("client 讀取server數據..");
// 服務端返回消息后
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("服務端數據為 :" + body);
// ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
// ? 發生異常時被調用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("client exceptionCaught..");
// 釋放資源
ctx.close();
}
}
~~~
# utils包
**ByteBufToBytes**
~~~
package com.utils;
import io.netty.buffer.ByteBuf;
public class ByteBufToBytes {
/**
* 將ByteBuf轉換為byte[]
* @param datas
* @return
*/
public byte[] read(ByteBuf datas) {
byte[] bytes = new byte[datas.readableBytes()];// 創建byte[]
datas.readBytes(bytes);// 將ByteBuf轉換為byte[]
return bytes;
}
}
~~~
**ByteObjConverter**
~~~
package com.utils;
import com.bean.Person;
import java.io.*;
public class ByteObjConverter {
/**
* 使用IO的inputstream流將byte[]轉換為object
* @param bytes
* @return
*/
public static Object byteToObject(byte[] bytes) {
Object obj = null;
ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
ObjectInputStream oi = null;
try {
oi = new ObjectInputStream(bi);
obj = oi.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
bi.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
oi.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return obj;
}
/**
* 使用IO的outputstream流將object轉換為byte[]
*/
public static byte[] objectToByte(Object obj) {
byte[] bytes = null;
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = null;
try {
oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
bytes = bo.toByteArray();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
bo.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
oo.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return bytes;
}
}
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介