[TOC]
# 發送字符串代碼
## service
**EchoServer**
~~~
package com.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup eventLoopGroup = null;
try{
//server端引導類
ServerBootstrap serverBootstrap = new ServerBootstrap();
//連接池處理數據
eventLoopGroup = new NioEventLoopGroup();
//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel
// 設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接
// 設置childHandler執行所有的連接請求
serverBootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class)
.localAddress("127.0.0.1",port)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
//進
//注冊兩個InboundHandler,執行順序為注冊順序,所以應該是InboundHandler1
//InboundHandler2
//出
//注冊兩個OutboundHandler,執行順序為注冊順序的逆序,所以應該是OutboundHandler2
//OutboundHandler1
//在我業務處理中增加一系列的流水線,業務經過這些流水線就能得到結果了
channel.pipeline().addLast(new EchoInHandler1());
channel.pipeline().addLast(new EchoOutHandler1());
channel.pipeline().addLast(new EchoOutHandler2());
channel.pipeline().addLast(new EchoInHandler2());
}
});
//最后綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定
ChannelFuture channelFuture = serverBootstrap.bind().sync();
System.out.println("開始監聽,端口為: "+channelFuture.channel().localAddress());
//等待channel關閉,因為使用sync(),所以關閉操作也會被阻塞,調用sync()方法會阻塞直到服務器關閉
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
// 阻塞等待線程組關閉
eventLoopGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer(20000).start();
}
}
~~~
**EchoInHandler1**
~~~
package com.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoInHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("in1");
// 用fireChannelRead發送到下一個InboundHandler
ctx.fireChannelRead(msg);
//這個方法走完會走channelReadComplete
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); //刷新后才將數據發出到SocketChannel
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//如果發生異常走這個
cause.printStackTrace();
//把連接關閉
ctx.close();
}
}
~~~
**EchoInHandler2**
~~~
package com.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
public class EchoInHandler2 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("in2");
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
//解碼
String body = new String(req, "UTF-8");
System.out.println("接收客戶端數據:" + body);
//向客戶端寫數據
System.out.println("server向client發送數據");
String currentTime = new Date(System.currentTimeMillis()).toString();
//把數據變成ByteBuf
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//開始準備發,然后會走OutHandler
ctx.write(resp);
//方法執行完會走channelReadComplete
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//刷新后才將數據發出到SocketChannel
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//異常處理
cause.printStackTrace();
ctx.close();
}
}
~~~
**EchoOutHandler1**
~~~
package com.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("out1");
//msg就是InHandler發過來的,你可以對他再次加工
System.out.println(msg);
ctx.write(msg);
ctx.flush();
}
}
~~~
**EchoOutHandler2**
~~~
package com.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("out2");
ctx.write(msg);
// super.write(ctx, msg, promise);
}
}
~~~
## client
**EchoClient**
~~~
package com.nettyClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup nioEventLoopGroup = null;
try {
// 客戶端引導類
Bootstrap bootstrap = new Bootstrap();
// EventLoopGroup可以理解為是一個線程池,這個線程池用來處理連接、接受數據、發送數據
nioEventLoopGroup = new NioEventLoopGroup();
bootstrap.group(nioEventLoopGroup)//多線程處理
.channel(NioSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel
.remoteAddress(new InetSocketAddress(host, port))//地址
.handler(new ChannelInitializer<SocketChannel>() {//業務處理類
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());//注冊handler
}
});
// 連接服務器
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
nioEventLoopGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new EchoClient("127.0.0.1", 20000).start();
}
}
~~~
**EchoClientHandler**
~~~
package com.nettyClient;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 客戶端連接服務器后被調用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客戶端連接服務器,開始發送數據……");
byte[] req = "QUERY TIME ORDER".getBytes();//消息
ByteBuf firstMessage = Unpooled.buffer(req.length);//創建一個空的ByteBuff用于緩存即將發送的數據
firstMessage.writeBytes(req);//發送
ctx.writeAndFlush(firstMessage);//flush
}
// ? 從服務器接收到數據后調用
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
System.out.println("client 讀取server數據..");
// 服務端返回消息后
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("服務端返回的數據為 :" + body);
}
// ? 發生異常時被調用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("client exceptionCaught..");
// 釋放資源
ctx.close();
}
}
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介